1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <my_global.h>
26 #include "Suma.hpp"
27 
28 #include <ndb_version.h>
29 
30 #include <NdbTCP.h>
31 #include <Bitmask.hpp>
32 #include <SimpleProperties.hpp>
33 
34 #include <signaldata/NodeFailRep.hpp>
35 #include <signaldata/ReadNodesConf.hpp>
36 
37 #include <signaldata/ListTables.hpp>
38 #include <signaldata/GetTabInfo.hpp>
39 #include <signaldata/GetTableId.hpp>
40 #include <signaldata/DictTabInfo.hpp>
41 #include <signaldata/SumaImpl.hpp>
42 #include <signaldata/ScanFrag.hpp>
43 #include <signaldata/TransIdAI.hpp>
44 #include <signaldata/CreateTrigImpl.hpp>
45 #include <signaldata/DropTrigImpl.hpp>
46 #include <signaldata/FireTrigOrd.hpp>
47 #include <signaldata/TrigAttrInfo.hpp>
48 #include <signaldata/CheckNodeGroups.hpp>
49 #include <signaldata/CreateTab.hpp>
50 #include <signaldata/DropTab.hpp>
51 #include <signaldata/AlterTable.hpp>
52 #include <signaldata/AlterTab.hpp>
53 #include <signaldata/DihScanTab.hpp>
54 #include <signaldata/SystemError.hpp>
55 #include <signaldata/GCP.hpp>
56 #include <signaldata/StopMe.hpp>
57 
58 #include <signaldata/DictLock.hpp>
59 #include <ndbapi/NdbDictionary.hpp>
60 
61 #include <DebuggerNames.hpp>
62 #include "../dbtup/Dbtup.hpp"
63 #include "../dbdih/Dbdih.hpp"
64 
65 #include <signaldata/CreateNodegroup.hpp>
66 #include <signaldata/CreateNodegroupImpl.hpp>
67 
68 #include <signaldata/DropNodegroup.hpp>
69 #include <signaldata/DropNodegroupImpl.hpp>
70 
71 #include <signaldata/DbinfoScan.hpp>
72 #include <signaldata/TransIdAI.hpp>
73 
74 #include <EventLogger.hpp>
75 extern EventLogger * g_eventLogger;
76 
77 //#define HANDOVER_DEBUG
78 //#define NODEFAIL_DEBUG
79 //#define NODEFAIL_DEBUG2
80 //#define DEBUG_SUMA_SEQUENCE
81 //#define EVENT_DEBUG
82 //#define EVENT_PH3_DEBUG
83 //#define EVENT_DEBUG2
/**
 * Replace any DBUG_* macros inherited from the included headers with
 * file-local versions.
 *
 * The inner "#if 0" branch holds tracing variants that print the file and
 * line on entry/return via ndbout/ndbout_c; it is compiled out.  The active
 * "#else" branch makes DBUG_ENTER and DBUG_PRINT expand to nothing and turns
 * DBUG_RETURN / DBUG_VOID_RETURN into plain return statements.
 */
#if 1
#undef DBUG_ENTER
#undef DBUG_PRINT
#undef DBUG_RETURN
#undef DBUG_VOID_RETURN

#if 0
#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
#else
#define DBUG_ENTER(a)
#define DBUG_PRINT(a,b)
#define DBUG_RETURN(a) return a
#define DBUG_VOID_RETURN return
#endif

#endif

// When non-zero, enables the extra "if (DBG_3R)" diagnostic printouts
// (alive-node list, bucket layout) used further down in this file.
#define DBG_3R 0
105 
/**
 * @todo:
 * SUMA crashes if an index is created at the same time as
 * global replication. Very easy to reproduce using testIndex.
 * Note: This only happens occasionally, but is quite easy to reproduce.
 */
112 
// Global subscription record index, initialised to RNIL.
// NOTE(review): no use of it is visible in this part of the file — confirm
// where it is read/written.
Uint32 g_subPtrI = RNIL;
// Sequence id passed to DBUTIL in UTIL_SEQUENCE_REQ by createSequence().
static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;

// NOTE(review): presumably a cap on simultaneously tracked GCPs; it is not
// referenced in this part of the file — confirm against its users.
static const Uint32 MAX_CONCURRENT_GCP = 2;
117 
118 /**************************************************************
119  *
120  * Start of suma
121  *
122  */
123 
// Compile-time switch, currently 0; no use of it is visible in this part
// of the file.
#define PRINT_ONLY 0
125 
126 void
execREAD_CONFIG_REQ(Signal * signal)127 Suma::execREAD_CONFIG_REQ(Signal* signal)
128 {
129   jamEntry();
130 
131   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
132 
133   Uint32 ref = req->senderRef;
134   Uint32 senderData = req->senderData;
135 
136   const ndb_mgm_configuration_iterator * p =
137     m_ctx.m_config.getOwnConfigIterator();
138   ndbrequire(p != 0);
139 
140   // SumaParticipant
141   Uint32 noTables, noAttrs, maxBufferedEpochs;
142   ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE,
143 			    &noTables);
144   ndb_mgm_get_int_parameter(p, CFG_DICT_ATTRIBUTE,
145 			    &noAttrs);
146   ndb_mgm_get_int_parameter(p, CFG_DB_MAX_BUFFERED_EPOCHS,
147                             &maxBufferedEpochs);
148 
149   c_tablePool.setSize(noTables);
150   c_tables.setSize(noTables);
151 
152   c_subscriptions.setSize(noTables);
153 
154   Uint32 cnt = 0;
155   cnt = 0;
156   ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIPTIONS, &cnt);
157   if (cnt == 0)
158   {
159     jam();
160     cnt = noTables;
161   }
162   c_subscriptionPool.setSize(cnt);
163 
164   cnt *= 2;
165   {
166     Uint32 val = 0;
167     ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIBERS, &val);
168     if (val)
169     {
170       jam();
171       cnt =  val;
172     }
173   }
174   c_subscriberPool.setSize(cnt);
175 
176   cnt = 0;
177   ndb_mgm_get_int_parameter(p, CFG_DB_SUB_OPERATIONS, &cnt);
178   if (cnt)
179     c_subOpPool.setSize(cnt);
180   else
181     c_subOpPool.setSize(256);
182 
183   c_syncPool.setSize(2);
184 
185   // Trix: max 5 concurrent index stats ops with max 9 words bounds
186   Uint32 noOfBoundWords = 5 * 9;
187 
188   // XXX multiplies number of words by 15 ???
189   c_dataBufferPool.setSize(noAttrs + noOfBoundWords);
190 
191   c_maxBufferedEpochs = maxBufferedEpochs;
192 
193   // Calculate needed gcp pool as 10 records + the ones needed
194   // during a possible api timeout
195   Uint32 dbApiHbInterval, gcpInterval, microGcpInterval = 0;
196   ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL,
197 			    &dbApiHbInterval);
198   ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL,
199                             &gcpInterval);
200   ndb_mgm_get_int_parameter(p, CFG_DB_MICRO_GCP_INTERVAL,
201                             &microGcpInterval);
202 
203   if (microGcpInterval)
204   {
205     gcpInterval = microGcpInterval;
206   }
207   c_gcp_pool.setSize(10 + (4*dbApiHbInterval+gcpInterval-1)/gcpInterval);
208 
209   c_page_chunk_pool.setSize(50);
210 
211   {
212     SLList<SyncRecord> tmp(c_syncPool);
213     Ptr<SyncRecord> ptr;
214     while(tmp.seize(ptr))
215       new (ptr.p) SyncRecord(* this, c_dataBufferPool);
216     tmp.release();
217   }
218 
219   // Suma
220   c_masterNodeId = getOwnNodeId();
221 
222   c_nodeGroup = c_noNodesInGroup = 0;
223   for (int i = 0; i < MAX_REPLICAS; i++) {
224     c_nodesInGroup[i]   = 0;
225   }
226 
227   m_first_free_page= RNIL;
228 
229   c_no_of_buckets = 0;
230   memset(c_buckets, 0, sizeof(c_buckets));
231   for(Uint32 i = 0; i<NO_OF_BUCKETS; i++)
232   {
233     Bucket* bucket= c_buckets+i;
234     bucket->m_buffer_tail = RNIL;
235     bucket->m_buffer_head.m_page_id = RNIL;
236     bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
237   }
238 
239   m_max_seen_gci = 0;      // FIRE_TRIG_ORD
240   m_max_sent_gci = 0;      // FIRE_TRIG_ORD -> send
241   m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP
242   m_gcp_complete_rep_count = 0;
243   m_out_of_buffer_gci = 0;
244   m_missing_data = false;
245 
246   c_startup.m_wait_handover= false;
247   c_failedApiNodes.clear();
248 
249   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
250   conf->senderRef = reference();
251   conf->senderData = senderData;
252   sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
253 	     ReadConfigConf::SignalLength, JBB);
254 }
255 
256 void
execSTTOR(Signal * signal)257 Suma::execSTTOR(Signal* signal) {
258   jamEntry();
259 
260   DBUG_ENTER("Suma::execSTTOR");
261   m_startphase  = signal->theData[1];
262   m_typeOfStart = signal->theData[7];
263 
264   DBUG_PRINT("info",("startphase = %u, typeOfStart = %u",
265 		     m_startphase, m_typeOfStart));
266 
267   if(m_startphase == 3)
268   {
269     jam();
270     void* ptr = m_ctx.m_mm.get_memroot();
271     c_page_pool.set((Buffer_page*)ptr, (Uint32)~0);
272   }
273 
274   if(m_startphase == 5)
275   {
276     jam();
277 
278     if (ERROR_INSERTED(13029)) /* Hold startphase 5 */
279     {
280       sendSignalWithDelay(SUMA_REF, GSN_STTOR, signal,
281                           30, signal->getLength());
282       DBUG_VOID_RETURN;
283     }
284 
285     signal->theData[0] = reference();
286     sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
287     DBUG_VOID_RETURN;
288   }
289 
290   if(m_startphase == 7)
291   {
292     if (m_typeOfStart != NodeState::ST_NODE_RESTART &&
293 	m_typeOfStart != NodeState::ST_INITIAL_NODE_RESTART)
294     {
295       for( Uint32 i = 0; i < c_no_of_buckets; i++)
296       {
297 	if (get_responsible_node(i) == getOwnNodeId())
298 	{
299 	  // I'm running this bucket
300 	  DBUG_PRINT("info",("bucket %u set to true", i));
301 	  m_active_buckets.set(i);
302 	  ndbout_c("m_active_buckets.set(%d)", i);
303 	}
304       }
305     }
306 
307     if(!m_active_buckets.isclear())
308     {
309       NdbNodeBitmask tmp;
310       Uint32 bucket = 0;
311       while ((bucket = m_active_buckets.find(bucket)) != Bucket_mask::NotFound)
312       {
313 	tmp.set(get_responsible_node(bucket, c_nodes_in_nodegroup_mask));
314 	bucket++;
315       }
316 
317       ndbassert(tmp.get(getOwnNodeId()));
318       m_gcp_complete_rep_count = m_active_buckets.count();
319     }
320     else
321       m_gcp_complete_rep_count = 0; // I contribute 1 gcp complete rep
322 
323     if(m_typeOfStart == NodeState::ST_INITIAL_START &&
324        c_masterNodeId == getOwnNodeId())
325     {
326       jam();
327       createSequence(signal);
328       DBUG_VOID_RETURN;
329     }//if
330 
331     if (ERROR_INSERTED(13030))
332     {
333       ndbout_c("Dont start handover");
334       DBUG_VOID_RETURN;
335     }
336   }//if
337 
338   if(m_startphase == 100)
339   {
340     /**
341      * Allow API's to connect
342      */
343     sendSTTORRY(signal);
344     DBUG_VOID_RETURN;
345   }
346 
347   if(m_startphase == 101)
348   {
349     if (m_typeOfStart == NodeState::ST_NODE_RESTART ||
350 	m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
351     {
352       /**
353        * Handover code here
354        */
355       c_startup.m_wait_handover= true;
356       check_start_handover(signal);
357       DBUG_VOID_RETURN;
358     }
359   }
360   sendSTTORRY(signal);
361 
362   DBUG_VOID_RETURN;
363 }
364 
365 #include <ndb_version.h>
366 
367 void
send_dict_lock_req(Signal * signal,Uint32 state)368 Suma::send_dict_lock_req(Signal* signal, Uint32 state)
369 {
370   if (state == DictLockReq::SumaStartMe &&
371       !ndbd_suma_dictlock_startme(getNodeInfo(c_masterNodeId).m_version))
372   {
373     jam();
374     goto notsupported;
375   }
376   else if (state == DictLockReq::SumaHandOver &&
377            !ndbd_suma_dictlock_handover(getNodeInfo(c_masterNodeId).m_version))
378   {
379     jam();
380     goto notsupported;
381   }
382 
383   {
384     jam();
385     DictLockReq* req = (DictLockReq*)signal->getDataPtrSend();
386     req->lockType = state;
387     req->userPtr = state;
388     req->userRef = reference();
389     sendSignal(calcDictBlockRef(c_masterNodeId),
390                GSN_DICT_LOCK_REQ, signal, DictLockReq::SignalLength, JBB);
391   }
392   return;
393 
394 notsupported:
395   DictLockConf* conf = (DictLockConf*)signal->getDataPtrSend();
396   conf->userPtr = state;
397   execDICT_LOCK_CONF(signal);
398 }
399 
400 void
execDICT_LOCK_CONF(Signal * signal)401 Suma::execDICT_LOCK_CONF(Signal* signal)
402 {
403   jamEntry();
404 
405   DictLockConf* conf = (DictLockConf*)signal->getDataPtr();
406   Uint32 state = conf->userPtr;
407 
408   switch(state){
409   case DictLockReq::SumaStartMe:
410     jam();
411     c_startup.m_restart_server_node_id = 0;
412     CRASH_INSERTION(13039);
413     send_start_me_req(signal);
414     return;
415   case DictLockReq::SumaHandOver:
416     jam();
417     send_handover_req(signal, SumaHandoverReq::RT_START_NODE);
418     return;
419   default:
420     jam();
421     jamLine(state);
422     ndbrequire(false);
423   }
424 }
425 
426 void
execDICT_LOCK_REF(Signal * signal)427 Suma::execDICT_LOCK_REF(Signal* signal)
428 {
429   jamEntry();
430 
431   DictLockRef* ref = (DictLockRef*)signal->getDataPtr();
432   Uint32 state = ref->userPtr;
433 
434   ndbrequire(ref->errorCode == DictLockRef::TooManyRequests);
435   signal->theData[0] = SumaContinueB::RETRY_DICT_LOCK;
436   signal->theData[1] = state;
437   sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 300, 2);
438 }
439 
440 void
send_dict_unlock_ord(Signal * signal,Uint32 state)441 Suma::send_dict_unlock_ord(Signal* signal, Uint32 state)
442 {
443   if (state == DictLockReq::SumaStartMe &&
444       !ndbd_suma_dictlock_startme(getNodeInfo(c_masterNodeId).m_version))
445   {
446     jam();
447     return;
448   }
449   else if (state == DictLockReq::SumaHandOver &&
450            !ndbd_suma_dictlock_handover(getNodeInfo(c_masterNodeId).m_version))
451   {
452     jam();
453     return;
454   }
455 
456   jam();
457   DictUnlockOrd* ord = (DictUnlockOrd*)signal->getDataPtrSend();
458   ord->lockPtr = 0;
459   ord->lockType = state;
460   ord->senderData = state;
461   ord->senderRef = reference();
462   sendSignal(calcDictBlockRef(c_masterNodeId),
463              GSN_DICT_UNLOCK_ORD, signal, DictUnlockOrd::SignalLength, JBB);
464 }
465 
466 void
send_start_me_req(Signal * signal)467 Suma::send_start_me_req(Signal* signal)
468 {
469   Uint32 nodeId= c_startup.m_restart_server_node_id;
470   do {
471     nodeId = c_alive_nodes.find(nodeId + 1);
472 
473     if(nodeId == getOwnNodeId())
474       continue;
475     if(nodeId == NdbNodeBitmask::NotFound)
476     {
477       nodeId = 0;
478       continue;
479     }
480     break;
481   } while(true);
482 
483 
484   infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId);
485   c_startup.m_restart_server_node_id= nodeId;
486   sendSignal(calcSumaBlockRef(nodeId),
487 	     GSN_SUMA_START_ME_REQ, signal, 1, JBB);
488 }
489 
490 void
execSUMA_START_ME_REF(Signal * signal)491 Suma::execSUMA_START_ME_REF(Signal* signal)
492 {
493   const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr();
494 
495   Uint32 error = ref->errorCode;
496   if (error != SumaStartMeRef::Busy && error != SumaStartMeRef::NotStarted)
497   {
498     jam();
499     // for some reason we did not manage to create a subscription
500     // on the starting node
501     SystemError * const sysErr = (SystemError*)&signal->theData[0];
502     sysErr->errorCode = SystemError::CopySubscriptionRef;
503     sysErr->errorRef = reference();
504     sysErr->data[0] = error;
505     sysErr->data[1] = 0;
506     sendSignal(NDBCNTR_REF, GSN_SYSTEM_ERROR, signal,
507                SystemError::SignalLength, JBB);
508     return;
509   }
510 
511   infoEvent("Suma: node %d refused %d",
512 	    c_startup.m_restart_server_node_id, ref->errorCode);
513 
514   send_start_me_req(signal);
515 }
516 
517 void
execSUMA_START_ME_CONF(Signal * signal)518 Suma::execSUMA_START_ME_CONF(Signal* signal)
519 {
520   infoEvent("Suma: node %d has completed restoring me",
521 	    c_startup.m_restart_server_node_id);
522   sendSTTORRY(signal);
523   send_dict_unlock_ord(signal, DictLockReq::SumaStartMe);
524   c_startup.m_restart_server_node_id= 0;
525 }
526 
527 void
createSequence(Signal * signal)528 Suma::createSequence(Signal* signal)
529 {
530   jam();
531   DBUG_ENTER("Suma::createSequence");
532 
533   UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
534 
535   req->senderData  = RNIL;
536   req->sequenceId  = SUMA_SEQUENCE;
537   req->requestType = UtilSequenceReq::Create;
538   sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
539 	     signal, UtilSequenceReq::SignalLength, JBB);
540   // execUTIL_SEQUENCE_CONF will call createSequenceReply()
541   DBUG_VOID_RETURN;
542 }
543 
544 void
createSequenceReply(Signal * signal,UtilSequenceConf * conf,UtilSequenceRef * ref)545 Suma::createSequenceReply(Signal* signal,
546 			  UtilSequenceConf * conf,
547 			  UtilSequenceRef * ref)
548 {
549   jam();
550 
551   if (ref != NULL)
552   {
553     switch ((UtilSequenceRef::ErrorCode)ref->errorCode)
554     {
555       case UtilSequenceRef::NoSuchSequence:
556         ndbrequire(false);
557       case UtilSequenceRef::TCError:
558       {
559         char buf[128];
560         BaseString::snprintf(buf, sizeof(buf),
561                  "Startup failed during sequence creation. TC error %d",
562                  ref->TCErrorCode);
563         progError(__LINE__, NDBD_EXIT_RESOURCE_ALLOC_ERROR, buf);
564       }
565     }
566     ndbrequire(false);
567   }
568 
569   sendSTTORRY(signal);
570 }
571 
572 void
execREAD_NODESCONF(Signal * signal)573 Suma::execREAD_NODESCONF(Signal* signal){
574   jamEntry();
575   ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
576 
577   if(getNodeState().getNodeRestartInProgress())
578   {
579     c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
580     c_alive_nodes.set(getOwnNodeId());
581   }
582   else
583   {
584     c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
585     NdbNodeBitmask tmp;
586     tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
587     ndbrequire(tmp.isclear()); // No nodes can be started during SR
588   }
589 
590   if (DBG_3R)
591   {
592     for (Uint32 i = 0; i<MAX_NDB_NODES; i++)
593     {
594       if (c_alive_nodes.get(i))
595         ndbout_c("%u c_alive_nodes.set(%u)", __LINE__, i);
596     }
597   }
598 
599   c_masterNodeId = conf->masterNodeId;
600 
601   getNodeGroupMembers(signal);
602 }
603 
604 void
getNodeGroupMembers(Signal * signal)605 Suma::getNodeGroupMembers(Signal* signal)
606 {
607   jam();
608   DBUG_ENTER("Suma::getNodeGroupMembers");
609   /**
610    * Ask DIH for nodeGroupMembers
611    */
612   CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
613   sd->blockRef = reference();
614   sd->requestType = CheckNodeGroups::GetNodeGroupMembers;
615   sd->nodeId = getOwnNodeId();
616   sd->senderData = RNIL;
617   sendSignal(DBDIH_REF, GSN_CHECKNODEGROUPSREQ, signal,
618              CheckNodeGroups::SignalLength, JBB);
619   DBUG_VOID_RETURN;
620 }
621 
622 static
623 bool
valid_seq(Uint32 n,Uint32 r,Uint16 dst[])624 valid_seq(Uint32 n, Uint32 r, Uint16 dst[])
625 {
626   Uint16 tmp[MAX_REPLICAS];
627   for (Uint32 i = 0; i<r; i++)
628   {
629     tmp[i] = n % r;
630     for (Uint32 j = 0; j<i; j++)
631       if (tmp[j] == tmp[i])
632         return false;
633     n /= r;
634   }
635 
636   /**
637    * reverse order for backward compatibility (with 2 replica)
638    */
639   for (Uint32 i = 0; i<r; i++)
640     dst[i] = tmp[r-i-1];
641 
642   return true;
643 }
644 
645 void
fix_nodegroup()646 Suma::fix_nodegroup()
647 {
648   Uint32 i, pos= 0;
649 
650   for (i = 0; i < MAX_NDB_NODES; i++)
651   {
652     if (c_nodes_in_nodegroup_mask.get(i))
653     {
654       c_nodesInGroup[pos++] = i;
655     }
656   }
657 
658   const Uint32 replicas= c_noNodesInGroup = pos;
659 
660   if (replicas)
661   {
662     Uint32 buckets= 1;
663     for(i = 1; i <= replicas; i++)
664       buckets *= i;
665 
666     Uint32 tot = 0;
667     switch(replicas){
668     case 1:
669       tot = 1;
670       break;
671     case 2:
672       tot = 4; // 2^2
673       break;
674     case 3:
675       tot = 27; // 3^3
676       break;
677     case 4:
678       tot = 256; // 4^4
679       break;
680       ndbrequire(false);
681     }
682     Uint32 cnt = 0;
683     for (i = 0; i<tot; i++)
684     {
685       Bucket* ptr= c_buckets + cnt;
686       if (valid_seq(i, replicas, ptr->m_nodes))
687       {
688         jam();
689         if (DBG_3R) printf("bucket %u : ", cnt);
690         for (Uint32 j = 0; j<replicas; j++)
691         {
692           ptr->m_nodes[j] = c_nodesInGroup[ptr->m_nodes[j]];
693           if (DBG_3R) printf("%u ", ptr->m_nodes[j]);
694         }
695         if (DBG_3R) printf("\n");
696         cnt++;
697       }
698     }
699     ndbrequire(cnt == buckets);
700     c_no_of_buckets= buckets;
701   }
702   else
703   {
704     jam();
705     c_no_of_buckets = 0;
706   }
707 }
708 
709 
710 void
execCHECKNODEGROUPSCONF(Signal * signal)711 Suma::execCHECKNODEGROUPSCONF(Signal *signal)
712 {
713   const CheckNodeGroups *sd = (const CheckNodeGroups *)signal->getDataPtrSend();
714   DBUG_ENTER("Suma::execCHECKNODEGROUPSCONF");
715   jamEntry();
716 
717   c_nodeGroup = sd->output;
718   c_nodes_in_nodegroup_mask.assign(sd->mask);
719   c_noNodesInGroup = c_nodes_in_nodegroup_mask.count();
720 
721   fix_nodegroup();
722 
723 #ifndef DBUG_OFF
724   for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
725     DBUG_PRINT("exit",("Suma: NodeGroup %u, me %u, "
726 		       "member[%u] %u",
727 		       c_nodeGroup, getOwnNodeId(),
728 		       i, c_nodesInGroup[i]));
729   }
730 #endif
731 
732   c_startup.m_restart_server_node_id = 0;
733   if (m_typeOfStart == NodeState::ST_NODE_RESTART ||
734       m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
735   {
736     jam();
737 
738     send_dict_lock_req(signal, DictLockReq::SumaStartMe);
739 
740     return;
741   }
742 
743   c_startup.m_restart_server_node_id = 0;
744   sendSTTORRY(signal);
745 
746   DBUG_VOID_RETURN;
747 }
748 
749 void
execAPI_START_REP(Signal * signal)750 Suma::execAPI_START_REP(Signal* signal)
751 {
752   Uint32 nodeId = signal->theData[0];
753   c_connected_nodes.set(nodeId);
754 
755   check_start_handover(signal);
756 }
757 
758 void
check_start_handover(Signal * signal)759 Suma::check_start_handover(Signal* signal)
760 {
761   if(c_startup.m_wait_handover)
762   {
763     NodeBitmask tmp;
764     tmp.assign(c_connected_nodes);
765     tmp.bitAND(c_subscriber_nodes);
766     if(!c_subscriber_nodes.equal(tmp))
767     {
768       return;
769     }
770 
771     c_startup.m_wait_handover= false;
772 
773     if (c_no_of_buckets)
774     {
775       jam();
776       send_dict_lock_req(signal, DictLockReq::SumaHandOver);
777     }
778     else
779     {
780       jam();
781       sendSTTORRY(signal);
782     }
783   }
784 }
785 
786 void
send_handover_req(Signal * signal,Uint32 type)787 Suma::send_handover_req(Signal* signal, Uint32 type)
788 {
789   jam();
790   c_startup.m_handover_nodes.assign(c_alive_nodes);
791   c_startup.m_handover_nodes.bitAND(c_nodes_in_nodegroup_mask);
792   c_startup.m_handover_nodes.clear(getOwnNodeId());
793   Uint32 gci= Uint32(m_last_complete_gci >> 32) + 3;
794 
795   SumaHandoverReq* req= (SumaHandoverReq*)signal->getDataPtrSend();
796   char buf[255];
797   c_startup.m_handover_nodes.getText(buf);
798   infoEvent("Suma: initiate handover for %s with nodes %s GCI: %u",
799             (type == SumaHandoverReq::RT_START_NODE ? "startup" : "shutdown"),
800             buf,
801             gci);
802 
803   req->gci = gci;
804   req->nodeId = getOwnNodeId();
805   req->requestType = type;
806 
807   NodeReceiverGroup rg(SUMA, c_startup.m_handover_nodes);
808   sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal,
809              SumaHandoverReq::SignalLength, JBB);
810 }
811 
812 void
sendSTTORRY(Signal * signal)813 Suma::sendSTTORRY(Signal* signal){
814   signal->theData[0] = 0;
815   signal->theData[3] = 1;
816   signal->theData[4] = 3;
817   signal->theData[5] = 5;
818   signal->theData[6] = 7;
819   signal->theData[7] = 100;
820   signal->theData[8] = 101;
821   signal->theData[9] = 255; // No more start phases from missra
822   sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
823 }
824 
825 void
execNDB_STTOR(Signal * signal)826 Suma::execNDB_STTOR(Signal* signal)
827 {
828   jamEntry();
829 }
830 
831 void
execCONTINUEB(Signal * signal)832 Suma::execCONTINUEB(Signal* signal){
833   jamEntry();
834   Uint32 type= signal->theData[0];
835   switch(type){
836   case SumaContinueB::RELEASE_GCI:
837   {
838     Uint32 gci_hi = signal->theData[2];
839     Uint32 gci_lo = signal->theData[3];
840     Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
841     release_gci(signal, signal->theData[1], gci);
842     return;
843   }
844   case SumaContinueB::RESEND_BUCKET:
845   {
846     Uint32 min_gci_hi = signal->theData[2];
847     Uint32 min_gci_lo = signal->theData[5];
848     Uint32 last_gci_hi = signal->theData[4];
849     Uint32 last_gci_lo = signal->theData[6];
850     Uint64 min_gci = min_gci_lo | (Uint64(min_gci_hi) << 32);
851     Uint64 last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32);
852     resend_bucket(signal,
853 		  signal->theData[1],
854 		  min_gci,
855 		  signal->theData[3],
856 		  last_gci);
857     return;
858   }
859   case SumaContinueB::OUT_OF_BUFFER_RELEASE:
860     out_of_buffer_release(signal, signal->theData[1]);
861     return;
862   case SumaContinueB::API_FAIL_GCI_LIST:
863     api_fail_gci_list(signal, signal->theData[1]);
864     return;
865   case SumaContinueB::API_FAIL_SUBSCRIBER_LIST:
866     api_fail_subscriber_list(signal,
867                              signal->theData[1]);
868     return;
869   case SumaContinueB::API_FAIL_SUBSCRIPTION:
870     api_fail_subscription(signal);
871     return;
872   case SumaContinueB::SUB_STOP_REQ:
873     sub_stop_req(signal);
874     return;
875   case SumaContinueB::RETRY_DICT_LOCK:
876     jam();
877     send_dict_lock_req(signal, signal->theData[1]);
878     return;
879   }
880 }
881 
882 /*****************************************************************************
883  *
884  * Node state handling
885  *
886  *****************************************************************************/
887 
execAPI_FAILREQ(Signal * signal)888 void Suma::execAPI_FAILREQ(Signal* signal)
889 {
890   jamEntry();
891   DBUG_ENTER("Suma::execAPI_FAILREQ");
892   Uint32 failedApiNode = signal->theData[0];
893   ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR
894 
895   c_connected_nodes.clear(failedApiNode);
896 
897   if (c_failedApiNodes.get(failedApiNode))
898   {
899     jam();
900     /* Being handled already, just conf */
901     goto CONF;
902   }
903 
904   if (!c_subscriber_nodes.get(failedApiNode))
905   {
906     jam();
907     /* No Subscribers on that node, no SUMA
908      * specific work to do
909      */
910     goto BLOCK_CLEANUP;
911   }
912 
913   c_failedApiNodes.set(failedApiNode);
914   c_subscriber_nodes.clear(failedApiNode);
915   c_subscriber_per_node[failedApiNode] = 0;
916   c_failedApiNodesState[failedApiNode] = __LINE__;
917 
918   check_start_handover(signal);
919 
920   signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
921   signal->theData[1] = failedApiNode;
922   sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
923   return;
924 
925 BLOCK_CLEANUP:
926   jam();
927   api_fail_block_cleanup(signal, failedApiNode);
928   DBUG_VOID_RETURN;
929 
930 CONF:
931   jam();
932   signal->theData[0] = failedApiNode;
933   signal->theData[1] = reference();
934   sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
935 
936   c_failedApiNodesState[failedApiNode] = 0;
937 
938   DBUG_VOID_RETURN;
939 }//execAPI_FAILREQ()
940 
941 void
api_fail_block_cleanup_callback(Signal * signal,Uint32 failedNodeId,Uint32 elementsCleaned)942 Suma::api_fail_block_cleanup_callback(Signal* signal,
943                                       Uint32 failedNodeId,
944                                       Uint32 elementsCleaned)
945 {
946   jamEntry();
947 
948   /* Suma should not have any block level elements
949    * to be cleaned (Fragmented send/receive structures etc.)
950    * As it only uses Fragmented send/receive locally
951    */
952   ndbassert(elementsCleaned == 0);
953 
954   /* Node failure handling is complete */
955   signal->theData[0] = failedNodeId;
956   signal->theData[1] = reference();
957   sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
958   c_failedApiNodes.clear(failedNodeId);
959   c_failedApiNodesState[failedNodeId] = 0;
960 }
961 
962 void
api_fail_block_cleanup(Signal * signal,Uint32 failedNode)963 Suma::api_fail_block_cleanup(Signal* signal, Uint32 failedNode)
964 {
965   jam();
966 
967   c_failedApiNodesState[failedNode] = __LINE__;
968 
969   Callback cb = {safe_cast(&Suma::api_fail_block_cleanup_callback),
970                  failedNode};
971 
972   simBlockNodeFailure(signal, failedNode, cb);
973 }
974 
975 void
api_fail_gci_list(Signal * signal,Uint32 nodeId)976 Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId)
977 {
978   jam();
979 
980   Ptr<Gcp_record> gcp;
981   if (c_gcp_list.first(gcp))
982   {
983     jam();
984     gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
985 
986     if (gcp.p->m_subscribers.isclear())
987     {
988       jam();
989 
990       SubGcpCompleteAck* ack = (SubGcpCompleteAck*)signal->getDataPtrSend();
991       ack->rep.gci_hi = Uint32(gcp.p->m_gci >> 32);
992       ack->rep.gci_lo = Uint32(gcp.p->m_gci);
993       ack->rep.senderRef = reference();
994       NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
995       sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
996                  SubGcpCompleteAck::SignalLength, JBB);
997 
998       c_gcp_list.release(gcp);
999 
1000       c_failedApiNodesState[nodeId] = __LINE__;
1001       signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
1002       signal->theData[1] = nodeId;
1003       sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
1004       return;
1005     }
1006   }
1007 
1008   if (ERROR_INSERTED(13023))
1009   {
1010     CLEAR_ERROR_INSERT_VALUE;
1011   }
1012 
1013   signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
1014   signal->theData[1] = nodeId;
1015   signal->theData[2] = RNIL; // SubOpPtr
1016   signal->theData[3] = RNIL; // c_subscribers bucket
1017   signal->theData[4] = RNIL; // subscriptionId
1018   signal->theData[5] = RNIL; // SubscriptionKey
1019 
1020   Ptr<SubOpRecord> subOpPtr;
1021   if (c_subOpPool.seize(subOpPtr))
1022   {
1023     c_failedApiNodesState[nodeId] = __LINE__;
1024     signal->theData[2] = subOpPtr.i;
1025     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
1026   }
1027   else
1028   {
1029     c_failedApiNodesState[nodeId] = __LINE__;
1030     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1031   }
1032 
1033   return;
1034 }
1035 
1036 void
api_fail_subscriber_list(Signal * signal,Uint32 nodeId)1037 Suma::api_fail_subscriber_list(Signal* signal, Uint32 nodeId)
1038 {
1039   jam();
1040   Ptr<SubOpRecord> subOpPtr;
1041 
1042   if (c_outstanding_drop_trig_req > 9)
1043   {
1044     jam();
1045     /**
1046      * Make sure not to overflow DbtupProxy with too many GSN_DROP_TRIG_IMPL_REQ
1047      *   9 is arbitrary number...
1048      */
1049     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
1050                         signal->getLength());
1051     return;
1052   }
1053 
1054   subOpPtr.i = signal->theData[2];
1055   if (subOpPtr.i == RNIL)
1056   {
1057     if (c_subOpPool.seize(subOpPtr))
1058     {
1059       signal->theData[3] = RNIL;
1060     }
1061     else
1062     {
1063       jam();
1064       sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1065       c_failedApiNodesState[nodeId] = __LINE__;
1066       return;
1067     }
1068   }
1069   else
1070   {
1071     jam();
1072     c_subOpPool.getPtr(subOpPtr);
1073   }
1074 
1075   Uint32 bucket = signal->theData[3];
1076   Uint32 subscriptionId = signal->theData[4];
1077   Uint32 subscriptionKey = signal->theData[5];
1078 
1079   DLHashTable<Subscription>::Iterator iter;
1080   if (bucket == RNIL)
1081   {
1082     jam();
1083     c_subscriptions.first(iter);
1084     c_failedApiNodesState[nodeId] = __LINE__;
1085   }
1086   else
1087   {
1088     jam();
1089 
1090     Subscription key;
1091     key.m_subscriptionId = subscriptionId;
1092     key.m_subscriptionKey = subscriptionKey;
1093     if (c_subscriptions.find(iter.curr, key) == false)
1094     {
1095       jam();
1096       /**
1097        * We restart from this bucket :-(
1098        */
1099       c_subscriptions.next(bucket, iter);
1100       c_failedApiNodesState[nodeId] = __LINE__;
1101     }
1102     else
1103     {
1104       iter.bucket = bucket;
1105     }
1106   }
1107 
1108   if (iter.curr.isNull())
1109   {
1110     jam();
1111     api_fail_block_cleanup(signal, nodeId);
1112     c_failedApiNodesState[nodeId] = __LINE__;
1113     return;
1114   }
1115 
1116   subOpPtr.p->m_opType = SubOpRecord::R_API_FAIL_REQ;
1117   subOpPtr.p->m_subPtrI = iter.curr.i;
1118   subOpPtr.p->m_senderRef = nodeId;
1119   subOpPtr.p->m_senderData = iter.bucket;
1120 
1121   LocalDLFifoList<SubOpRecord> list(c_subOpPool, iter.curr.p->m_stop_req);
1122   bool empty = list.isEmpty();
1123   list.add(subOpPtr);
1124 
1125   if (empty)
1126   {
1127     jam();
1128     c_failedApiNodesState[nodeId] = __LINE__;
1129     signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
1130     signal->theData[1] = subOpPtr.i;
1131     signal->theData[2] = RNIL;
1132     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1133   }
1134   else
1135   {
1136     jam();
1137     c_failedApiNodesState[nodeId] = __LINE__;
1138   }
1139 }
1140 
1141 void
api_fail_subscription(Signal * signal)1142 Suma::api_fail_subscription(Signal* signal)
1143 {
1144   jam();
1145   Ptr<SubOpRecord> subOpPtr;
1146   c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
1147 
1148   Uint32 nodeId = subOpPtr.p->m_senderRef;
1149 
1150   Ptr<Subscription> subPtr;
1151   c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
1152 
1153   Ptr<Subscriber> ptr;
1154   {
1155     LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
1156     if (signal->theData[2] == RNIL)
1157     {
1158       jam();
1159       list.first(ptr);
1160     }
1161     else
1162     {
1163       jam();
1164       list.getPtr(ptr, signal->theData[2]);
1165     }
1166 
1167     for (Uint32 i = 0; i<32 && !ptr.isNull(); i++)
1168     {
1169       jam();
1170       if (refToNode(ptr.p->m_senderRef) == nodeId)
1171       {
1172         jam();
1173 
1174         Ptr<Subscriber> tmp = ptr;
1175         list.next(ptr);
1176         list.remove(tmp);
1177 
1178         /**
1179          * NOTE: remove before...so we done send UNSUBSCRIBE to self (yuck)
1180          */
1181         bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
1182 
1183         send_sub_start_stop_event(signal, tmp, NdbDictionary::Event::_TE_STOP,
1184                                   report, list);
1185 
1186         c_subscriberPool.release(tmp);
1187       }
1188       else
1189       {
1190         jam();
1191         list.next(ptr);
1192       }
1193     }
1194   }
1195 
1196   if (!ptr.isNull())
1197   {
1198     jam();
1199     c_failedApiNodesState[nodeId] = __LINE__;
1200     signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
1201     signal->theData[1] = subOpPtr.i;
1202     signal->theData[2] = ptr.i;
1203     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1204     return;
1205   }
1206 
1207   // Start potential waiter(s)
1208   check_remove_queue(signal, subPtr, subOpPtr, true, false);
1209   check_release_subscription(signal, subPtr);
1210 
1211   // Continue iterating through subscriptions
1212   DLHashTable<Subscription>::Iterator iter;
1213   iter.bucket = subOpPtr.p->m_senderData;
1214   iter.curr = subPtr;
1215 
1216   if (c_subscriptions.next(iter))
1217   {
1218     jam();
1219     c_failedApiNodesState[nodeId] = __LINE__;
1220     signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
1221     signal->theData[1] = nodeId;
1222     signal->theData[2] = subOpPtr.i;
1223     signal->theData[3] = iter.bucket;
1224     signal->theData[4] = iter.curr.p->m_subscriptionId; // subscriptionId
1225     signal->theData[5] = iter.curr.p->m_subscriptionKey; // SubscriptionKey
1226     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
1227     return;
1228   }
1229 
1230   c_subOpPool.release(subOpPtr);
1231 
1232   /* Now do block level cleanup */
1233   api_fail_block_cleanup(signal, nodeId);
1234 }
1235 
1236 void
execNODE_FAILREP(Signal * signal)1237 Suma::execNODE_FAILREP(Signal* signal){
1238   jamEntry();
1239   DBUG_ENTER("Suma::execNODE_FAILREP");
1240   ndbassert(signal->getNoOfSections() == 0);
1241 
1242   const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
1243   NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes);
1244 
1245   if(c_restart.m_ref && failed.get(refToNode(c_restart.m_ref)))
1246   {
1247     jam();
1248 
1249     if (c_restart.m_waiting_on_self)
1250     {
1251       jam();
1252       c_restart.m_abort = 1;
1253     }
1254     else
1255     {
1256       jam();
1257       Ptr<Subscription> subPtr;
1258       c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
1259       abort_start_me(signal, subPtr, false);
1260     }
1261   }
1262 
1263   if (ERROR_INSERTED(13032))
1264   {
1265     Uint32 node = c_subscriber_nodes.find(0);
1266     if (node != NodeBitmask::NotFound)
1267     {
1268       ndbout_c("Inserting API_FAILREQ node: %u", node);
1269       signal->theData[0] = node;
1270       sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA);
1271     }
1272   }
1273 
1274   NdbNodeBitmask tmp;
1275   tmp.assign(c_alive_nodes);
1276   tmp.bitANDC(failed);
1277 
1278   NdbNodeBitmask takeover_nodes;
1279 
1280   if(c_nodes_in_nodegroup_mask.overlaps(failed))
1281   {
1282     for( Uint32 i = 0; i < c_no_of_buckets; i++)
1283     {
1284       if(m_active_buckets.get(i))
1285 	continue;
1286       else if(m_switchover_buckets.get(i))
1287       {
1288 	Uint32 state= c_buckets[i].m_state;
1289 	if((state & Bucket::BUCKET_HANDOVER) &&
1290 	   failed.get(get_responsible_node(i)))
1291 	{
1292 	  m_active_buckets.set(i);
1293 	  m_switchover_buckets.clear(i);
1294 	  ndbout_c("aborting handover");
1295 	}
1296 	else if(state & Bucket::BUCKET_STARTING)
1297 	{
1298 	  progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR,
1299 		    "Nodefailure during SUMA takeover");
1300 	}
1301         else if (state & Bucket::BUCKET_SHUTDOWN_TO)
1302         {
1303           jam();
1304           c_buckets[i].m_state &= ~Uint32(Bucket::BUCKET_SHUTDOWN_TO);
1305           m_switchover_buckets.clear(i);
1306           ndbrequire(get_responsible_node(i, tmp) == getOwnNodeId());
1307           start_resend(signal, i);
1308         }
1309       }
1310       else if(get_responsible_node(i, tmp) == getOwnNodeId())
1311       {
1312 	start_resend(signal, i);
1313       }
1314     }
1315   }
1316 
1317   /* Block level cleanup */
1318   for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
1319     jam();
1320     if(failed.get(i)) {
1321       jam();
1322       Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
1323       ndbassert(elementsCleaned == 0); // As Suma has no remote fragmented signals
1324       (void) elementsCleaned; // Avoid compiler error
1325     }//if
1326   }//for
1327 
1328   c_alive_nodes.assign(tmp);
1329 
1330   DBUG_VOID_RETURN;
1331 }
1332 
1333 void
execINCL_NODEREQ(Signal * signal)1334 Suma::execINCL_NODEREQ(Signal* signal){
1335   jamEntry();
1336 
1337   const Uint32 senderRef = signal->theData[0];
1338   const Uint32 nodeId  = signal->theData[1];
1339 
1340   ndbrequire(!c_alive_nodes.get(nodeId));
1341   if (c_nodes_in_nodegroup_mask.get(nodeId))
1342   {
1343     /**
1344      *
1345      * XXX TODO: This should be removed
1346      *           But, other nodes are (incorrectly) reported as started
1347      *                even if they're not "started", but only INCL_NODEREQ'ed
1348      */
1349     c_alive_nodes.set(nodeId);
1350 
1351     /**
1352      *
1353      * Nodes in nodegroup will be "alive" when
1354      *   sending SUMA_HANDOVER_REQ
1355      */
1356   }
1357   else
1358   {
1359     jam();
1360     c_alive_nodes.set(nodeId);
1361   }
1362 
1363   signal->theData[0] = nodeId;
1364   signal->theData[1] = reference();
1365   sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
1366 }
1367 
1368 void
execSIGNAL_DROPPED_REP(Signal * signal)1369 Suma::execSIGNAL_DROPPED_REP(Signal* signal){
1370   jamEntry();
1371   ndbrequire(false);
1372 }
1373 
1374 /********************************************************************
1375  *
1376  * Dump state
1377  *
1378  */
1379 static
1380 const char*
cstr(Suma::Subscription::State s)1381 cstr(Suma::Subscription::State s)
1382 {
1383   switch(s){
1384   case Suma::Subscription::UNDEFINED:
1385     return "undefined";
1386   case Suma::Subscription::DEFINED:
1387     return "defined";
1388   case Suma::Subscription::DEFINING:
1389     return "defining";
1390   }
1391   return "<unknown>";
1392 }
1393 
1394 static
1395 const char*
cstr(Suma::Subscription::TriggerState s)1396 cstr(Suma::Subscription::TriggerState s)
1397 {
1398   switch(s){
1399   case Suma::Subscription::T_UNDEFINED:
1400     return "undefined";
1401   case Suma::Subscription::T_CREATING:
1402     return "creating";
1403   case Suma::Subscription::T_DEFINED:
1404     return "defined";
1405   case Suma::Subscription::T_DROPPING:
1406     return "dropping";
1407   case Suma::Subscription::T_ERROR:
1408     return "error";
1409   }
1410   return "<uknown>";
1411 }
1412 
1413 static
1414 const char*
cstr(Suma::Subscription::Options s)1415 cstr(Suma::Subscription::Options s)
1416 {
1417   static char buf[256];
1418   buf[0] = 0;
1419   strcat(buf, "[");
1420   if (s & Suma::Subscription::REPORT_ALL)
1421     strcat(buf, " reportall");
1422   if (s & Suma::Subscription::REPORT_SUBSCRIBE)
1423     strcat(buf, " reportsubscribe");
1424   if (s & Suma::Subscription::MARKED_DROPPED)
1425     strcat(buf, " dropped");
1426   if (s & Suma::Subscription::NO_REPORT_DDL)
1427     strcat(buf, " noreportddl");
1428   strcat(buf, " ]");
1429   return buf;
1430 }
1431 
1432 static
1433 const char*
cstr(Suma::Table::State s)1434 cstr(Suma::Table::State s)
1435 {
1436   switch(s){
1437   case Suma::Table::UNDEFINED:
1438     return "undefined";
1439   case Suma::Table::DEFINING:
1440     return "defining";
1441   case Suma::Table::DEFINED:
1442     return "defined";
1443   case Suma::Table::DROPPED:
1444     return "dropped";
1445   }
1446   return "<unknown>";
1447 }
1448 
1449 void
execDUMP_STATE_ORD(Signal * signal)1450 Suma::execDUMP_STATE_ORD(Signal* signal){
1451   jamEntry();
1452 
1453   Uint32 tCase = signal->theData[0];
1454 #if 0
1455   if(tCase >= 8000 && tCase <= 8003){
1456     SubscriptionPtr subPtr;
1457     c_subscriptions.getPtr(subPtr, g_subPtrI);
1458 
1459     Ptr<SyncRecord> syncPtr;
1460     c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
1461 
1462     if(tCase == 8000){
1463       syncPtr.p->startMeta(signal);
1464     }
1465 
1466     if(tCase == 8001){
1467       syncPtr.p->startScan(signal);
1468     }
1469 
1470     if(tCase == 8002){
1471       syncPtr.p->startTrigger(signal);
1472     }
1473 
1474     if(tCase == 8003){
1475       subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
1476       LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);
1477       Uint32 tab = 0;
1478       Uint32 att[] = { 0, 1, 1 };
1479       syncPtr.p->m_tableList.append(&tab, 1);
1480       attrs.append(att, 3);
1481     }
1482   }
1483 #endif
1484   if(tCase == 8004){
1485     infoEvent("Suma: c_subscriberPool  size: %d free: %d",
1486 	      c_subscriberPool.getSize(),
1487 	      c_subscriberPool.getNoOfFree());
1488 
1489     infoEvent("Suma: c_tablePool  size: %d free: %d",
1490 	      c_tablePool.getSize(),
1491 	      c_tablePool.getNoOfFree());
1492 
1493     infoEvent("Suma: c_subscriptionPool  size: %d free: %d",
1494 	      c_subscriptionPool.getSize(),
1495 	      c_subscriptionPool.getNoOfFree());
1496 
1497     infoEvent("Suma: c_syncPool  size: %d free: %d",
1498 	      c_syncPool.getSize(),
1499 	      c_syncPool.getNoOfFree());
1500 
1501     infoEvent("Suma: c_dataBufferPool  size: %d free: %d",
1502 	      c_dataBufferPool.getSize(),
1503 	      c_dataBufferPool.getNoOfFree());
1504 
1505     infoEvent("Suma: c_subOpPool  size: %d free: %d",
1506 	      c_subOpPool.getSize(),
1507 	      c_subOpPool.getNoOfFree());
1508 
1509 #if 0
1510     infoEvent("Suma: c_dataSubscribers count: %d",
1511 	      count_subscribers(c_dataSubscribers));
1512     infoEvent("Suma: c_prepDataSubscribers count: %d",
1513 	      count_subscribers(c_prepDataSubscribers));
1514 #endif
1515   }
1516 
1517   if(tCase == 8005)
1518   {
1519     for(Uint32 i = 0; i<c_no_of_buckets; i++)
1520     {
1521       Bucket* ptr= c_buckets + i;
1522       infoEvent("Bucket %d %d%d-%x switch gci: %llu max_acked_gci: %llu max_gci: %llu tail: %d head: %d",
1523 		i,
1524 		m_active_buckets.get(i),
1525 		m_switchover_buckets.get(i),
1526 		ptr->m_state,
1527 		ptr->m_switchover_gci,
1528 		ptr->m_max_acked_gci,
1529 		ptr->m_buffer_head.m_max_gci,
1530 		ptr->m_buffer_tail,
1531 		ptr->m_buffer_head.m_page_id);
1532     }
1533   }
1534 
1535   if (tCase == 8006)
1536   {
1537     SET_ERROR_INSERT_VALUE(13029);
1538   }
1539 
1540   if (tCase == 8007)
1541   {
1542     c_startup.m_restart_server_node_id = MAX_NDB_NODES + 1;
1543     SET_ERROR_INSERT_VALUE(13029);
1544   }
1545 
1546   if (tCase == 8008)
1547   {
1548     CLEAR_ERROR_INSERT_VALUE;
1549   }
1550 
1551   if (tCase == 8010)
1552   {
1553     char buf1[255], buf2[255];
1554     c_subscriber_nodes.getText(buf1);
1555     c_connected_nodes.getText(buf2);
1556     infoEvent("c_subscriber_nodes: %s", buf1);
1557     infoEvent("c_connected_nodes: %s", buf2);
1558   }
1559 
1560   if (tCase == 8009)
1561   {
1562     if (ERROR_INSERTED(13030))
1563     {
1564       CLEAR_ERROR_INSERT_VALUE;
1565       sendSTTORRY(signal);
1566     }
1567     else
1568     {
1569       SET_ERROR_INSERT_VALUE(13030);
1570     }
1571     return;
1572   }
1573 
1574   if (tCase == 8011)
1575   {
1576     jam();
1577     Uint32 bucket = signal->theData[1];
1578     KeyTable<Table>::Iterator it;
1579     if (signal->getLength() == 1)
1580     {
1581       jam();
1582       bucket = 0;
1583       infoEvent("-- Starting dump of subscribers --");
1584     }
1585 
1586     c_tables.next(bucket, it);
1587     const Uint32 RT_BREAK = 16;
1588     for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
1589     {
1590       jam();
1591       if(it.curr.i == RNIL)
1592       {
1593         jam();
1594         infoEvent("-- Ending dump of subscribers --");
1595         return;
1596       }
1597 
1598       infoEvent("Table %u ver %u",
1599                 it.curr.p->m_tableId,
1600                 it.curr.p->m_schemaVersion);
1601 
1602       Uint32 cnt = 0;
1603       Ptr<Subscription> subPtr;
1604       LocalDLList<Subscription> subList(c_subscriptionPool,
1605                                         it.curr.p->m_subscriptions);
1606       for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
1607       {
1608         infoEvent(" Subcription %u", subPtr.i);
1609         {
1610           Ptr<Subscriber> ptr;
1611           LocalDLList<Subscriber> list(c_subscriberPool,
1612                                        subPtr.p->m_subscribers);
1613           for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1614           {
1615             jam();
1616             cnt++;
1617             infoEvent("  Subscriber [ %x %u %u ]",
1618                       ptr.p->m_senderRef,
1619                       ptr.p->m_senderData,
1620                       subPtr.i);
1621           }
1622         }
1623 
1624         {
1625           Ptr<SubOpRecord> ptr;
1626           LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1627                                        subPtr.p->m_create_req);
1628 
1629           for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1630           {
1631             jam();
1632             infoEvent("  create [ %x %u ]",
1633                       ptr.p->m_senderRef,
1634                       ptr.p->m_senderData);
1635           }
1636         }
1637 
1638         {
1639           Ptr<SubOpRecord> ptr;
1640           LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1641                                        subPtr.p->m_start_req);
1642 
1643           for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1644           {
1645             jam();
1646             infoEvent("  start [ %x %u ]",
1647                       ptr.p->m_senderRef,
1648                       ptr.p->m_senderData);
1649           }
1650         }
1651 
1652         {
1653           Ptr<SubOpRecord> ptr;
1654           LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1655                                         subPtr.p->m_stop_req);
1656 
1657           for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1658           {
1659             jam();
1660             infoEvent("  stop [ %u %x %u ]",
1661                       ptr.p->m_opType,
1662                       ptr.p->m_senderRef,
1663                       ptr.p->m_senderData);
1664           }
1665         }
1666       }
1667       infoEvent("Table %u #subscribers %u", it.curr.p->m_tableId, cnt);
1668       c_tables.next(it);
1669     }
1670 
1671     signal->theData[0] = tCase;
1672     signal->theData[1] = it.bucket;
1673     sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
1674     return;
1675   }
1676 
1677   if (tCase == 8012)
1678   {
1679     jam();
1680     Uint32 bucket = signal->theData[1];
1681     KeyTable<Subscription>::Iterator it;
1682     if (signal->getLength() == 1)
1683     {
1684       jam();
1685       bucket = 0;
1686       infoEvent("-- Starting dump of subscribers --");
1687     }
1688 
1689     c_subscriptions.next(bucket, it);
1690     const Uint32 RT_BREAK = 16;
1691     for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
1692     {
1693       jam();
1694       if(it.curr.i == RNIL)
1695       {
1696         jam();
1697         infoEvent("-- Ending dump of subscribers --");
1698         return;
1699       }
1700 
1701       Ptr<Subscription> subPtr = it.curr;
1702       Ptr<Table> tabPtr;
1703       c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
1704       infoEvent("Subcription %u id: 0x%.8x key: 0x%.8x state: %s",
1705                 subPtr.i,
1706                 subPtr.p->m_subscriptionId,
1707                 subPtr.p->m_subscriptionKey,
1708                 cstr(subPtr.p->m_state));
1709       infoEvent("  trigger state: %s options: %s",
1710                 cstr(subPtr.p->m_trigger_state),
1711                 cstr((Suma::Subscription::Options)subPtr.p->m_options));
1712       infoEvent("  tablePtr: %u tableId: %u schemaVersion: 0x%.8x state: %s",
1713                 tabPtr.i,
1714                 subPtr.p->m_tableId,
1715                 tabPtr.p->m_schemaVersion,
1716                 cstr(tabPtr.p->m_state));
1717       {
1718         Ptr<Subscriber> ptr;
1719         LocalDLList<Subscriber> list(c_subscriberPool,
1720                                      subPtr.p->m_subscribers);
1721         for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1722         {
1723           jam();
1724           infoEvent("  Subscriber [ %x %u %u ]",
1725                     ptr.p->m_senderRef,
1726                     ptr.p->m_senderData,
1727                     subPtr.i);
1728         }
1729       }
1730 
1731       {
1732         Ptr<SubOpRecord> ptr;
1733         LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1734                                           subPtr.p->m_create_req);
1735 
1736         for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1737         {
1738           jam();
1739           infoEvent("  create [ %x %u ]",
1740                     ptr.p->m_senderRef,
1741                     ptr.p->m_senderData);
1742         }
1743       }
1744 
1745       {
1746         Ptr<SubOpRecord> ptr;
1747         LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1748                                           subPtr.p->m_start_req);
1749 
1750         for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1751         {
1752           jam();
1753           infoEvent("  start [ %x %u ]",
1754                     ptr.p->m_senderRef,
1755                     ptr.p->m_senderData);
1756         }
1757       }
1758 
1759       {
1760         Ptr<SubOpRecord> ptr;
1761         LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1762                                           subPtr.p->m_stop_req);
1763 
1764         for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1765         {
1766           jam();
1767           infoEvent("  stop [ %u %x %u ]",
1768                     ptr.p->m_opType,
1769                     ptr.p->m_senderRef,
1770                     ptr.p->m_senderData);
1771         }
1772       }
1773       c_subscriptions.next(it);
1774     }
1775 
1776     signal->theData[0] = tCase;
1777     signal->theData[1] = it.bucket;
1778     sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
1779     return;
1780   }
1781 
1782   if (tCase == 7019 && signal->getLength() == 2)
1783   {
1784     jam();
1785     Uint32 nodeId = signal->theData[1];
1786     if (nodeId < MAX_NODES)
1787     {
1788       warningEvent(" Suma 7019 %u line: %u", nodeId,
1789                    c_failedApiNodesState[nodeId]);
1790       warningEvent("   c_connected_nodes.get(): %u",
1791                    c_connected_nodes.get(nodeId));
1792       warningEvent("   c_failedApiNodes.get(): %u",
1793                    c_failedApiNodes.get(nodeId));
1794       warningEvent("   c_subscriber_nodes.get(): %u",
1795                    c_subscriber_nodes.get(nodeId));
1796       warningEvent(" c_subscriber_per_node[%u]: %u",
1797                    nodeId, c_subscriber_per_node[nodeId]);
1798     }
1799     else
1800     {
1801       warningEvent(" SUMP: dump-7019 to unknown node: %u", nodeId);
1802     }
1803   }
1804 }
1805 
execDBINFO_SCANREQ(Signal * signal)1806 void Suma::execDBINFO_SCANREQ(Signal *signal)
1807 {
1808   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
1809   const Ndbinfo::ScanCursor* cursor =
1810     CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
1811   Ndbinfo::Ratelimit rl;
1812 
1813   jamEntry();
1814 
1815   switch(req.tableId){
1816   case Ndbinfo::POOLS_TABLEID:
1817   {
1818     Ndbinfo::pool_entry pools[] =
1819     {
1820       { "Subscriber",
1821         c_subscriberPool.getUsed(),
1822         c_subscriberPool.getSize(),
1823         c_subscriberPool.getEntrySize(),
1824         c_subscriberPool.getUsedHi(),
1825         { CFG_DB_SUBSCRIBERS,
1826           CFG_DB_SUBSCRIPTIONS,
1827           CFG_DB_NO_TABLES,0 }},
1828       { "Table",
1829         c_tablePool.getUsed(),
1830         c_tablePool.getSize(),
1831         c_tablePool.getEntrySize(),
1832         c_tablePool.getUsedHi(),
1833         { CFG_DB_NO_TABLES,0,0,0 }},
1834       { "Subscription",
1835         c_subscriptionPool.getUsed(),
1836         c_subscriptionPool.getSize(),
1837         c_subscriptionPool.getEntrySize(),
1838         c_subscriptionPool.getUsedHi(),
1839         { CFG_DB_SUBSCRIPTIONS,
1840           CFG_DB_NO_TABLES,0,0 }},
1841       { "Sync",
1842         c_syncPool.getUsed(),
1843         c_syncPool.getSize(),
1844         c_syncPool.getEntrySize(),
1845         c_syncPool.getUsedHi(),
1846         { 0,0,0,0 }},
1847       { "Data Buffer",
1848         c_dataBufferPool.getUsed(),
1849         c_dataBufferPool.getSize(),
1850         c_dataBufferPool.getEntrySize(),
1851         c_dataBufferPool.getUsedHi(),
1852         { CFG_DB_NO_ATTRIBUTES,0,0,0 }},
1853       { "SubOp",
1854         c_subOpPool.getUsed(),
1855         c_subOpPool.getSize(),
1856         c_subOpPool.getEntrySize(),
1857         c_subOpPool.getUsedHi(),
1858         { CFG_DB_SUB_OPERATIONS,0,0,0 }},
1859       { "Page Chunk",
1860         c_page_chunk_pool.getUsed(),
1861         c_page_chunk_pool.getSize(),
1862         c_page_chunk_pool.getEntrySize(),
1863         c_page_chunk_pool.getUsedHi(),
1864         { 0,0,0,0 }},
1865       { "GCP",
1866         c_gcp_pool.getUsed(),
1867         c_gcp_pool.getSize(),
1868         c_gcp_pool.getEntrySize(),
1869         c_gcp_pool.getUsedHi(),
1870         { CFG_DB_API_HEARTBEAT_INTERVAL,
1871           CFG_DB_GCP_INTERVAL,0,0 }},
1872       { NULL, 0,0,0,0, { 0,0,0,0 }}
1873     };
1874 
1875     const size_t num_config_params =
1876       sizeof(pools[0].config_params) / sizeof(pools[0].config_params[0]);
1877     Uint32 pool = cursor->data[0];
1878     BlockNumber bn = blockToMain(number());
1879     while(pools[pool].poolname)
1880     {
1881       jam();
1882       Ndbinfo::Row row(signal, req);
1883       row.write_uint32(getOwnNodeId());
1884       row.write_uint32(bn);           // block number
1885       row.write_uint32(instance());   // block instance
1886       row.write_string(pools[pool].poolname);
1887       row.write_uint64(pools[pool].used);
1888       row.write_uint64(pools[pool].total);
1889       row.write_uint64(pools[pool].used_hi);
1890       row.write_uint64(pools[pool].entry_size);
1891       for (size_t i = 0; i < num_config_params; i++)
1892         row.write_uint32(pools[pool].config_params[i]);
1893       ndbinfo_send_row(signal, req, row, rl);
1894       pool++;
1895       if (rl.need_break(req))
1896       {
1897         jam();
1898         ndbinfo_send_scan_break(signal, req, rl, pool);
1899         return;
1900       }
1901     }
1902     break;
1903   }
1904   default:
1905     break;
1906   }
1907 
1908   ndbinfo_send_scan_conf(signal, req, rl);
1909 }
1910 
1911 /*************************************************************
1912  *
1913  * Creation of subscription id's
1914  *
1915  ************************************************************/
1916 
1917 void
execCREATE_SUBID_REQ(Signal * signal)1918 Suma::execCREATE_SUBID_REQ(Signal* signal)
1919 {
1920   jamEntry();
1921   DBUG_ENTER("Suma::execCREATE_SUBID_REQ");
1922   ndbassert(signal->getNoOfSections() == 0);
1923   CRASH_INSERTION(13001);
1924 
1925   CreateSubscriptionIdReq const * req =
1926     (CreateSubscriptionIdReq*)signal->getDataPtr();
1927   SubscriberPtr subbPtr;
1928   if(!c_subscriberPool.seize(subbPtr)){
1929     jam();
1930     sendSubIdRef(signal, req->senderRef, req->senderData, 1412);
1931     DBUG_VOID_RETURN;
1932   }
1933   DBUG_PRINT("info",("c_subscriberPool  size: %d free: %d",
1934 		     c_subscriberPool.getSize(),
1935 		     c_subscriberPool.getNoOfFree()));
1936 
1937   subbPtr.p->m_senderRef  = req->senderRef;
1938   subbPtr.p->m_senderData = req->senderData;
1939 
1940   UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
1941   utilReq->senderData  = subbPtr.i;
1942   utilReq->sequenceId  = SUMA_SEQUENCE;
1943   utilReq->requestType = UtilSequenceReq::NextVal;
1944   sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
1945 	     signal, UtilSequenceReq::SignalLength, JBB);
1946 
1947   DBUG_VOID_RETURN;
1948 }
1949 
1950 void
execUTIL_SEQUENCE_CONF(Signal * signal)1951 Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
1952 {
1953   jamEntry();
1954   DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
1955   ndbassert(signal->getNoOfSections() == 0);
1956   CRASH_INSERTION(13002);
1957 
1958   UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
1959   if(conf->requestType == UtilSequenceReq::Create) {
1960     jam();
1961     createSequenceReply(signal, conf, NULL);
1962     DBUG_VOID_RETURN;
1963   }
1964 
1965   Uint64 subId;
1966   memcpy(&subId,conf->sequenceValue,8);
1967   SubscriberPtr subbPtr;
1968   c_subscriberPool.getPtr(subbPtr,conf->senderData);
1969 
1970   CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;
1971   subconf->senderRef      = reference();
1972   subconf->senderData     = subbPtr.p->m_senderData;
1973   subconf->subscriptionId = (Uint32)subId;
1974   subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);
1975 
1976   sendSignal(subbPtr.p->m_senderRef, GSN_CREATE_SUBID_CONF, signal,
1977 	     CreateSubscriptionIdConf::SignalLength, JBB);
1978 
1979   c_subscriberPool.release(subbPtr);
1980   DBUG_PRINT("info",("c_subscriberPool  size: %d free: %d",
1981 		     c_subscriberPool.getSize(),
1982 		     c_subscriberPool.getNoOfFree()));
1983   DBUG_VOID_RETURN;
1984 }
1985 
1986 void
execUTIL_SEQUENCE_REF(Signal * signal)1987 Suma::execUTIL_SEQUENCE_REF(Signal* signal)
1988 {
1989   jamEntry();
1990   DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
1991   ndbassert(signal->getNoOfSections() == 0);
1992   UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
1993   Uint32 err= ref->errorCode;
1994 
1995   if(ref->requestType == UtilSequenceReq::Create) {
1996     jam();
1997     createSequenceReply(signal, NULL, ref);
1998     DBUG_VOID_RETURN;
1999   }
2000 
2001   Uint32 subData = ref->senderData;
2002 
2003   SubscriberPtr subbPtr;
2004   c_subscriberPool.getPtr(subbPtr,subData);
2005   if (err == UtilSequenceRef::TCError)
2006   {
2007     jam();
2008     err = ref->TCErrorCode;
2009   }
2010   sendSubIdRef(signal, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, err);
2011   c_subscriberPool.release(subbPtr);
2012   DBUG_PRINT("info",("c_subscriberPool  size: %d free: %d",
2013 		     c_subscriberPool.getSize(),
2014 		     c_subscriberPool.getNoOfFree()));
2015   DBUG_VOID_RETURN;
2016 }//execUTIL_SEQUENCE_REF()
2017 
2018 
2019 void
sendSubIdRef(Signal * signal,Uint32 senderRef,Uint32 senderData,Uint32 errCode)2020 Suma::sendSubIdRef(Signal* signal,
2021 			      Uint32 senderRef, Uint32 senderData, Uint32 errCode)
2022 {
2023   jam();
2024   DBUG_ENTER("Suma::sendSubIdRef");
2025   CreateSubscriptionIdRef  * ref =
2026     (CreateSubscriptionIdRef *)signal->getDataPtrSend();
2027 
2028   ref->senderRef  = reference();
2029   ref->senderData = senderData;
2030   ref->errorCode  = errCode;
2031   sendSignal(senderRef,
2032 	     GSN_CREATE_SUBID_REF,
2033 	     signal,
2034 	     CreateSubscriptionIdRef::SignalLength,
2035 	     JBB);
2036 
2037   DBUG_VOID_RETURN;
2038 }
2039 
2040 /**********************************************************
2041  * Suma participant interface
2042  *
2043  * Creation of subscriptions
2044  */
2045 void
execSUB_CREATE_REQ(Signal * signal)2046 Suma::execSUB_CREATE_REQ(Signal* signal)
2047 {
2048   jamEntry();
2049   DBUG_ENTER("Suma::execSUB_CREATE_REQ");
2050   ndbassert(signal->getNoOfSections() == 0);
2051   CRASH_INSERTION(13003);
2052 
2053   const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();
2054 
2055   const Uint32 senderRef  = req.senderRef;
2056   const Uint32 senderData = req.senderData;
2057   const Uint32 subId   = req.subscriptionId;
2058   const Uint32 subKey  = req.subscriptionKey;
2059   const Uint32 type    = req.subscriptionType & SubCreateReq::RemoveFlags;
2060   const Uint32 flags   = req.subscriptionType & SubCreateReq::GetFlags;
2061   const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ?
2062     Subscription::REPORT_ALL : 0;
2063   const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
2064     Subscription::REPORT_SUBSCRIBE : 0;
2065   const Uint32 noReportDDL = (flags & SubCreateReq::NoReportDDL) ?
2066     Subscription::NO_REPORT_DDL : 0;
2067   const Uint32 tableId = req.tableId;
2068   const Uint32 schemaTransId = req.schemaTransId;
2069 
2070   bool subDropped = req.subscriptionType & SubCreateReq::NR_Sub_Dropped;
2071 
2072   /**
2073    * This 2 options are only allowed during NR
2074    */
2075   if (subDropped)
2076   {
2077     ndbrequire(refToNode(senderRef) == c_startup.m_restart_server_node_id);
2078   }
2079 
2080   Subscription key;
2081   key.m_subscriptionId  = subId;
2082   key.m_subscriptionKey = subKey;
2083 
2084   DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
2085 		      key.m_subscriptionId, key.m_subscriptionKey));
2086 
2087   SubscriptionPtr subPtr;
2088 
2089   bool found = c_subscriptions.find(subPtr, key);
2090 
2091   if (c_startup.m_restart_server_node_id == RNIL)
2092   {
2093     jam();
2094 
2095     /**
2096      * We havent started syncing yet
2097      */
2098     sendSubCreateRef(signal, senderRef, senderData,
2099                      SubCreateRef::NotStarted);
2100     return;
2101   }
2102 
2103   CRASH_INSERTION2(13040, c_startup.m_restart_server_node_id != RNIL);
2104   CRASH_INSERTION(13041);
2105 
2106   bool allowDup = true; //c_startup.m_restart_server_node_id;
2107 
2108   if (found && !allowDup)
2109   {
2110     jam();
2111     sendSubCreateRef(signal, senderRef, senderData,
2112                      SubCreateRef::SubscriptionAlreadyExist);
2113     return;
2114   }
2115 
2116   if (found == false)
2117   {
2118     jam();
2119     if(!c_subscriptions.seize(subPtr))
2120     {
2121       jam();
2122       sendSubCreateRef(signal, senderRef, senderData,
2123                        SubCreateRef::OutOfSubscriptionRecords);
2124       return;
2125     }
2126 
2127     new (subPtr.p) Subscription();
2128     subPtr.p->m_seq_no           = c_current_seq;
2129     subPtr.p->m_subscriptionId   = subId;
2130     subPtr.p->m_subscriptionKey  = subKey;
2131     subPtr.p->m_subscriptionType = type;
2132     subPtr.p->m_tableId          = tableId;
2133     subPtr.p->m_table_ptrI       = RNIL;
2134     subPtr.p->m_state            = Subscription::UNDEFINED;
2135     subPtr.p->m_trigger_state    =  Subscription::T_UNDEFINED;
2136     subPtr.p->m_triggers[0]      = ILLEGAL_TRIGGER_ID;
2137     subPtr.p->m_triggers[1]      = ILLEGAL_TRIGGER_ID;
2138     subPtr.p->m_triggers[2]      = ILLEGAL_TRIGGER_ID;
2139     subPtr.p->m_errorCode        = 0;
2140     subPtr.p->m_options          = reportSubscribe | reportAll | noReportDDL;
2141     subPtr.p->m_schemaTransId    = schemaTransId;
2142   }
2143 
2144   Ptr<SubOpRecord> subOpPtr;
2145   LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_create_req);
2146   if ((ERROR_INSERTED(13044) && found == false) ||
2147       subOpList.seize(subOpPtr) == false)
2148   {
2149     jam();
2150     if (found == false)
2151     {
2152       jam();
2153       if (ERROR_INSERTED(13044))
2154       {
2155         CLEAR_ERROR_INSERT_VALUE;
2156       }
2157       c_subscriptionPool.release(subPtr); // not yet in hash
2158     }
2159     sendSubCreateRef(signal, senderRef, senderData,
2160                      SubCreateRef::OutOfTableRecords);
2161     return;
2162   }
2163 
2164   subOpPtr.p->m_senderRef = senderRef;
2165   subOpPtr.p->m_senderData = senderData;
2166 
2167   if (subDropped)
2168   {
2169     jam();
2170     subPtr.p->m_options |= Subscription::MARKED_DROPPED;
2171   }
2172 
2173   TablePtr tabPtr;
2174   if (found)
2175   {
2176     jam();
2177     c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
2178   }
2179   else if (c_tables.find(tabPtr, tableId))
2180   {
2181     jam();
2182   }
2183   else
2184   {
2185     jam();
2186     if (ERROR_INSERTED(13045) || c_tablePool.seize(tabPtr) == false)
2187     {
2188       jam();
2189       if (ERROR_INSERTED(13045))
2190       {
2191         CLEAR_ERROR_INSERT_VALUE;
2192       }
2193 
2194       subOpList.release(subOpPtr);
2195       c_subscriptionPool.release(subPtr); // not yet in hash
2196       sendSubCreateRef(signal, senderRef, senderData,
2197                        SubCreateRef::OutOfTableRecords);
2198       return;
2199     }
2200 
2201     new (tabPtr.p) Table;
2202     tabPtr.p->m_tableId= tableId;
2203     tabPtr.p->m_ptrI= tabPtr.i;
2204     tabPtr.p->m_error = 0;
2205     tabPtr.p->m_schemaVersion = RNIL;
2206     tabPtr.p->m_state = Table::UNDEFINED;
2207     tabPtr.p->m_schemaTransId = schemaTransId;
2208     c_tables.add(tabPtr);
2209   }
2210 
2211   if (found == false)
2212   {
2213     jam();
2214     c_subscriptions.add(subPtr);
2215     LocalDLList<Subscription> list(c_subscriptionPool,
2216                                    tabPtr.p->m_subscriptions);
2217     list.add(subPtr);
2218     subPtr.p->m_table_ptrI = tabPtr.i;
2219   }
2220 
2221   switch(tabPtr.p->m_state){
2222   case Table::DEFINED:{
2223     jam();
2224     // Send conf
2225     subOpList.release(subOpPtr);
2226     subPtr.p->m_state = Subscription::DEFINED;
2227     SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
2228     conf->senderRef  = reference();
2229     conf->senderData = senderData;
2230     sendSignal(senderRef, GSN_SUB_CREATE_CONF, signal,
2231                SubCreateConf::SignalLength, JBB);
2232     return;
2233   }
2234   case Table::UNDEFINED:{
2235     jam();
2236     tabPtr.p->m_state = Table::DEFINING;
2237     subPtr.p->m_state = Subscription::DEFINING;
2238 
2239     if (ERROR_INSERTED(13031))
2240     {
2241       jam();
2242       CLEAR_ERROR_INSERT_VALUE;
2243       GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
2244       ref->tableId = tableId;
2245       ref->senderData = tabPtr.i;
2246       ref->errorCode = GetTabInfoRef::TableNotDefined;
2247       sendSignal(reference(), GSN_GET_TABINFOREF, signal,
2248                  GetTabInfoRef::SignalLength, JBB);
2249       return;
2250     }
2251 
2252     GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
2253     req->senderRef = reference();
2254     req->senderData = tabPtr.i;
2255     req->requestType =
2256       GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
2257     req->tableId = tableId;
2258     req->schemaTransId = schemaTransId;
2259 
2260     sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
2261                GetTabInfoReq::SignalLength, JBB);
2262     return;
2263   }
2264   case Table::DEFINING:
2265   {
2266     jam();
2267     /**
2268      * just wait for completion
2269      */
2270     subPtr.p->m_state = Subscription::DEFINING;
2271     return;
2272   }
2273   case Table::DROPPED:
2274   {
2275     subOpList.release(subOpPtr);
2276 
2277     {
2278       LocalDLList<Subscription> list(c_subscriptionPool,
2279                                      tabPtr.p->m_subscriptions);
2280       list.remove(subPtr);
2281     }
2282     c_subscriptions.release(subPtr);
2283 
2284     sendSubCreateRef(signal, senderRef, senderData,
2285                      SubCreateRef::TableDropped);
2286     return;
2287   }
2288   }
2289 
2290   ndbrequire(false);
2291 }
2292 
2293 void
sendSubCreateRef(Signal * signal,Uint32 retRef,Uint32 data,Uint32 errCode)2294 Suma::sendSubCreateRef(Signal* signal, Uint32 retRef, Uint32 data,
2295                        Uint32 errCode)
2296 {
2297   jam();
2298   SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
2299   ref->errorCode  = errCode;
2300   ref->senderData = data;
2301   sendSignal(retRef, GSN_SUB_CREATE_REF, signal,
2302   	     SubCreateRef::SignalLength, JBB);
2303   return;
2304 }
2305 
/**********************************************************
 *
 * Setting up trigger for subscription
 *
 */
2311 
2312 void
execSUB_SYNC_REQ(Signal * signal)2313 Suma::execSUB_SYNC_REQ(Signal* signal)
2314 {
2315   jamEntry();
2316 
2317   CRASH_INSERTION(13004);
2318 
2319   SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
2320 
2321   SubscriptionPtr subPtr;
2322   Subscription key;
2323   key.m_subscriptionId = req->subscriptionId;
2324   key.m_subscriptionKey = req->subscriptionKey;
2325 
2326   SectionHandle handle(this, signal);
2327   if(!c_subscriptions.find(subPtr, key))
2328   {
2329     jam();
2330     releaseSections(handle);
2331     sendSubSyncRef(signal, 1407);
2332     return;
2333   }
2334 
2335   Ptr<SyncRecord> syncPtr;
2336   LocalDLList<SyncRecord> list(c_syncPool, subPtr.p->m_syncRecords);
2337   if(!list.seize(syncPtr))
2338   {
2339     jam();
2340     releaseSections(handle);
2341     sendSubSyncRef(signal, 1416);
2342     return;
2343   }
2344 
2345   new (syncPtr.p) Ptr<SyncRecord>;
2346   syncPtr.p->m_senderRef        = req->senderRef;
2347   syncPtr.p->m_senderData       = req->senderData;
2348   syncPtr.p->m_subscriptionPtrI = subPtr.i;
2349   syncPtr.p->ptrI               = syncPtr.i;
2350   syncPtr.p->m_error            = 0;
2351   syncPtr.p->m_requestInfo      = req->requestInfo;
2352   syncPtr.p->m_frag_cnt         = req->fragCount;
2353   syncPtr.p->m_frag_id          = req->fragId;
2354   syncPtr.p->m_tableId          = subPtr.p->m_tableId;
2355 
2356   {
2357     jam();
2358     if(handle.m_cnt > 0)
2359     {
2360       SegmentedSectionPtr ptr;
2361       handle.getSection(ptr, SubSyncReq::ATTRIBUTE_LIST);
2362       LocalDataBuffer<15> attrBuf(c_dataBufferPool, syncPtr.p->m_attributeList);
2363       append(attrBuf, ptr, getSectionSegmentPool());
2364     }
2365     if (req->requestInfo & SubSyncReq::RangeScan)
2366     {
2367       jam();
2368       ndbrequire(handle.m_cnt > 1)
2369       SegmentedSectionPtr ptr;
2370       handle.getSection(ptr, SubSyncReq::TUX_BOUND_INFO);
2371       LocalDataBuffer<15> boundBuf(c_dataBufferPool, syncPtr.p->m_boundInfo);
2372       append(boundBuf, ptr, getSectionSegmentPool());
2373     }
2374     releaseSections(handle);
2375   }
2376 
2377   /**
2378    * We need to gather fragment info
2379    */
2380   {
2381     jam();
2382     DihScanTabReq* req = (DihScanTabReq*)signal->getDataPtrSend();
2383     req->senderRef = reference();
2384     req->senderData = syncPtr.i;
2385     req->tableId = subPtr.p->m_tableId;
2386     req->schemaTransId = subPtr.p->m_schemaTransId;
2387     sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
2388                DihScanTabReq::SignalLength, JBB);
2389   }
2390 }
2391 
2392 void
sendSubSyncRef(Signal * signal,Uint32 errCode)2393 Suma::sendSubSyncRef(Signal* signal, Uint32 errCode){
2394   jam();
2395   SubSyncRef * ref= (SubSyncRef *)signal->getDataPtrSend();
2396   ref->errorCode = errCode;
2397   sendSignal(signal->getSendersBlockRef(),
2398 	     GSN_SUB_SYNC_REF,
2399 	     signal,
2400 	     SubSyncRef::SignalLength,
2401 	     JBB);
2402   return;
2403 }
2404 
2405 void
execDIH_SCAN_TAB_REF(Signal * signal)2406 Suma::execDIH_SCAN_TAB_REF(Signal* signal)
2407 {
2408   jamEntry();
2409   DBUG_ENTER("Suma::execDI_FCOUNTREF");
2410   DihScanTabRef * ref = (DihScanTabRef*)signal->getDataPtr();
2411   switch ((DihScanTabRef::ErrorCode) ref->error)
2412   {
2413   case DihScanTabRef::ErroneousTableState:
2414     jam();
2415     if (ref->tableStatus == Dbdih::TabRecord::TS_CREATING)
2416     {
2417       const Uint32 tableId = ref->tableId;
2418       const Uint32 synPtrI = ref->senderData;
2419       const Uint32 schemaTransId = ref->schemaTransId;
2420       DihScanTabReq * req = (DihScanTabReq*)signal->getDataPtrSend();
2421 
2422       req->senderData = synPtrI;
2423       req->senderRef = reference();
2424       req->tableId = tableId;
2425       req->schemaTransId = schemaTransId;
2426       sendSignalWithDelay(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
2427                           DihScanTabReq::SignalLength,
2428                           DihScanTabReq::RetryInterval);
2429       DBUG_VOID_RETURN;
2430     }
2431     ndbrequire(false);
2432   default:
2433     ndbrequire(false);
2434   }
2435 
2436   DBUG_VOID_RETURN;
2437 }
2438 
2439 void
execDIH_SCAN_TAB_CONF(Signal * signal)2440 Suma::execDIH_SCAN_TAB_CONF(Signal* signal)
2441 {
2442   jamEntry();
2443   DBUG_ENTER("Suma::execDI_FCOUNTCONF");
2444   ndbassert(signal->getNoOfSections() == 0);
2445   DihScanTabConf * conf = (DihScanTabConf*)signal->getDataPtr();
2446   const Uint32 tableId = conf->tableId;
2447   const Uint32 fragCount = conf->fragmentCount;
2448   const Uint32 scanCookie = conf->scanCookie;
2449 
2450   Ptr<SyncRecord> ptr;
2451   c_syncPool.getPtr(ptr, conf->senderData);
2452 
2453   LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments);
2454   ndbrequire(fragBuf.getSize() == 0);
2455 
2456   ndbassert(fragCount >= ptr.p->m_frag_cnt);
2457   if (ptr.p->m_frag_cnt == 0)
2458   {
2459     jam();
2460     ptr.p->m_frag_cnt = fragCount;
2461   }
2462   ptr.p->m_scan_cookie = scanCookie;
2463 
2464   DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
2465   req->senderRef = reference();
2466   req->senderData = ptr.i;
2467   req->tableId = tableId;
2468   req->fragId = 0;
2469   req->scanCookie = scanCookie;
2470   sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
2471              DihScanGetNodesReq::SignalLength, JBB);
2472 
2473   DBUG_VOID_RETURN;
2474 }
2475 
2476 void
execDIH_SCAN_GET_NODES_CONF(Signal * signal)2477 Suma::execDIH_SCAN_GET_NODES_CONF(Signal* signal)
2478 {
2479   jamEntry();
2480   DBUG_ENTER("Suma::execDIGETPRIMCONF");
2481   ndbassert(signal->getNoOfSections() == 0);
2482 
2483   DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtr();
2484   const Uint32 nodeCount = conf->count;
2485   const Uint32 tableId = conf->tableId;
2486   const Uint32 fragNo = conf->fragId;
2487 
2488   ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
2489 
2490   Ptr<SyncRecord> ptr;
2491   c_syncPool.getPtr(ptr, conf->senderData);
2492 
2493   {
2494     LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments);
2495 
2496     /**
2497      * Add primary node for fragment to list
2498      */
2499     FragmentDescriptor fd;
2500     fd.m_fragDesc.m_nodeId = conf->nodes[0];
2501     fd.m_fragDesc.m_fragmentNo = fragNo;
2502     fd.m_fragDesc.m_lqhInstanceKey = conf->instanceKey;
2503     if (ptr.p->m_frag_id == ZNIL)
2504     {
2505       signal->theData[2] = fd.m_dummy;
2506       fragBuf.append(&signal->theData[2], 1);
2507     }
2508     else if (ptr.p->m_frag_id == fragNo)
2509     {
2510       /*
2511        * Given fragment must have a replica on this node.
2512        */
2513       const Uint32 ownNodeId = getOwnNodeId();
2514       Uint32 i = 0;
2515       for (i = 0; i < nodeCount; i++)
2516         if (conf->nodes[i] == ownNodeId)
2517           break;
2518       if (i == nodeCount)
2519       {
2520         sendSubSyncRef(signal, 1428);
2521         return;
2522       }
2523       fd.m_fragDesc.m_nodeId = ownNodeId;
2524       signal->theData[2] = fd.m_dummy;
2525       fragBuf.append(&signal->theData[2], 1);
2526     }
2527   }
2528 
2529   const Uint32 nextFrag = fragNo + 1;
2530   if(nextFrag == ptr.p->m_frag_cnt)
2531   {
2532     jam();
2533 
2534     ptr.p->startScan(signal);
2535     return;
2536   }
2537 
2538   DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
2539   req->senderRef = reference();
2540   req->senderData = ptr.i;
2541   req->tableId = tableId;
2542   req->fragId = nextFrag;
2543   req->scanCookie = ptr.p->m_scan_cookie;
2544   sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
2545              DihScanGetNodesReq::SignalLength, JBB);
2546 
2547   DBUG_VOID_RETURN;
2548 }
2549 
2550 /**********************************************************
2551  * Dict interface
2552  */
2553 
2554 /*************************************************************************
2555  *
2556  *
2557  */
2558 void
execGET_TABINFOREF(Signal * signal)2559 Suma::execGET_TABINFOREF(Signal* signal){
2560   jamEntry();
2561   GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr();
2562   Uint32 tableId = ref->tableId;
2563   Uint32 senderData = ref->senderData;
2564   Uint32 schemaTransId = ref->schemaTransId;
2565   GetTabInfoRef::ErrorCode errorCode =
2566     (GetTabInfoRef::ErrorCode) ref->errorCode;
2567   int do_resend_request = 0;
2568   TablePtr tabPtr;
2569   c_tablePool.getPtr(tabPtr, senderData);
2570   switch (errorCode)
2571   {
2572   case GetTabInfoRef::TableNotDefined:
2573     // wrong state
2574     break;
2575   case GetTabInfoRef::InvalidTableId:
2576     // no such table
2577     break;
2578   case GetTabInfoRef::Busy:
2579     do_resend_request = 1;
2580     break;
2581   case GetTabInfoRef::NoFetchByName:
2582     jam();
2583   case GetTabInfoRef::TableNameTooLong:
2584     jam();
2585     ndbrequire(false);
2586   }
2587   if (tabPtr.p->m_state == Table::DROPPED)
2588   {
2589     jam();
2590     do_resend_request = 0;
2591   }
2592 
2593   if (do_resend_request)
2594   {
2595     GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
2596     req->senderRef = reference();
2597     req->senderData = senderData;
2598     req->requestType =
2599       GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
2600     req->tableId = tableId;
2601     req->schemaTransId = schemaTransId;
2602     sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
2603                         30, GetTabInfoReq::SignalLength);
2604     return;
2605   }
2606   get_tabinfo_ref_release(signal, tabPtr);
2607 }
2608 
2609 void
get_tabinfo_ref_release(Signal * signal,Ptr<Table> tabPtr)2610 Suma::get_tabinfo_ref_release(Signal* signal, Ptr<Table> tabPtr)
2611 {
2612   LocalDLList<Subscription> subList(c_subscriptionPool,
2613                                     tabPtr.p->m_subscriptions);
2614   Ptr<Subscription> subPtr;
2615   bool empty = subList.isEmpty();
2616   for(subList.first(subPtr); !subPtr.isNull();)
2617   {
2618     jam();
2619     Ptr<SubOpRecord> ptr;
2620     ndbassert(subPtr.p->m_start_req.isEmpty());
2621     ndbassert(subPtr.p->m_stop_req.isEmpty());
2622     LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
2623     for (list.first(ptr); !ptr.isNull(); )
2624     {
2625       jam();
2626       sendSubCreateRef(signal,
2627                        ptr.p->m_senderRef,
2628                        ptr.p->m_senderData,
2629                        SubCreateRef::TableDropped);
2630 
2631       Ptr<SubOpRecord> tmp0 = ptr;
2632       list.next(ptr);
2633       list.release(tmp0);
2634     }
2635     Ptr<Subscription> tmp1 = subPtr;
2636     subList.next(subPtr);
2637     c_subscriptions.remove(tmp1);
2638     subList.release(tmp1);
2639   }
2640 
2641   c_tables.release(tabPtr);
2642   ndbassert(!empty);
2643 }
2644 
2645 void
execGET_TABINFO_CONF(Signal * signal)2646 Suma::execGET_TABINFO_CONF(Signal* signal){
2647   jamEntry();
2648 
2649   CRASH_INSERTION(13006);
2650 
2651   if(!assembleFragments(signal)){
2652     return;
2653   }
2654 
2655   SectionHandle handle(this, signal);
2656   GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
2657   TablePtr tabPtr;
2658   c_tablePool.getPtr(tabPtr, conf->senderData);
2659   SegmentedSectionPtr ptr;
2660   handle.getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
2661   ndbrequire(tabPtr.p->parseTable(ptr, *this));
2662   releaseSections(handle);
2663 
2664   if (tabPtr.p->m_state == Table::DROPPED)
2665   {
2666     jam();
2667     get_tabinfo_ref_release(signal, tabPtr);
2668     return;
2669   }
2670 
2671   tabPtr.p->m_state = Table::DEFINED;
2672 
2673   LocalDLList<Subscription> subList(c_subscriptionPool,
2674                                     tabPtr.p->m_subscriptions);
2675   Ptr<Subscription> subPtr;
2676   bool empty = subList.isEmpty();
2677   for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
2678   {
2679     jam();
2680     subPtr.p->m_state = Subscription::DEFINED;
2681 
2682     Ptr<SubOpRecord> ptr;
2683     LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
2684     for (list.first(ptr); !ptr.isNull();)
2685     {
2686       jam();
2687       SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
2688       conf->senderRef  = reference();
2689       conf->senderData = ptr.p->m_senderData;
2690       sendSignal(ptr.p->m_senderRef, GSN_SUB_CREATE_CONF, signal,
2691                  SubCreateConf::SignalLength, JBB);
2692 
2693       Ptr<SubOpRecord> tmp = ptr;
2694       list.next(ptr);
2695       list.release(tmp);
2696     }
2697   }
2698 
2699   ndbassert(!empty);
2700 }
2701 
2702 bool
parseTable(SegmentedSectionPtr ptr,Suma & suma)2703 Suma::Table::parseTable(SegmentedSectionPtr ptr,
2704 			Suma &suma)
2705 {
2706   DBUG_ENTER("Suma::Table::parseTable");
2707 
2708   SimplePropertiesSectionReader it(ptr, suma.getSectionSegmentPool());
2709 
2710   SimpleProperties::UnpackStatus s;
2711   DictTabInfo::Table tableDesc; tableDesc.init();
2712   s = SimpleProperties::unpack(it, &tableDesc,
2713 			       DictTabInfo::TableMapping,
2714 			       DictTabInfo::TableMappingSize,
2715 			       true, true);
2716 
2717   jamBlock(&suma);
2718   suma.suma_ndbrequire(s == SimpleProperties::Break);
2719 
2720   /**
2721    * Initialize table object
2722    */
2723   m_noOfAttributes = tableDesc.NoOfAttributes;
2724   m_schemaVersion = tableDesc.TableVersion;
2725 
2726   DBUG_RETURN(true);
2727 }
2728 
2729 /**********************************************************
2730  *
2731  * Scan interface
2732  *
2733  */
2734 
2735 void
startScan(Signal * signal)2736 Suma::SyncRecord::startScan(Signal* signal)
2737 {
2738   jam();
2739   DBUG_ENTER("Suma::SyncRecord::startScan");
2740 
2741   /**
2742    * Get fraginfo
2743    */
2744   m_currentFragment = 0;
2745   nextScan(signal);
2746   DBUG_VOID_RETURN;
2747 }
2748 
2749 bool
getNextFragment(TablePtr * tab,FragmentDescriptor * fd)2750 Suma::SyncRecord::getNextFragment(TablePtr * tab,
2751                                   FragmentDescriptor * fd)
2752 {
2753   jam();
2754   SubscriptionPtr subPtr;
2755   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2756   DataBuffer<15>::DataBufferIterator fragIt;
2757 
2758   TablePtr tabPtr;
2759   suma.c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
2760   LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  m_fragments);
2761 
2762   fragBuf.position(fragIt, m_currentFragment);
2763   for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++)
2764   {
2765     FragmentDescriptor tmp;
2766     tmp.m_dummy = * fragIt.data;
2767     if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
2768       * fd = tmp;
2769       * tab = tabPtr;
2770       return true;
2771     }
2772   }
2773   m_currentFragment = 0;
2774   return false;
2775 }
2776 
2777 void
nextScan(Signal * signal)2778 Suma::SyncRecord::nextScan(Signal* signal)
2779 {
2780   jam();
2781   DBUG_ENTER("Suma::SyncRecord::nextScan");
2782   TablePtr tabPtr;
2783   FragmentDescriptor fd;
2784   SubscriptionPtr subPtr;
2785   if(!getNextFragment(&tabPtr, &fd)){
2786     jam();
2787     completeScan(signal);
2788     DBUG_VOID_RETURN;
2789   }
2790 
2791   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2792 
2793   DataBuffer<15>::Head head = m_attributeList;
2794   LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);
2795 
2796   Uint32 instanceKey = fd.m_fragDesc.m_lqhInstanceKey;
2797   BlockReference lqhRef = numberToRef(DBLQH, instanceKey, suma.getOwnNodeId());
2798 
2799   ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
2800   const Uint32 parallelism = 16;
2801   //const Uint32 attrLen = 5 + attrBuf.getSize();
2802 
2803   req->senderData = ptrI;
2804   req->resultRef = suma.reference();
2805   req->tableId = tabPtr.p->m_tableId;
2806   req->requestInfo = 0;
2807   req->savePointId = 0;
2808   ScanFragReq::setLockMode(req->requestInfo, 0);
2809   ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2810   ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
2811   if (m_requestInfo & SubSyncReq::NoDisk)
2812   {
2813     ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
2814   }
2815 
2816   if (m_requestInfo & SubSyncReq::LM_Exclusive)
2817   {
2818     ScanFragReq::setLockMode(req->requestInfo, 1);
2819     ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2820     ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
2821   }
2822 
2823   if (m_requestInfo & SubSyncReq::Reorg)
2824   {
2825     ScanFragReq::setReorgFlag(req->requestInfo, ScanFragReq::REORG_MOVED);
2826   }
2827 
2828   if (m_requestInfo & SubSyncReq::TupOrder)
2829   {
2830     ScanFragReq::setTupScanFlag(req->requestInfo, 1);
2831   }
2832 
2833   if (m_requestInfo & SubSyncReq::LM_CommittedRead)
2834   {
2835     ScanFragReq::setReadCommittedFlag(req->requestInfo, 1);
2836   }
2837 
2838   if (m_requestInfo & SubSyncReq::RangeScan)
2839   {
2840     ScanFragReq::setRangeScanFlag(req->requestInfo, 1);
2841   }
2842 
2843   if (m_requestInfo & SubSyncReq::StatScan)
2844   {
2845     ScanFragReq::setStatScanFlag(req->requestInfo, 1);
2846   }
2847 
2848   req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
2849   req->schemaVersion = tabPtr.p->m_schemaVersion;
2850   req->transId1 = 0;
2851   req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
2852   req->clientOpPtr = (ptrI << 16);
2853   req->batch_size_rows= parallelism;
2854 
2855   req->batch_size_bytes= 0;
2856 
2857   Uint32 * attrInfo = signal->theData + 25;
2858   attrInfo[0] = attrBuf.getSize();
2859   attrInfo[1] = 0;
2860   attrInfo[2] = 0;
2861   attrInfo[3] = 0;
2862   attrInfo[4] = 0;
2863 
2864   Uint32 pos = 5;
2865   DataBuffer<15>::DataBufferIterator it;
2866   for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it))
2867   {
2868     AttributeHeader::init(&attrInfo[pos++], * it.data, 0);
2869   }
2870   LinearSectionPtr ptr[3];
2871   Uint32 noOfSections;
2872   ptr[0].p = attrInfo;
2873   ptr[0].sz = pos;
2874   noOfSections = 1;
2875   if (m_requestInfo & SubSyncReq::RangeScan)
2876   {
2877     jam();
2878     Uint32 oldpos = pos; // after attrInfo
2879     LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo);
2880     for (boundBuf.first(it); !it.curr.isNull(); boundBuf.next(it))
2881     {
2882       attrInfo[pos++] = *it.data;
2883     }
2884     ptr[1].p = &attrInfo[oldpos];
2885     ptr[1].sz = pos - oldpos;
2886     noOfSections = 2;
2887   }
2888   suma.sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
2889 		  ScanFragReq::SignalLength, JBB, ptr, noOfSections);
2890 
2891   m_currentNoOfAttributes = attrBuf.getSize();
2892 
2893   DBUG_VOID_RETURN;
2894 }
2895 
2896 
2897 void
execSCAN_FRAGREF(Signal * signal)2898 Suma::execSCAN_FRAGREF(Signal* signal){
2899   jamEntry();
2900 
2901 //  ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();
2902   ndbrequire(false);
2903 }
2904 
2905 void
execSCAN_FRAGCONF(Signal * signal)2906 Suma::execSCAN_FRAGCONF(Signal* signal){
2907   jamEntry();
2908   DBUG_ENTER("Suma::execSCAN_FRAGCONF");
2909   ndbassert(signal->getNoOfSections() == 0);
2910   CRASH_INSERTION(13011);
2911 
2912   ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();
2913 
2914   const Uint32 completed = conf->fragmentCompleted;
2915   const Uint32 senderData = conf->senderData;
2916   const Uint32 completedOps = conf->completedOps;
2917 
2918   Ptr<SyncRecord> syncPtr;
2919   c_syncPool.getPtr(syncPtr, senderData);
2920 
2921   if(completed != 2){ // 2==ZSCAN_FRAG_CLOSED
2922     jam();
2923 
2924 #if PRINT_ONLY
2925     SubSyncContinueConf * const conf =
2926       (SubSyncContinueConf*)signal->getDataPtrSend();
2927     conf->subscriptionId = subPtr.p->m_subscriptionId;
2928     conf->subscriptionKey = subPtr.p->m_subscriptionKey;
2929     execSUB_SYNC_CONTINUE_CONF(signal);
2930 #else
2931     SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
2932     req->subscriberData = syncPtr.p->m_senderData;
2933     req->noOfRowsSent = completedOps;
2934     req->senderData = senderData;
2935     sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
2936 	       SubSyncContinueReq::SignalLength, JBB);
2937 #endif
2938     DBUG_VOID_RETURN;
2939   }
2940 
2941   ndbrequire(completedOps == 0);
2942 
2943   syncPtr.p->m_currentFragment++;
2944   syncPtr.p->nextScan(signal);
2945   DBUG_VOID_RETURN;
2946 }
2947 
2948 void
execSUB_SYNC_CONTINUE_CONF(Signal * signal)2949 Suma::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
2950   jamEntry();
2951   ndbassert(signal->getNoOfSections() == 0);
2952 
2953   CRASH_INSERTION(13012);
2954 
2955   SubSyncContinueConf * const conf =
2956     (SubSyncContinueConf*)signal->getDataPtr();
2957 
2958   SubscriptionPtr subPtr;
2959   Subscription key;
2960   key.m_subscriptionId = conf->subscriptionId;
2961   key.m_subscriptionKey = conf->subscriptionKey;
2962   Uint32 syncPtrI = conf->senderData;
2963 
2964   ndbrequire(c_subscriptions.find(subPtr, key));
2965 
2966   Uint32 instanceKey;
2967   {
2968     Ptr<SyncRecord> syncPtr;
2969     c_syncPool.getPtr(syncPtr, syncPtrI);
2970     LocalDataBuffer<15> fragBuf(c_dataBufferPool, syncPtr.p->m_fragments);
2971     DataBuffer<15>::DataBufferIterator fragIt;
2972     bool ok = fragBuf.position(fragIt, syncPtr.p->m_currentFragment);
2973     ndbrequire(ok);
2974     FragmentDescriptor tmp;
2975     tmp.m_dummy = * fragIt.data;
2976     instanceKey = tmp.m_fragDesc.m_lqhInstanceKey;
2977   }
2978   BlockReference lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId());
2979 
2980   ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
2981   req->senderData = syncPtrI;
2982   req->requestInfo = 0;
2983   req->transId1 = 0;
2984   req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
2985   req->batch_size_rows = 16;
2986   req->batch_size_bytes = 0;
2987   sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
2988 	     ScanFragNextReq::SignalLength, JBB);
2989 }
2990 
2991 void
completeScan(Signal * signal,int error)2992 Suma::SyncRecord::completeScan(Signal* signal, int error)
2993 {
2994   jam();
2995   DBUG_ENTER("Suma::SyncRecord::completeScan");
2996 
2997   SubscriptionPtr subPtr;
2998   suma.c_subscriptionPool.getPtr(subPtr, m_subscriptionPtrI);
2999 
3000   DihScanTabCompleteRep* rep = (DihScanTabCompleteRep*)signal->getDataPtr();
3001   rep->tableId = subPtr.p->m_tableId;
3002   rep->scanCookie = m_scan_cookie;
3003   suma.sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP, signal,
3004                   DihScanTabCompleteRep::SignalLength, JBB);
3005 
3006 #if PRINT_ONLY
3007   ndbout_c("GSN_SUB_SYNC_CONF (data)");
3008 #else
3009   if (error == 0)
3010   {
3011     SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
3012     conf->senderRef = suma.reference();
3013     conf->senderData = m_senderData;
3014     suma.sendSignal(m_senderRef, GSN_SUB_SYNC_CONF, signal,
3015 		    SubSyncConf::SignalLength, JBB);
3016   }
3017   else
3018   {
3019     SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
3020     ref->senderRef = suma.reference();
3021     ref->senderData = m_senderData;
3022     suma.sendSignal(m_senderRef, GSN_SUB_SYNC_REF, signal,
3023 		    SubSyncRef::SignalLength, JBB);
3024   }
3025 #endif
3026 
3027   release();
3028   LocalDLList<SyncRecord> list(suma.c_syncPool, subPtr.p->m_syncRecords);
3029   Ptr<SyncRecord> tmp;
3030   tmp.i = ptrI;
3031   tmp.p = this;
3032   list.release(tmp);
3033 
3034   DBUG_VOID_RETURN;
3035 }
3036 
3037 void
execSCAN_HBREP(Signal * signal)3038 Suma::execSCAN_HBREP(Signal* signal){
3039   jamEntry();
3040 #if 0
3041   ndbout << "execSCAN_HBREP" << endl << hex;
3042   for(int i = 0; i<signal->length(); i++){
3043     ndbout << signal->theData[i] << " ";
3044     if(((i + 1) % 8) == 0)
3045       ndbout << endl << hex;
3046   }
3047   ndbout << endl;
3048 #endif
3049 }
3050 
3051 /**********************************************************
3052  *
3053  * Suma participant interface
3054  *
3055  * Creation of subscriber
3056  *
3057  */
3058 
3059 void
execSUB_START_REQ(Signal * signal)3060 Suma::execSUB_START_REQ(Signal* signal){
3061   jamEntry();
3062   ndbassert(signal->getNoOfSections() == 0);
3063   DBUG_ENTER("Suma::execSUB_START_REQ");
3064   SubStartReq * const req = (SubStartReq*)signal->getDataPtr();
3065 
3066   CRASH_INSERTION(13013);
3067   Uint32 senderRef            = req->senderRef;
3068   Uint32 senderData           = req->senderData;
3069   Uint32 subscriberData       = req->subscriberData;
3070   Uint32 subscriberRef        = req->subscriberRef;
3071   SubscriptionData::Part part = (SubscriptionData::Part)req->part;
3072   (void)part; // TODO validate part
3073 
3074   Subscription key;
3075   key.m_subscriptionId        = req->subscriptionId;
3076   key.m_subscriptionKey       = req->subscriptionKey;
3077 
3078   SubscriptionPtr subPtr;
3079 
3080   CRASH_INSERTION2(13042, getNodeState().startLevel == NodeState::SL_STARTING);
3081 
3082   if (c_startup.m_restart_server_node_id == RNIL)
3083   {
3084     jam();
3085 
3086     /**
3087      * We havent started syncing yet
3088      */
3089     sendSubStartRef(signal,
3090                     senderRef, senderData, SubStartRef::NotStarted);
3091     return;
3092   }
3093 
3094   bool found = c_subscriptions.find(subPtr, key);
3095   if (!found)
3096   {
3097     jam();
3098     sendSubStartRef(signal,
3099                     senderRef, senderData, SubStartRef::NoSuchSubscription);
3100     return;
3101   }
3102 
3103   if (ERROR_INSERTED(13046))
3104   {
3105     jam();
3106     CLEAR_ERROR_INSERT_VALUE;
3107     sendSubStartRef(signal,
3108                     senderRef, senderData, SubStartRef::NoSuchSubscription);
3109     return;
3110   }
3111 
3112   switch(subPtr.p->m_state){
3113   case Subscription::UNDEFINED:
3114     jam();
3115     ndbrequire(false);
3116   case Subscription::DEFINING:
3117     jam();
3118     sendSubStartRef(signal,
3119                     senderRef, senderData, SubStartRef::Defining);
3120     return;
3121   case Subscription::DEFINED:
3122     break;
3123   }
3124 
3125   if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
3126   {
3127     jam();
3128     if (c_startup.m_restart_server_node_id == 0)
3129     {
3130       sendSubStartRef(signal,
3131                       senderRef, senderData, SubStartRef::Dropped);
3132       return;
3133     }
3134     else
3135     {
3136       /**
3137        * Allow SUB_START_REQ from peer node
3138        */
3139     }
3140   }
3141 
3142   if (subPtr.p->m_trigger_state == Subscription::T_ERROR)
3143   {
3144     jam();
3145     sendSubStartRef(signal,
3146                     senderRef, senderData, subPtr.p->m_errorCode);
3147     return;
3148   }
3149 
3150   SubscriberPtr subbPtr;
3151   if(!c_subscriberPool.seize(subbPtr))
3152   {
3153     jam();
3154     sendSubStartRef(signal,
3155                     senderRef, senderData, SubStartRef::OutOfSubscriberRecords);
3156     return;
3157   }
3158 
3159   Ptr<SubOpRecord> subOpPtr;
3160   if (!c_subOpPool.seize(subOpPtr))
3161   {
3162     jam();
3163     c_subscriberPool.release(subbPtr);
3164     sendSubStartRef(signal,
3165                     senderRef, senderData, SubStartRef::OutOfSubOpRecords);
3166     return;
3167   }
3168 
3169   if (! check_sub_start(subscriberRef))
3170   {
3171     jam();
3172     c_subscriberPool.release(subbPtr);
3173     c_subOpPool.release(subOpPtr);
3174     sendSubStartRef(signal,
3175                     senderRef, senderData, SubStartRef::NodeDied);
3176     return;
3177   }
3178 
3179   // setup subscriber record
3180   subbPtr.p->m_senderRef  = subscriberRef;
3181   subbPtr.p->m_senderData = subscriberData;
3182 
3183   subOpPtr.p->m_opType = SubOpRecord::R_SUB_START_REQ;
3184   subOpPtr.p->m_subPtrI = subPtr.i;
3185   subOpPtr.p->m_senderRef = senderRef;
3186   subOpPtr.p->m_senderData = senderData;
3187   subOpPtr.p->m_subscriberRef = subbPtr.i;
3188 
3189   {
3190     LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3191     subOpList.add(subOpPtr);
3192   }
3193 
3194   /**
3195    * Check triggers
3196    */
3197   switch(subPtr.p->m_trigger_state){
3198   case Subscription::T_UNDEFINED:
3199     jam();
3200     /**
3201      * create triggers
3202      */
3203     create_triggers(signal, subPtr);
3204     break;
3205   case Subscription::T_CREATING:
3206     jam();
3207     /**
3208      * Triggers are already being created...wait for completion
3209      */
3210     return;
3211   case Subscription::T_DROPPING:
3212     jam();
3213     /**
3214      * Trigger(s) are being dropped...wait for completion
3215      *   (and recreate them when done)
3216      */
3217     break;
3218   case Subscription::T_DEFINED:{
3219     jam();
3220     report_sub_start_conf(signal, subPtr);
3221     return;
3222   }
3223   case Subscription::T_ERROR:
3224     jam();
3225     ndbrequire(false); // Checked above
3226     break;
3227   }
3228 }
3229 
3230 void
sendSubStartRef(Signal * signal,Uint32 dstref,Uint32 data,Uint32 err)3231 Suma::sendSubStartRef(Signal* signal, Uint32 dstref, Uint32 data, Uint32 err)
3232 {
3233   jam();
3234   SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
3235   ref->senderRef = reference();
3236   ref->senderData = data;
3237   ref->errorCode = err;
3238   sendSignal(dstref, GSN_SUB_START_REF, signal,
3239 	     SubStartRef::SignalLength, JBB);
3240 }
3241 
3242 void
create_triggers(Signal * signal,SubscriptionPtr subPtr)3243 Suma::create_triggers(Signal* signal, SubscriptionPtr subPtr)
3244 {
3245   jam();
3246 
3247   ndbrequire(subPtr.p->m_trigger_state == Subscription::T_UNDEFINED);
3248   subPtr.p->m_trigger_state = Subscription::T_CREATING;
3249 
3250   TablePtr tabPtr;
3251   c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3252 
3253   AttributeMask attrMask;
3254   tabPtr.p->createAttributeMask(attrMask, *this);
3255 
3256   subPtr.p->m_outstanding_trigger = 3;
3257   for(Uint32 j = 0; j<3; j++)
3258   {
3259     Uint32 triggerId = (tabPtr.p->m_schemaVersion << 18) | (j << 16) | subPtr.i;
3260     ndbrequire(subPtr.p->m_triggers[j] == ILLEGAL_TRIGGER_ID);
3261 
3262     CreateTrigImplReq * const req =
3263       (CreateTrigImplReq*)signal->getDataPtrSend();
3264     req->senderRef = SUMA_REF;
3265     req->senderData = subPtr.i;
3266     req->requestType = 0;
3267 
3268     Uint32 ti = 0;
3269     TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE);
3270     TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED);
3271     TriggerInfo::setTriggerEvent(ti, (TriggerEvent::Value)j);
3272     TriggerInfo::setMonitorReplicas(ti, true);
3273     //TriggerInfo::setMonitorAllAttributes(ti, j == TriggerEvent::TE_DELETE);
3274     TriggerInfo::setMonitorAllAttributes(ti, true);
3275     TriggerInfo::setReportAllMonitoredAttributes(ti,
3276        subPtr.p->m_options & Subscription::REPORT_ALL);
3277     req->triggerInfo = ti;
3278 
3279     req->receiverRef = SUMA_REF;
3280     req->triggerId = triggerId;
3281     req->tableId = subPtr.p->m_tableId;
3282     req->tableVersion = 0; // not used
3283     req->indexId = ~(Uint32)0;
3284     req->indexVersion = 0;
3285 
3286     LinearSectionPtr ptr[3];
3287     ptr[0].p = attrMask.rep.data;
3288     ptr[0].sz = attrMask.getSizeInWords();
3289     sendSignal(DBTUP_REF, GSN_CREATE_TRIG_IMPL_REQ,
3290                signal, CreateTrigImplReq::SignalLength, JBB, ptr, 1);
3291   }
3292 }
3293 
3294 void
execCREATE_TRIG_IMPL_CONF(Signal * signal)3295 Suma::execCREATE_TRIG_IMPL_CONF(Signal* signal)
3296 {
3297   jamEntry();
3298 
3299   CreateTrigImplConf * conf = (CreateTrigImplConf*)signal->getDataPtr();
3300   const Uint32 triggerId = conf->triggerId;
3301   Uint32 type = (triggerId >> 16) & 0x3;
3302   Uint32 tableId = conf->tableId;
3303 
3304   TablePtr tabPtr;
3305   SubscriptionPtr subPtr;
3306   c_subscriptions.getPtr(subPtr, conf->senderData);
3307   c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3308 
3309   ndbrequire(tabPtr.p->m_tableId == tableId);
3310   ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
3311 
3312   ndbrequire(type < 3);
3313   ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
3314   subPtr.p->m_triggers[type] = triggerId;
3315 
3316   ndbrequire(subPtr.p->m_outstanding_trigger);
3317   subPtr.p->m_outstanding_trigger--;
3318 
3319   if (subPtr.p->m_outstanding_trigger)
3320   {
3321     jam();
3322     /**
3323      * Wait for more
3324      */
3325     return;
3326   }
3327 
3328   if (subPtr.p->m_errorCode == 0)
3329   {
3330     jam();
3331     subPtr.p->m_trigger_state = Subscription::T_DEFINED;
3332     report_sub_start_conf(signal, subPtr);
3333   }
3334   else
3335   {
3336     jam();
3337     subPtr.p->m_trigger_state = Subscription::T_ERROR;
3338     drop_triggers(signal, subPtr);
3339   }
3340 }
3341 
3342 void
execCREATE_TRIG_IMPL_REF(Signal * signal)3343 Suma::execCREATE_TRIG_IMPL_REF(Signal* signal)
3344 {
3345   jamEntry();
3346 
3347   CreateTrigImplRef * const ref = (CreateTrigImplRef*)signal->getDataPtr();
3348   const Uint32 triggerId = ref->triggerId;
3349   Uint32 type = (triggerId >> 16) & 0x3;
3350   Uint32 tableId = ref->tableId;
3351 
3352   TablePtr tabPtr;
3353   SubscriptionPtr subPtr;
3354   c_subscriptions.getPtr(subPtr, ref->senderData);
3355   c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3356 
3357   ndbrequire(tabPtr.p->m_tableId == tableId);
3358   ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
3359 
3360   ndbrequire(type < 3);
3361   ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
3362 
3363   subPtr.p->m_errorCode = ref->errorCode;
3364 
3365   ndbrequire(subPtr.p->m_outstanding_trigger);
3366   subPtr.p->m_outstanding_trigger--;
3367 
3368   if (subPtr.p->m_outstanding_trigger)
3369   {
3370     jam();
3371     /**
3372      * Wait for more
3373      */
3374     return;
3375   }
3376 
3377   subPtr.p->m_trigger_state = Subscription::T_ERROR;
3378   drop_triggers(signal, subPtr);
3379 }
3380 
3381 bool
check_sub_start(Uint32 subscriberRef)3382 Suma::check_sub_start(Uint32 subscriberRef)
3383 {
3384   Uint32 nodeId = refToNode(subscriberRef);
3385   bool startme = c_startup.m_restart_server_node_id;
3386   bool handover = c_startup.m_wait_handover;
3387   bool connected =
3388     c_failedApiNodes.get(nodeId) == false &&
3389     c_connected_nodes.get(nodeId);
3390 
3391   return (startme || handover || connected);
3392 }
3393 
3394 void
report_sub_start_conf(Signal * signal,Ptr<Subscription> subPtr)3395 Suma::report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr)
3396 {
3397   const Uint64 gci = get_current_gci(signal);
3398   {
3399     LocalDLList<Subscriber> list(c_subscriberPool,
3400                                  subPtr.p->m_subscribers);
3401     LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3402 
3403     Ptr<Subscriber> ptr;
3404     Ptr<SubOpRecord> subOpPtr;
3405     for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
3406     {
3407       jam();
3408 
3409       Uint32 senderRef = subOpPtr.p->m_senderRef;
3410       Uint32 senderData = subOpPtr.p->m_senderData;
3411       c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
3412 
3413       if (check_sub_start(ptr.p->m_senderRef))
3414       {
3415         SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend();
3416         conf->senderRef       = reference();
3417         conf->senderData      = senderData;
3418         conf->subscriptionId  = subPtr.p->m_subscriptionId;
3419         conf->subscriptionKey = subPtr.p->m_subscriptionKey;
3420         conf->firstGCI        = Uint32(gci >> 32);
3421         conf->part            = SubscriptionData::TableData;
3422         conf->bucketCount     = c_no_of_buckets;
3423         conf->nodegroup       = c_nodeGroup;
3424         sendSignal(senderRef, GSN_SUB_START_CONF, signal,
3425                    SubStartConf::SignalLength, JBB);
3426 
3427         /**
3428          * Call before adding to list...
3429          *   cause method will (maybe) iterate thought list
3430          */
3431         bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
3432         send_sub_start_stop_event(signal, ptr,NdbDictionary::Event::_TE_ACTIVE,
3433                                   report, list);
3434 
3435         list.add(ptr);
3436         c_subscriber_nodes.set(refToNode(ptr.p->m_senderRef));
3437         c_subscriber_per_node[refToNode(ptr.p->m_senderRef)]++;
3438       }
3439       else
3440       {
3441         jam();
3442 
3443         sendSubStartRef(signal,
3444                         senderRef, senderData, SubStartRef::NodeDied);
3445 
3446         c_subscriberPool.release(ptr);
3447       }
3448 
3449       Ptr<SubOpRecord> tmp = subOpPtr;
3450       subOpList.next(subOpPtr);
3451       subOpList.release(tmp);
3452     }
3453   }
3454 
3455   check_release_subscription(signal, subPtr);
3456 }
3457 
3458 void
report_sub_start_ref(Signal * signal,Ptr<Subscription> subPtr,Uint32 errCode)3459 Suma::report_sub_start_ref(Signal* signal,
3460                            Ptr<Subscription> subPtr,
3461                            Uint32 errCode)
3462 {
3463   LocalDLList<Subscriber> list(c_subscriberPool,
3464                                subPtr.p->m_subscribers);
3465   LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3466 
3467   Ptr<Subscriber> ptr;
3468   Ptr<SubOpRecord> subOpPtr;
3469   for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
3470   {
3471     jam();
3472 
3473     Uint32 senderRef = subOpPtr.p->m_senderRef;
3474     Uint32 senderData = subOpPtr.p->m_senderData;
3475     c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
3476 
3477     SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend();
3478     ref->senderRef  = reference();
3479     ref->senderData = senderData;
3480     ref->errorCode  = errCode;
3481 
3482     sendSignal(senderRef, GSN_SUB_START_REF, signal,
3483                SubStartConf::SignalLength, JBB);
3484 
3485 
3486     Ptr<SubOpRecord> tmp = subOpPtr;
3487     subOpList.next(subOpPtr);
3488     subOpList.release(tmp);
3489     c_subscriberPool.release(ptr);
3490   }
3491 }
3492 
3493 void
drop_triggers(Signal * signal,SubscriptionPtr subPtr)3494 Suma::drop_triggers(Signal* signal, SubscriptionPtr subPtr)
3495 {
3496   jam();
3497 
3498   subPtr.p->m_outstanding_trigger = 0;
3499 
3500   Ptr<Table> tabPtr;
3501   c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3502   if (tabPtr.p->m_state == Table::DROPPED)
3503   {
3504     jam();
3505     subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID;
3506     subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID;
3507     subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID;
3508   }
3509   else
3510   {
3511     for(Uint32 j = 0; j<3; j++)
3512     {
3513       jam();
3514       Uint32 triggerId = subPtr.p->m_triggers[j];
3515       if (triggerId != ILLEGAL_TRIGGER_ID)
3516       {
3517         subPtr.p->m_outstanding_trigger++;
3518 
3519         DropTrigImplReq * const req =
3520           (DropTrigImplReq*)signal->getDataPtrSend();
3521         req->senderRef = SUMA_REF; // Sending to myself
3522         req->senderData = subPtr.i;
3523         req->requestType = 0;
3524 
3525         // TUP needs some triggerInfo to find right list
3526         Uint32 ti = 0;
3527         TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE);
3528         TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED);
3529         TriggerInfo::setTriggerEvent(ti, (TriggerEvent::Value)j);
3530         TriggerInfo::setMonitorReplicas(ti, true);
3531         //TriggerInfo::setMonitorAllAttributes(ti, j ==TriggerEvent::TE_DELETE);
3532         TriggerInfo::setMonitorAllAttributes(ti, true);
3533         TriggerInfo::setReportAllMonitoredAttributes(ti,
3534                   subPtr.p->m_options & Subscription::REPORT_ALL);
3535         req->triggerInfo = ti;
3536 
3537         req->tableId = subPtr.p->m_tableId;
3538         req->tableVersion = 0; // not used
3539         req->indexId = RNIL;
3540         req->indexVersion = 0;
3541         req->triggerId = triggerId;
3542         req->receiverRef = SUMA_REF;
3543 
3544         c_outstanding_drop_trig_req++;
3545         sendSignal(DBTUP_REF, GSN_DROP_TRIG_IMPL_REQ,
3546                    signal, DropTrigImplReq::SignalLength, JBB);
3547       }
3548     }
3549   }
3550 
3551   if (subPtr.p->m_outstanding_trigger == 0)
3552   {
3553     jam();
3554     drop_triggers_complete(signal, subPtr);
3555   }
3556 }
3557 
3558 void
execDROP_TRIG_IMPL_REF(Signal * signal)3559 Suma::execDROP_TRIG_IMPL_REF(Signal* signal)
3560 {
3561   jamEntry();
3562   DropTrigImplRef * const ref = (DropTrigImplRef*)signal->getDataPtr();
3563   Ptr<Table> tabPtr;
3564   Ptr<Subscription> subPtr;
3565   const Uint32 triggerId = ref->triggerId;
3566   const Uint32 type = (triggerId >> 16) & 0x3;
3567 
3568   c_subscriptionPool.getPtr(subPtr, ref->senderData);
3569   c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3570   ndbrequire(tabPtr.p->m_tableId == ref->tableId);
3571 
3572   ndbrequire(type < 3);
3573   ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
3574   subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
3575 
3576   ndbrequire(subPtr.p->m_outstanding_trigger);
3577   subPtr.p->m_outstanding_trigger--;
3578 
3579   ndbrequire(c_outstanding_drop_trig_req);
3580   c_outstanding_drop_trig_req--;
3581 
3582   if (subPtr.p->m_outstanding_trigger)
3583   {
3584     jam();
3585     /**
3586      * Wait for more
3587      */
3588     return;
3589   }
3590 
3591   drop_triggers_complete(signal, subPtr);
3592 }
3593 
3594 void
execDROP_TRIG_IMPL_CONF(Signal * signal)3595 Suma::execDROP_TRIG_IMPL_CONF(Signal* signal)
3596 {
3597   jamEntry();
3598 
3599   DropTrigImplConf * const conf = (DropTrigImplConf*)signal->getDataPtr();
3600 
3601   Ptr<Table> tabPtr;
3602   Ptr<Subscription> subPtr;
3603   const Uint32 triggerId = conf->triggerId;
3604   const Uint32 type = (triggerId >> 16) & 0x3;
3605 
3606   c_subscriptionPool.getPtr(subPtr, conf->senderData);
3607   c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3608   ndbrequire(tabPtr.p->m_tableId == conf->tableId);
3609 
3610   ndbrequire(type < 3);
3611   ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
3612   subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
3613 
3614   ndbrequire(subPtr.p->m_outstanding_trigger);
3615   subPtr.p->m_outstanding_trigger--;
3616 
3617   ndbrequire(c_outstanding_drop_trig_req);
3618   c_outstanding_drop_trig_req--;
3619 
3620   if (subPtr.p->m_outstanding_trigger)
3621   {
3622     jam();
3623     /**
3624      * Wait for more
3625      */
3626     return;
3627   }
3628 
3629   drop_triggers_complete(signal, subPtr);
3630 }
3631 
3632 void
drop_triggers_complete(Signal * signal,Ptr<Subscription> subPtr)3633 Suma::drop_triggers_complete(Signal* signal, Ptr<Subscription> subPtr)
3634 {
3635   switch(subPtr.p->m_trigger_state){
3636   case Subscription::T_UNDEFINED:
3637   case Subscription::T_CREATING:
3638   case Subscription::T_DEFINED:
3639     jam();
3640     ndbrequire(false);
3641     break;
3642   case Subscription::T_DROPPING:
3643     jam();
3644     /**
3645      */
3646     subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
3647     if (!subPtr.p->m_start_req.isEmpty())
3648     {
3649       jam();
3650       create_triggers(signal, subPtr);
3651       return;
3652     }
3653     break;
3654   case Subscription::T_ERROR:
3655     jam();
3656     Uint32 err = subPtr.p->m_errorCode;
3657     subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
3658     subPtr.p->m_errorCode = 0;
3659     report_sub_start_ref(signal, subPtr, err);
3660     break;
3661   }
3662 
3663   check_release_subscription(signal, subPtr);
3664 }
3665 
3666 /**********************************************************
3667  * Suma participant interface
3668  *
3669  * Stopping and removing of subscriber
3670  *
3671  */
3672 
3673 void
execSUB_STOP_REQ(Signal * signal)3674 Suma::execSUB_STOP_REQ(Signal* signal){
3675   jamEntry();
3676   ndbassert(signal->getNoOfSections() == 0);
3677   DBUG_ENTER("Suma::execSUB_STOP_REQ");
3678 
3679   CRASH_INSERTION(13019);
3680 
3681   SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
3682   Uint32 senderRef      = req->senderRef;
3683   Uint32 senderData     = req->senderData;
3684   Uint32 subscriberRef  = req->subscriberRef;
3685   Uint32 subscriberData = req->subscriberData;
3686   SubscriptionPtr subPtr;
3687   Subscription key;
3688   key.m_subscriptionId  = req->subscriptionId;
3689   key.m_subscriptionKey = req->subscriptionKey;
3690   bool abortStart = (req->requestInfo & SubStopReq::RI_ABORT_START);
3691 
3692   if (c_startup.m_restart_server_node_id == RNIL)
3693   {
3694     jam();
3695 
3696     /**
3697      * We havent started syncing yet
3698      */
3699     sendSubStopRef(signal,
3700                    senderRef, senderData, SubStopRef::NotStarted);
3701     return;
3702   }
3703 
3704   bool found = c_subscriptions.find(subPtr, key);
3705   if (!found)
3706   {
3707     jam();
3708     sendSubStopRef(signal,
3709                    senderRef, senderData, SubStopRef::NoSuchSubscription);
3710     return;
3711   }
3712 
3713   switch(subPtr.p->m_state){
3714   case Subscription::UNDEFINED:
3715     jam();
3716     ndbrequire(false);
3717   case Subscription::DEFINING:
3718     jam();
3719     sendSubStopRef(signal,
3720                    senderRef, senderData, SubStopRef::Defining);
3721     return;
3722   case Subscription::DEFINED:
3723     jam();
3724     break;
3725   }
3726 
3727   Ptr<SubOpRecord> subOpPtr;
3728   LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
3729   bool empty = list.isEmpty();
3730   if (list.seize(subOpPtr) == false)
3731   {
3732     jam();
3733     sendSubStopRef(signal,
3734                    senderRef, senderData, SubStopRef::OutOfSubOpRecords);
3735     return;
3736   }
3737 
3738   if (abortStart)
3739   {
3740     jam();
3741     subOpPtr.p->m_opType = SubOpRecord::R_SUB_ABORT_START_REQ;
3742   }
3743   else
3744   {
3745     jam();
3746     subOpPtr.p->m_opType = SubOpRecord::R_SUB_STOP_REQ;
3747   }
3748   subOpPtr.p->m_subPtrI = subPtr.i;
3749   subOpPtr.p->m_senderRef = senderRef;
3750   subOpPtr.p->m_senderData = senderData;
3751   subOpPtr.p->m_subscriberRef = subscriberRef;
3752   subOpPtr.p->m_subscriberData = subscriberData;
3753 
3754 
3755   if (empty)
3756   {
3757     jam();
3758     signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3759     signal->theData[1] = subOpPtr.i;
3760     signal->theData[2] = RNIL;
3761     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3762   }
3763 }
3764 
3765 void
sub_stop_req(Signal * signal)3766 Suma::sub_stop_req(Signal* signal)
3767 {
3768   jam();
3769 
3770   Ptr<SubOpRecord> subOpPtr;
3771   c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
3772 
3773   Ptr<Subscription> subPtr;
3774   c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
3775 
3776   Ptr<Subscriber> ptr;
3777   {
3778     LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
3779     if (signal->theData[2] == RNIL)
3780     {
3781       jam();
3782       list.first(ptr);
3783     }
3784     else
3785     {
3786       jam();
3787       list.getPtr(ptr, signal->theData[2]);
3788     }
3789 
3790     for (Uint32 i = 0; i<32 && !ptr.isNull(); i++, list.next(ptr))
3791     {
3792       if (ptr.p->m_senderRef == subOpPtr.p->m_subscriberRef &&
3793           ptr.p->m_senderData == subOpPtr.p->m_subscriberData)
3794       {
3795         jam();
3796         goto found;
3797       }
3798     }
3799   }
3800 
3801   if (ptr.isNull())
3802   {
3803     jam();
3804     sendSubStopRef(signal,
3805                    subOpPtr.p->m_senderRef,
3806                    subOpPtr.p->m_senderData,
3807                    SubStopRef::NoSuchSubscriber);
3808     check_remove_queue(signal, subPtr, subOpPtr, true, true);
3809     return;
3810   }
3811 
3812   signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3813   signal->theData[1] = subOpPtr.i;
3814   signal->theData[2] = ptr.i;
3815   sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3816   return;
3817 
3818 found:
3819   {
3820     LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
3821     list.remove(ptr);
3822     /**
3823      * NOTE: remove before...so we done send UNSUBSCRIBE to self (yuck)
3824      */
3825     bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
3826     report_sub_stop_conf(signal, subOpPtr, ptr, report, list);
3827     c_subscriberPool.release(ptr);
3828   }
3829   check_remove_queue(signal, subPtr, subOpPtr, true, true);
3830   check_release_subscription(signal, subPtr);
3831 }
3832 
3833 void
check_remove_queue(Signal * signal,Ptr<Subscription> subPtr,Ptr<SubOpRecord> subOpPtr,bool ishead,bool dorelease)3834 Suma::check_remove_queue(Signal* signal,
3835                          Ptr<Subscription> subPtr,
3836                          Ptr<SubOpRecord> subOpPtr,
3837                          bool ishead,
3838                          bool dorelease)
3839 {
3840   LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
3841 
3842   {
3843     Ptr<SubOpRecord> tmp;
3844     list.first(tmp);
3845     if (ishead)
3846     {
3847       jam();
3848       ndbrequire(tmp.i == subOpPtr.i);
3849     }
3850     else
3851     {
3852       jam();
3853       ishead = (tmp.i == subOpPtr.i);
3854     }
3855   }
3856 
3857   if (dorelease)
3858   {
3859     jam();
3860     list.release(subOpPtr);
3861   }
3862   else
3863   {
3864     jam();
3865     list.remove(subOpPtr);
3866   }
3867 
3868   if (ishead)
3869   {
3870     jam();
3871     if (list.first(subOpPtr) == false)
3872     {
3873       jam();
3874       c_restart.m_waiting_on_self = 1;
3875       return;
3876     }
3877     // Fall through
3878   }
3879   else
3880   {
3881     jam();
3882     return;
3883   }
3884 
3885   switch(subOpPtr.p->m_opType){
3886   case SubOpRecord::R_SUB_ABORT_START_REQ:
3887   case SubOpRecord::R_SUB_STOP_REQ:
3888     jam();
3889     signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3890     signal->theData[1] = subOpPtr.i;
3891     signal->theData[2] = RNIL;
3892     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3893     return;
3894   case SubOpRecord::R_API_FAIL_REQ:
3895     jam();
3896     signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
3897     signal->theData[1] = subOpPtr.i;
3898     signal->theData[2] = RNIL;
3899     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3900     return;
3901   case SubOpRecord::R_START_ME_REQ:
3902     jam();
3903     sendSubCreateReq(signal, subPtr);
3904     return;
3905   }
3906 }
3907 
3908 void
report_sub_stop_conf(Signal * signal,Ptr<SubOpRecord> subOpPtr,Ptr<Subscriber> ptr,bool report,LocalDLList<Subscriber> & list)3909 Suma::report_sub_stop_conf(Signal* signal,
3910                            Ptr<SubOpRecord> subOpPtr,
3911                            Ptr<Subscriber> ptr,
3912                            bool report,
3913                            LocalDLList<Subscriber>& list)
3914 {
3915   jam();
3916   CRASH_INSERTION(13020);
3917 
3918   Uint32 senderRef = subOpPtr.p->m_senderRef;
3919   Uint32 senderData = subOpPtr.p->m_senderData;
3920   bool abortStart = subOpPtr.p->m_opType == SubOpRecord::R_SUB_ABORT_START_REQ;
3921 
3922   // let subscriber know that subscrber is stopped
3923   if (!abortStart)
3924   {
3925     jam();
3926     send_sub_start_stop_event(signal, ptr, NdbDictionary::Event::_TE_STOP,
3927                               report, list);
3928   }
3929 
3930   SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
3931   const Uint64 gci = m_max_seen_gci;
3932   conf->senderRef= reference();
3933   conf->senderData= senderData;
3934   conf->gci_hi= Uint32(gci>>32);
3935   conf->gci_lo= Uint32(gci);
3936   sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
3937 	     SubStopConf::SignalLength, JBB);
3938 
3939   Uint32 nodeId = refToNode(ptr.p->m_senderRef);
3940   if (c_subscriber_per_node[nodeId])
3941   {
3942     c_subscriber_per_node[nodeId]--;
3943     if (c_subscriber_per_node[nodeId] == 0)
3944     {
3945       jam();
3946       c_subscriber_nodes.clear(nodeId);
3947     }
3948   }
3949 }
3950 
3951 void
sendSubStopRef(Signal * signal,Uint32 retref,Uint32 data,Uint32 errCode)3952 Suma::sendSubStopRef(Signal* signal,
3953                      Uint32 retref,
3954                      Uint32 data,
3955                      Uint32 errCode)
3956 {
3957   jam();
3958   SubStopRef  * ref = (SubStopRef *)signal->getDataPtrSend();
3959   ref->senderRef = reference();
3960   ref->errorCode = errCode;
3961   ref->senderData = data;
3962   sendSignal(retref, GSN_SUB_STOP_REF, signal,  SubStopRef::SignalLength, JBB);
3963 }
3964 
3965 // report new started subscriber to all other subscribers
3966 void
send_sub_start_stop_event(Signal * signal,Ptr<Subscriber> ptr,NdbDictionary::Event::_TableEvent event,bool report,LocalDLList<Subscriber> & list)3967 Suma::send_sub_start_stop_event(Signal *signal,
3968                                 Ptr<Subscriber> ptr,
3969                                 NdbDictionary::Event::_TableEvent event,
3970                                 bool report,
3971                                 LocalDLList<Subscriber>& list)
3972 {
3973   const Uint64 gci = get_current_gci(signal);
3974   SubTableData * data  = (SubTableData*)signal->getDataPtrSend();
3975   Uint32 nodeId = refToNode(ptr.p->m_senderRef);
3976 
3977   NdbDictionary::Event::_TableEvent other;
3978   if (event == NdbDictionary::Event::_TE_STOP)
3979   {
3980     other = NdbDictionary::Event::_TE_UNSUBSCRIBE;
3981   }
3982   else if (event == NdbDictionary::Event::_TE_ACTIVE)
3983   {
3984     other = NdbDictionary::Event::_TE_SUBSCRIBE;
3985   }
3986   else
3987   {
3988     jamLine(event);
3989     ndbrequire(false);
3990   }
3991 
3992   data->gci_hi         = Uint32(gci >> 32);
3993   data->gci_lo         = Uint32(gci);
3994   data->tableId        = 0;
3995   data->requestInfo    = 0;
3996   SubTableData::setOperation(data->requestInfo, event);
3997   SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
3998   SubTableData::setReqNodeId(data->requestInfo, nodeId);
3999   data->changeMask     = 0;
4000   data->totalLen       = 0;
4001   data->senderData     = ptr.p->m_senderData;
4002   sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4003              SubTableData::SignalLength, JBB);
4004 
4005   if (report == false)
4006   {
4007     return;
4008   }
4009 
4010   data->requestInfo    = 0;
4011   SubTableData::setOperation(data->requestInfo, other);
4012   SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
4013 
4014   Ptr<Subscriber> tmp;
4015   for(list.first(tmp); !tmp.isNull(); list.next(tmp))
4016   {
4017     jam();
4018     SubTableData::setReqNodeId(data->requestInfo, nodeId);
4019     data->senderData = tmp.p->m_senderData;
4020     sendSignal(tmp.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4021                SubTableData::SignalLength, JBB);
4022 
4023     ndbassert(tmp.i != ptr.i); // ptr should *NOT* be in list now
4024     if (other != NdbDictionary::Event::_TE_UNSUBSCRIBE)
4025     {
4026       jam();
4027       SubTableData::setReqNodeId(data->requestInfo,
4028                                  refToNode(tmp.p->m_senderRef));
4029 
4030       data->senderData = ptr.p->m_senderData;
4031       sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4032                  SubTableData::SignalLength, JBB);
4033     }
4034   }
4035 }
4036 
4037 void
createAttributeMask(AttributeMask & mask,Suma & suma)4038 Suma::Table::createAttributeMask(AttributeMask& mask,
4039                                  Suma &suma)
4040 {
4041   mask.clear();
4042   for(Uint32 i = 0; i<m_noOfAttributes; i++)
4043     mask.set(i);
4044 }
4045 
suma_ndbrequire(bool v)4046 void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
4047 
4048 
4049 /**********************************************************
4050  * Scan data interface
4051  *
4052  * Assumption: one execTRANSID_AI contains all attr info
4053  *
4054  */
4055 
4056 #define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS
4057 #define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1
4058 
4059 static Uint32 f_bufferLock = 0;
4060 static Uint32 f_buffer[SUMA_BUF_SZ];
4061 static Uint32 f_trigBufferSize = 0;
4062 static Uint32 b_bufferLock = 0;
4063 static Uint32 b_buffer[SUMA_BUF_SZ];
4064 static Uint32 b_trigBufferSize = 0;
4065 
4066 void
execTRANSID_AI(Signal * signal)4067 Suma::execTRANSID_AI(Signal* signal)
4068 {
4069   jamEntry();
4070   DBUG_ENTER("Suma::execTRANSID_AI");
4071 
4072   CRASH_INSERTION(13015);
4073   TransIdAI * const data = (TransIdAI*)signal->getDataPtr();
4074   const Uint32 opPtrI = data->connectPtr;
4075   Uint32 length = signal->length() - 3;
4076 
4077   if(f_bufferLock == 0){
4078     f_bufferLock = opPtrI;
4079   } else {
4080     ndbrequire(f_bufferLock == opPtrI);
4081   }
4082 
4083   if (signal->getNoOfSections())
4084   {
4085     SectionHandle handle(this, signal);
4086     SegmentedSectionPtr dataPtr;
4087     handle.getSection(dataPtr, 0);
4088     length = dataPtr.sz;
4089     copy(data->attrData, dataPtr);
4090     releaseSections(handle);
4091   }
4092 
4093   Ptr<SyncRecord> syncPtr;
4094   c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
4095 
4096   Uint32 sum = 0;
4097   Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
4098   Uint32 * headers = f_buffer;
4099   const Uint32 * src = &data->attrData[0];
4100   const Uint32 * const end = &src[length];
4101 
4102   const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
4103   for(Uint32 i = 0; i<attribs; i++){
4104     Uint32 tmp = * src++;
4105     * headers++ = tmp;
4106     Uint32 len = AttributeHeader::getDataSize(tmp);
4107 
4108     memcpy(dst, src, 4 * len);
4109     dst += len;
4110     src += len;
4111     sum += len;
4112   }
4113   f_trigBufferSize = sum;
4114 
4115   ndbrequire(src == end);
4116 
4117   if ((syncPtr.p->m_requestInfo & SubSyncReq::LM_Exclusive) == 0)
4118   {
4119     sendScanSubTableData(signal, syncPtr, 0);
4120   }
4121 
4122   DBUG_VOID_RETURN;
4123 }
4124 
4125 void
execKEYINFO20(Signal * signal)4126 Suma::execKEYINFO20(Signal* signal)
4127 {
4128   jamEntry();
4129   KeyInfo20* data = (KeyInfo20*)signal->getDataPtr();
4130 
4131   const Uint32 opPtrI = data->clientOpPtr;
4132   const Uint32 takeOver = data->scanInfo_Node;
4133 
4134   ndbrequire(f_bufferLock == opPtrI);
4135 
4136   Ptr<SyncRecord> syncPtr;
4137   c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
4138   sendScanSubTableData(signal, syncPtr, takeOver);
4139 }
4140 
4141 void
sendScanSubTableData(Signal * signal,Ptr<SyncRecord> syncPtr,Uint32 takeOver)4142 Suma::sendScanSubTableData(Signal* signal,
4143                            Ptr<SyncRecord> syncPtr, Uint32 takeOver)
4144 {
4145   const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
4146   const Uint32 sum =  f_trigBufferSize;
4147 
4148   /**
4149    * Send data to subscriber
4150    */
4151   LinearSectionPtr ptr[3];
4152   ptr[0].p = f_buffer;
4153   ptr[0].sz = attribs;
4154 
4155   ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
4156   ptr[1].sz = sum;
4157 
4158   SubscriptionPtr subPtr;
4159   c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);
4160 
4161 
4162   /**
4163    * Initialize signal
4164    */
4165   SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
4166   Uint32 ref = syncPtr.p->m_senderRef;
4167   sdata->tableId = syncPtr.p->m_tableId;
4168   sdata->senderData = syncPtr.p->m_senderData;
4169   sdata->requestInfo = 0;
4170   SubTableData::setOperation(sdata->requestInfo,
4171 			     NdbDictionary::Event::_TE_SCAN); // Scan
4172   sdata->gci_hi = 0; // Undefined
4173   sdata->gci_lo = 0;
4174   sdata->takeOver = takeOver;
4175 #if PRINT_ONLY
4176   ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
4177 #else
4178   sendSignal(ref,
4179 	     GSN_SUB_TABLE_DATA,
4180 	     signal,
4181 	     SubTableData::SignalLength, JBB,
4182 	     ptr, 2);
4183 #endif
4184 
4185   /**
4186    * Reset f_bufferLock
4187    */
4188   f_bufferLock = 0;
4189 }
4190 
4191 /**********************************************************
4192  *
4193  * Trigger data interface
4194  *
4195  */
4196 
4197 void
execTRIG_ATTRINFO(Signal * signal)4198 Suma::execTRIG_ATTRINFO(Signal* signal)
4199 {
4200   jamEntry();
4201   DBUG_ENTER("Suma::execTRIG_ATTRINFO");
4202 
4203   CRASH_INSERTION(13016);
4204   TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr();
4205   const Uint32 trigId = trg->getTriggerId();
4206 
4207   const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;
4208 
4209   if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){
4210     jam();
4211 
4212     ndbrequire(b_bufferLock == trigId);
4213 
4214     memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
4215     b_trigBufferSize += dataLen;
4216 
4217     // printf("before values %u %u %u\n",trigId, dataLen,  b_trigBufferSize);
4218   } else {
4219     jam();
4220 
4221     if(f_bufferLock == 0){
4222       f_bufferLock = trigId;
4223       f_trigBufferSize = 0;
4224       b_bufferLock = trigId;
4225       b_trigBufferSize = 0;
4226     } else {
4227       ndbrequire(f_bufferLock == trigId);
4228     }
4229 
4230     memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
4231     f_trigBufferSize += dataLen;
4232   }
4233 
4234 
4235   DBUG_VOID_RETURN;
4236 }
4237 
4238 #ifdef NODEFAIL_DEBUG2
4239 static int theCounts[64] = {0};
4240 #endif
4241 
4242 Uint32
get_responsible_node(Uint32 bucket) const4243 Suma::get_responsible_node(Uint32 bucket) const
4244 {
4245   // id will contain id to responsible suma or
4246   // RNIL if we don't have nodegroup info yet
4247 
4248   jam();
4249   Uint32 node;
4250   const Bucket* ptr= c_buckets + bucket;
4251   for(Uint32 i = 0; i<MAX_REPLICAS; i++)
4252   {
4253     node= ptr->m_nodes[i];
4254     if(c_alive_nodes.get(node))
4255     {
4256 #ifdef NODEFAIL_DEBUG2
4257       theCounts[node]++;
4258       ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u",
4259                n,D, id, theCounts[node]);
4260 #endif
4261       return node;
4262     }
4263   }
4264 
4265   return 0;
4266 }
4267 
4268 Uint32
get_responsible_node(Uint32 bucket,const NdbNodeBitmask & mask) const4269 Suma::get_responsible_node(Uint32 bucket, const NdbNodeBitmask& mask) const
4270 {
4271   jam();
4272   Uint32 node;
4273   const Bucket* ptr= c_buckets + bucket;
4274   for(Uint32 i = 0; i<MAX_REPLICAS; i++)
4275   {
4276     node= ptr->m_nodes[i];
4277     if(mask.get(node))
4278     {
4279       return node;
4280     }
4281   }
4282 
4283   return 0;
4284 }
4285 
4286 bool
check_switchover(Uint32 bucket,Uint64 gci)4287 Suma::check_switchover(Uint32 bucket, Uint64 gci)
4288 {
4289   const Uint32 send_mask =
4290     Bucket::BUCKET_STARTING |
4291     Bucket::BUCKET_TAKEOVER |
4292     Bucket::BUCKET_SHUTDOWN_TO;
4293 
4294   bool send = c_buckets[bucket].m_state & send_mask;
4295   ndbassert(m_switchover_buckets.get(bucket));
4296   if(unlikely(gci > c_buckets[bucket].m_switchover_gci))
4297   {
4298     return send;
4299   }
4300   return !send;
4301 }
4302 
4303 static
4304 Uint32
reformat(Signal * signal,LinearSectionPtr ptr[3],Uint32 * src_1,Uint32 sz_1,Uint32 * src_2,Uint32 sz_2)4305 reformat(Signal* signal, LinearSectionPtr ptr[3],
4306 	 Uint32 * src_1, Uint32 sz_1,
4307 	 Uint32 * src_2, Uint32 sz_2)
4308 {
4309   Uint32 noOfAttrs = 0, dataLen = 0;
4310   Uint32 * headers = signal->theData + 25;
4311   Uint32 * dst     = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
4312 
4313   ptr[0].p  = headers;
4314   ptr[1].p  = dst;
4315 
4316   while(sz_1 > 0){
4317     Uint32 tmp = * src_1 ++;
4318     * headers ++ = tmp;
4319     Uint32 len = AttributeHeader::getDataSize(tmp);
4320     memcpy(dst, src_1, 4 * len);
4321     dst += len;
4322     src_1 += len;
4323 
4324     noOfAttrs++;
4325     dataLen += len;
4326     sz_1 -= (1 + len);
4327   }
4328   assert(sz_1 == 0);
4329 
4330   ptr[0].sz = noOfAttrs;
4331   ptr[1].sz = dataLen;
4332 
4333   ptr[2].p = src_2;
4334   ptr[2].sz = sz_2;
4335 
4336   return sz_2 > 0 ? 3 : 2;
4337 }
4338 
4339 /**
4340  * Pass entire pages with SUMA-trigger-data from
4341  *   TUP to SUMA to avoid extensive LongSignalMessage buffer contention
4342  */
4343 void
execFIRE_TRIG_ORD_L(Signal * signal)4344 Suma::execFIRE_TRIG_ORD_L(Signal* signal)
4345 {
4346   jamEntry();
4347 
4348   ndbassert(signal->getNoOfSections() == 0);
4349   Uint32 pageId = signal->theData[0];
4350   Uint32 len = signal->theData[1];
4351 
4352   if (pageId == RNIL && len == 0)
4353   {
4354     jam();
4355     /**
4356      * Out of memory
4357      */
4358     out_of_buffer(signal);
4359     return;
4360   }
4361 
4362   Uint32 * ptr = reinterpret_cast<Uint32*>(c_page_pool.getPtr(pageId));
4363   while (len)
4364   {
4365     Uint32 * save = ptr;
4366     Uint32 msglen  = * ptr++;
4367     Uint32 siglen  = * ptr++;
4368     Uint32 sec0len = * ptr++;
4369     Uint32 sec1len = * ptr++;
4370     Uint32 sec2len = * ptr++;
4371 
4372     /**
4373      * Copy value directly into local buffers
4374      */
4375     Uint32 trigId = ((FireTrigOrd*)ptr)->getTriggerId();
4376     memcpy(signal->theData, ptr, 4 * siglen); // signal
4377     ptr += siglen;
4378     memcpy(f_buffer, ptr, 4*sec0len);
4379     ptr += sec0len;
4380     memcpy(b_buffer, ptr, 4*sec1len);
4381     ptr += sec1len;
4382     memcpy(f_buffer + sec0len, ptr, 4*sec2len);
4383     ptr += sec2len;
4384 
4385     f_trigBufferSize = sec0len + sec2len;
4386     b_trigBufferSize = sec1len;
4387     f_bufferLock = trigId;
4388     b_bufferLock = trigId;
4389 
4390     execFIRE_TRIG_ORD(signal);
4391 
4392     ndbrequire(ptr == save + msglen);
4393     ndbrequire(len >= msglen);
4394     len -= msglen;
4395   }
4396 
4397   m_ctx.m_mm.release_page(RT_DBTUP_PAGE, pageId);
4398 }
4399 
4400 void
execFIRE_TRIG_ORD(Signal * signal)4401 Suma::execFIRE_TRIG_ORD(Signal* signal)
4402 {
4403   jamEntry();
4404   DBUG_ENTER("Suma::execFIRE_TRIG_ORD");
4405 
4406   CRASH_INSERTION(13016);
4407   FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
4408   const Uint32 trigId    = trg->getTriggerId();
4409   const Uint32 hashValue = trg->getHashValue();
4410   const Uint32 gci_hi    = trg->getGCI();
4411   const Uint32 gci_lo    = trg->m_gci_lo;
4412   const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
4413   const Uint32 event     = trg->getTriggerEvent();
4414   const Uint32 any_value = trg->getAnyValue();
4415   const Uint32 transId1  = trg->m_transId1;
4416   const Uint32 transId2  = trg->m_transId2;
4417 
4418   Ptr<Subscription> subPtr;
4419   c_subscriptionPool.getPtr(subPtr, trigId & 0xFFFF);
4420 
4421   ndbassert(gci > m_last_complete_gci);
4422 
4423   if (signal->getNoOfSections())
4424   {
4425     jam();
4426     ndbassert(isNdbMtLqh());
4427     SectionHandle handle(this, signal);
4428 
4429     ndbrequire(b_bufferLock == 0);
4430     ndbrequire(f_bufferLock == 0);
4431     f_bufferLock = trigId;
4432     b_bufferLock = trigId;
4433 
4434     SegmentedSectionPtr ptr;
4435     handle.getSection(ptr, 0); // Keys
4436     Uint32 sz = ptr.sz;
4437     copy(f_buffer, ptr);
4438 
4439     handle.getSection(ptr, 2); // After values
4440     copy(f_buffer + sz, ptr);
4441     f_trigBufferSize = sz + ptr.sz;
4442 
4443     handle.getSection(ptr, 1); // Before values
4444     copy(b_buffer, ptr);
4445     b_trigBufferSize = ptr.sz;
4446     releaseSections(handle);
4447   }
4448 
4449   jam();
4450   ndbrequire(f_bufferLock == trigId);
4451   /**
4452    * Reset f_bufferLock
4453    */
4454   f_bufferLock = 0;
4455   b_bufferLock = 0;
4456 
4457   Uint32 tableId = subPtr.p->m_tableId;
4458   Uint32 schemaVersion =
4459     c_tablePool.getPtr(subPtr.p->m_table_ptrI)->m_schemaVersion;
4460 
4461   Uint32 bucket= hashValue % c_no_of_buckets;
4462   m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
4463   if(m_active_buckets.get(bucket) ||
4464      (m_switchover_buckets.get(bucket) && (check_switchover(bucket, gci))))
4465   {
4466     m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci);
4467     Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
4468     ndbrequire(sz == f_trigBufferSize);
4469 
4470     LinearSectionPtr ptr[3];
4471     const Uint32 nptr= reformat(signal, ptr,
4472 				f_buffer, f_trigBufferSize,
4473                                 b_buffer, b_trigBufferSize);
4474     Uint32 ptrLen= 0;
4475     for(Uint32 i =0; i < nptr; i++)
4476       ptrLen+= ptr[i].sz;
4477     /**
4478      * Signal to subscriber(s)
4479      */
4480     SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
4481     data->gci_hi         = gci_hi;
4482     data->gci_lo         = gci_lo;
4483     data->tableId        = tableId;
4484     data->requestInfo    = 0;
4485     SubTableData::setOperation(data->requestInfo, event);
4486     data->flags          = 0;
4487     data->anyValue       = any_value;
4488     data->totalLen       = ptrLen;
4489     data->transId1       = transId1;
4490     data->transId2       = transId2;
4491 
4492     {
4493       LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
4494       SubscriberPtr subbPtr;
4495       for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
4496       {
4497 	data->senderData = subbPtr.p->m_senderData;
4498 	sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4499 		   SubTableData::SignalLengthWithTransId, JBB, ptr, nptr);
4500       }
4501     }
4502   }
4503   else
4504   {
4505     const uint buffer_header_sz = 6;
4506     Uint32* dst;
4507     Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz;
4508     if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
4509     {
4510       * dst++ = subPtr.i;
4511       * dst++ = schemaVersion;
4512       * dst++ = (event << 16) | f_trigBufferSize;
4513       * dst++ = any_value;
4514       * dst++ = transId1;
4515       * dst++ = transId2;
4516       memcpy(dst, f_buffer, f_trigBufferSize << 2);
4517       dst += f_trigBufferSize;
4518       memcpy(dst, b_buffer, b_trigBufferSize << 2);
4519     }
4520   }
4521 
4522   DBUG_VOID_RETURN;
4523 }
4524 
4525 void
checkMaxBufferedEpochs(Signal * signal)4526 Suma::checkMaxBufferedEpochs(Signal *signal)
4527 {
4528   /*
4529    * Check if any subscribers are exceeding the MaxBufferedEpochs
4530    */
4531   Ptr<Gcp_record> gcp;
4532   jamEntry();
4533   if (c_gcp_list.isEmpty())
4534   {
4535     jam();
4536     return;
4537   }
4538   c_gcp_list.first(gcp);
4539   if (ERROR_INSERTED(13037))
4540   {
4541     jam();
4542     CLEAR_ERROR_INSERT_VALUE;
4543     ndbout_c("Simulating exceeding the MaxBufferedEpochs %u(%llu,%llu,%llu)",
4544             c_maxBufferedEpochs, m_max_seen_gci,
4545             m_last_complete_gci, gcp.p->m_gci);
4546   }
4547   else if (c_gcp_list.count() < c_maxBufferedEpochs)
4548   {
4549     return;
4550   }
4551   NodeBitmask subs = gcp.p->m_subscribers;
4552   jam();
4553   // Disconnect lagging subscribers waiting for oldest epoch
4554   ndbout_c("Found lagging epoch %llu", gcp.p->m_gci);
4555   for(Uint32 nodeId = 0; nodeId < MAX_NODES; nodeId++)
4556   {
4557     if (subs.get(nodeId))
4558     {
4559       jam();
4560       subs.clear(nodeId);
4561       // Disconnecting node
4562       signal->theData[0] = NDB_LE_SubscriptionStatus;
4563       signal->theData[1] = 1; // DISCONNECTED;
4564       signal->theData[2] = nodeId;
4565       signal->theData[3] = (Uint32) gcp.p->m_gci;
4566       signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
4567       signal->theData[5] = (Uint32) c_gcp_list.count();
4568       signal->theData[6] = c_maxBufferedEpochs;
4569       sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 8, JBB);
4570 
4571       /**
4572        * Force API_FAILREQ
4573        */
4574       signal->theData[0] = nodeId;
4575       sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA);
4576     }
4577   }
4578 }
4579 
4580 void
execSUB_GCP_COMPLETE_REP(Signal * signal)4581 Suma::execSUB_GCP_COMPLETE_REP(Signal* signal)
4582 {
4583   jamEntry();
4584   ndbassert(signal->getNoOfSections() == 0);
4585 
4586   SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
4587   Uint32 gci_hi = rep->gci_hi;
4588   Uint32 gci_lo = rep->gci_lo;
4589   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
4590 
4591   if (isNdbMtLqh() && m_gcp_rep_cnt > 1)
4592   {
4593 
4594 #define SSPP 0
4595 
4596     if (SSPP)
4597       printf("execSUB_GCP_COMPLETE_REP(%u/%u)", gci_hi, gci_lo);
4598     jam();
4599     Uint32 min = m_min_gcp_rep_counter_index;
4600     Uint32 sz = NDB_ARRAY_SIZE(m_gcp_rep_counter);
4601     for (Uint32 i = min; i != m_max_gcp_rep_counter_index; i = (i + 1) % sz)
4602     {
4603       jam();
4604       if (m_gcp_rep_counter[i].m_gci == gci)
4605       {
4606         jam();
4607         m_gcp_rep_counter[i].m_cnt ++;
4608         if (m_gcp_rep_counter[i].m_cnt == m_gcp_rep_cnt)
4609         {
4610           jam();
4611           /**
4612            * Release this entry...
4613            */
4614           if (i != min)
4615           {
4616             jam();
4617             m_gcp_rep_counter[i] = m_gcp_rep_counter[min];
4618           }
4619           m_min_gcp_rep_counter_index = (min + 1) % sz;
4620           if (SSPP)
4621             ndbout_c(" found - complete after: (min: %u max: %u)",
4622                      m_min_gcp_rep_counter_index,
4623                      m_max_gcp_rep_counter_index);
4624           goto found;
4625         }
4626         else
4627         {
4628           jam();
4629           if (SSPP)
4630             ndbout_c(" found - wait unchanged: (min: %u max: %u)",
4631                      m_min_gcp_rep_counter_index,
4632                      m_max_gcp_rep_counter_index);
4633           return; // Wait for more...
4634         }
4635       }
4636     }
4637     /**
4638      * Not found...
4639      */
4640     Uint32 next = (m_max_gcp_rep_counter_index + 1) % sz;
4641     ndbrequire(next != min); // ring buffer full
4642     m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_gci = gci;
4643     m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_cnt = 1;
4644     m_max_gcp_rep_counter_index = next;
4645     if (SSPP)
4646       ndbout_c(" new - after: (min: %u max: %u)",
4647                m_min_gcp_rep_counter_index,
4648                m_max_gcp_rep_counter_index);
4649     return;
4650   }
4651 found:
4652   bool drop = false;
4653   Uint32 flags = (m_missing_data)
4654                  ? rep->flags | SubGcpCompleteRep::MISSING_DATA
4655                  : rep->flags;
4656 
4657   if (ERROR_INSERTED(13036))
4658   {
4659     jam();
4660     CLEAR_ERROR_INSERT_VALUE;
4661     ndbout_c("Simulating out of event buffer at node failure");
4662     flags |= SubGcpCompleteRep::MISSING_DATA;
4663   }
4664 
4665 #ifdef VM_TRACE
4666   if (m_gcp_monitor == 0)
4667   {
4668   }
4669   else if (gci_hi == Uint32(m_gcp_monitor >> 32))
4670   {
4671     ndbrequire(gci_lo == Uint32(m_gcp_monitor) + 1);
4672   }
4673   else
4674   {
4675     ndbrequire(gci_hi == Uint32(m_gcp_monitor >> 32) + 1);
4676     ndbrequire(gci_lo == 0);
4677   }
4678   m_gcp_monitor = gci;
4679 #endif
4680 
4681   m_last_complete_gci = gci;
4682   checkMaxBufferedEpochs(signal);
4683   m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
4684 
4685   /**
4686    *
4687    */
4688   if(!m_switchover_buckets.isclear())
4689   {
4690     bool unlock = false;
4691     Uint32 i = m_switchover_buckets.find(0);
4692     for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.find(i + 1))
4693     {
4694       if(gci > c_buckets[i].m_switchover_gci)
4695       {
4696 	Uint32 state = c_buckets[i].m_state;
4697 	m_switchover_buckets.clear(i);
4698 	printf("%u/%u (%u/%u) switchover complete bucket %d state: %x",
4699 	       Uint32(gci >> 32),
4700 	       Uint32(gci),
4701 	       Uint32(c_buckets[i].m_switchover_gci >> 32),
4702 	       Uint32(c_buckets[i].m_switchover_gci),
4703 	       i, state);
4704 
4705 	if(state & Bucket::BUCKET_STARTING)
4706 	{
4707 	  /**
4708 	   * NR case
4709 	   */
4710           jam();
4711 	  m_active_buckets.set(i);
4712 	  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING;
4713 	  ndbout_c("starting");
4714 	  m_gcp_complete_rep_count++;
4715           unlock = true;
4716 	}
4717 	else if(state & Bucket::BUCKET_TAKEOVER)
4718 	{
4719 	  /**
4720 	   * NF case
4721 	   */
4722           jam();
4723 	  Bucket* bucket= c_buckets + i;
4724 	  Page_pos pos= bucket->m_buffer_head;
4725 	  ndbrequire(pos.m_max_gci < gci);
4726 
4727 	  Buffer_page* page= c_page_pool.getPtr(pos.m_page_id);
4728 	  ndbout_c("takeover %d", pos.m_page_id);
4729 	  page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
4730           page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
4731           ndbassert(pos.m_max_gci != 0);
4732 	  page->m_words_used = pos.m_page_pos;
4733 	  page->m_next_page = RNIL;
4734 	  memset(&bucket->m_buffer_head, 0, sizeof(bucket->m_buffer_head));
4735 	  bucket->m_buffer_head.m_page_id = RNIL;
4736 	  bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
4737 
4738 	  m_active_buckets.set(i);
4739           m_gcp_complete_rep_count++;
4740 	  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER;
4741 	}
4742 	else if (state & Bucket::BUCKET_HANDOVER)
4743 	{
4744 	  /**
4745 	   * NR, living node
4746 	   */
4747           jam();
4748 	  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER;
4749           m_gcp_complete_rep_count--;
4750 	  ndbout_c("handover");
4751 	}
4752         else if (state & Bucket::BUCKET_CREATED_MASK)
4753         {
4754           jam();
4755           Uint32 cnt = state >> 8;
4756           Uint32 mask = Uint32(Bucket::BUCKET_CREATED_MASK) | (cnt << 8);
4757 	  c_buckets[i].m_state &= ~mask;
4758           flags |= SubGcpCompleteRep::ADD_CNT;
4759           flags |= (cnt << 16);
4760           ndbout_c("add %u %s", cnt,
4761                    state & Bucket::BUCKET_CREATED_SELF ? "self" : "other");
4762           if (state & Bucket::BUCKET_CREATED_SELF &&
4763               get_responsible_node(i) == getOwnNodeId())
4764           {
4765             jam();
4766             m_active_buckets.set(i);
4767             m_gcp_complete_rep_count++;
4768           }
4769         }
4770         else if (state & Bucket::BUCKET_DROPPED_MASK)
4771         {
4772           jam();
4773           Uint32 cnt = state >> 8;
4774           Uint32 mask = Uint32(Bucket::BUCKET_DROPPED_MASK) | (cnt << 8);
4775 	  c_buckets[i].m_state &= ~mask;
4776           flags |= SubGcpCompleteRep::SUB_CNT;
4777           flags |= (cnt << 16);
4778           ndbout_c("sub %u %s", cnt,
4779                    state & Bucket::BUCKET_DROPPED_SELF ? "self" : "other");
4780           if (state & Bucket::BUCKET_DROPPED_SELF)
4781           {
4782             m_active_buckets.clear(i);
4783             drop = true;
4784           }
4785         }
4786         else if (state & Bucket::BUCKET_SHUTDOWN)
4787         {
4788           jam();
4789           Uint32 nodeId = c_buckets[i].m_switchover_node;
4790           ndbrequire(nodeId == getOwnNodeId());
4791           m_active_buckets.clear(i);
4792           m_gcp_complete_rep_count--;
4793           ndbout_c("shutdown handover");
4794           c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN;
4795         }
4796         else if (state & Bucket::BUCKET_SHUTDOWN_TO)
4797         {
4798           jam();
4799           Uint32 nodeId = c_buckets[i].m_switchover_node;
4800           NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask;
4801           nodegroup.clear(nodeId);
4802           ndbrequire(get_responsible_node(i) == nodeId &&
4803                      get_responsible_node(i, nodegroup) == getOwnNodeId());
4804           m_active_buckets.set(i);
4805           m_gcp_complete_rep_count++;
4806           ndbout_c("shutdown takover");
4807           c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN_TO;
4808         }
4809       }
4810     }
4811 
4812     if (m_switchover_buckets.isclear())
4813     {
4814       jam();
4815       if(getNodeState().startLevel == NodeState::SL_STARTING &&
4816          c_startup.m_handover_nodes.isclear())
4817       {
4818         jam();
4819         sendSTTORRY(signal);
4820       }
4821       else if (getNodeState().startLevel >= NodeState::SL_STOPPING_1)
4822       {
4823         jam();
4824         ndbrequire(c_shutdown.m_wait_handover);
4825         StopMeConf * conf = CAST_PTR(StopMeConf, signal->getDataPtrSend());
4826         conf->senderData = c_shutdown.m_senderData;
4827         conf->senderRef = reference();
4828         sendSignal(c_shutdown.m_senderRef, GSN_STOP_ME_CONF, signal,
4829                    StopMeConf::SignalLength, JBB);
4830         c_shutdown.m_wait_handover = false;
4831         infoEvent("Suma: handover complete");
4832       }
4833     }
4834 
4835     if (unlock)
4836     {
4837       jam();
4838       send_dict_unlock_ord(signal, DictLockReq::SumaHandOver);
4839     }
4840   }
4841 
4842   if(ERROR_INSERTED(13010))
4843   {
4844     CLEAR_ERROR_INSERT_VALUE;
4845     ndbout_c("Don't send GCP_COMPLETE_REP(%llu)", gci);
4846     return;
4847   }
4848 
4849   /**
4850    * Signal to subscribers
4851    */
4852   rep->gci_hi = gci_hi;
4853   rep->gci_lo = gci_lo;
4854   rep->flags = flags;
4855   rep->senderRef  = reference();
4856   rep->gcp_complete_rep_count = m_gcp_complete_rep_count;
4857 
4858   if(m_gcp_complete_rep_count && !c_subscriber_nodes.isclear())
4859   {
4860     CRASH_INSERTION(13033);
4861 
4862     NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
4863     sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
4864 	       SubGcpCompleteRep::SignalLength, JBB);
4865 
4866     Ptr<Gcp_record> gcp;
4867     if(c_gcp_list.seize(gcp))
4868     {
4869       gcp.p->m_gci = gci;
4870       gcp.p->m_subscribers = c_subscriber_nodes;
4871     }
4872     else
4873     {
4874       char buf[100];
4875       c_subscriber_nodes.getText(buf);
4876       g_eventLogger->error("c_gcp_list.seize() failed: gci: %llu nodes: %s",
4877                            gci, buf);
4878     }
4879   }
4880 
4881   /**
4882    * Add GCP COMPLETE REP to buffer
4883    */
4884   bool subscribers = !c_subscriber_nodes.isclear();
4885   for(Uint32 i = 0; i<c_no_of_buckets; i++)
4886   {
4887     if(m_active_buckets.get(i))
4888       continue;
4889 
4890     if (subscribers || (c_buckets[i].m_state & Bucket::BUCKET_RESEND))
4891     {
4892       //Uint32* dst;
4893       get_buffer_ptr(signal, i, gci, 0);
4894     }
4895   }
4896 
4897   if(m_out_of_buffer_gci && gci > m_out_of_buffer_gci)
4898   {
4899     jam();
4900     infoEvent("Reenable event buffer");
4901     m_out_of_buffer_gci = 0;
4902     m_missing_data = false;
4903   }
4904 
4905   if (unlikely(drop))
4906   {
4907     jam();
4908     m_gcp_complete_rep_count = 0;
4909     c_nodeGroup = RNIL;
4910     c_nodes_in_nodegroup_mask.clear();
4911     fix_nodegroup();
4912   }
4913 }
4914 
4915 void
execCREATE_TAB_CONF(Signal * signal)4916 Suma::execCREATE_TAB_CONF(Signal *signal)
4917 {
4918   jamEntry();
4919   DBUG_ENTER("Suma::execCREATE_TAB_CONF");
4920 
4921   DBUG_VOID_RETURN;
4922 }
4923 
4924 void
execDROP_TAB_CONF(Signal * signal)4925 Suma::execDROP_TAB_CONF(Signal *signal)
4926 {
4927   jamEntry();
4928   ndbassert(signal->getNoOfSections() == 0);
4929 
4930   DropTabConf * const conf = (DropTabConf*)signal->getDataPtr();
4931   Uint32 senderRef= conf->senderRef;
4932   Uint32 tableId= conf->tableId;
4933 
4934   TablePtr tabPtr;
4935   if (!c_tables.find(tabPtr, tableId))
4936   {
4937     jam();
4938     return;
4939   }
4940 
4941   DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i));
4942   const Table::State old_state = tabPtr.p->m_state;
4943   tabPtr.p->m_state = Table::DROPPED;
4944   c_tables.remove(tabPtr);
4945 
4946   if (senderRef != 0)
4947   {
4948     jam();
4949 
4950     // dict coordinator sends info to API
4951 
4952     const Uint64 gci = get_current_gci(signal);
4953     SubTableData * data = (SubTableData*)signal->getDataPtrSend();
4954     data->gci_hi         = Uint32(gci >> 32);
4955     data->gci_lo         = Uint32(gci);
4956     data->tableId        = tableId;
4957     data->requestInfo    = 0;
4958     SubTableData::setOperation(data->requestInfo,
4959                                NdbDictionary::Event::_TE_DROP);
4960     SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
4961 
4962     Ptr<Subscription> subPtr;
4963     LocalDLList<Subscription> subList(c_subscriptionPool,
4964                                       tabPtr.p->m_subscriptions);
4965 
4966     for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
4967     {
4968       jam();
4969       if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
4970       {
4971         jam();
4972         continue;
4973         //continue in for-loop if the table is not part of
4974         //the subscription. Otherwise, send data to subscriber.
4975       }
4976 
4977       if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
4978       {
4979         jam();
4980         continue;
4981       }
4982 
4983       Ptr<Subscriber> ptr;
4984       LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
4985       for(list.first(ptr); !ptr.isNull(); list.next(ptr))
4986       {
4987         jam();
4988         data->senderData= ptr.p->m_senderData;
4989         sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4990                    SubTableData::SignalLength, JBB);
4991       }
4992     }
4993   }
4994 
4995   if (old_state == Table::DEFINING)
4996   {
4997     jam();
4998     return;
4999   }
5000 
5001   if (tabPtr.p->m_subscriptions.isEmpty())
5002   {
5003     jam();
5004     tabPtr.p->release(* this);
5005     c_tablePool.release(tabPtr);
5006     return;
5007   }
5008   else
5009   {
5010     /**
5011      * check_release_subscription create a subList...
5012      *   weirdness below is to make sure that it's not created twice
5013      */
5014     Ptr<Subscription> subPtr;
5015     {
5016       LocalDLList<Subscription> subList(c_subscriptionPool,
5017                                         tabPtr.p->m_subscriptions);
5018       subList.first(subPtr);
5019     }
5020     while (!subPtr.isNull())
5021     {
5022       Ptr<Subscription> tmp = subPtr;
5023       {
5024         LocalDLList<Subscription> subList(c_subscriptionPool,
5025                                           tabPtr.p->m_subscriptions);
5026         subList.next(subPtr);
5027       }
5028       check_release_subscription(signal, tmp);
5029     }
5030   }
5031 }
5032 
5033 /**
5034  * This receives DICT_TAB_INFO in long signal section 1, and releases the data
5035  * after use.
5036  */
5037 void
execALTER_TAB_REQ(Signal * signal)5038 Suma::execALTER_TAB_REQ(Signal *signal)
5039 {
5040   jamEntry();
5041 
5042   AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr();
5043   Uint32 senderRef= req->senderRef;
5044   Uint32 tableId= req->tableId;
5045   Uint32 changeMask= req->changeMask;
5046   TablePtr tabPtr;
5047 
5048   // Copy DICT_TAB_INFO to local linear buffer
5049   SectionHandle handle(this, signal);
5050   SegmentedSectionPtr tabInfoPtr;
5051   handle.getSection(tabInfoPtr, 0);
5052 
5053   if (!c_tables.find(tabPtr, tableId))
5054   {
5055     jam();
5056     releaseSections(handle);
5057     return;
5058   }
5059 
5060   if (senderRef == 0)
5061   {
5062     jam();
5063     releaseSections(handle);
5064     return;
5065   }
5066   // dict coordinator sends info to API
5067 
5068 #ifndef DBUG_OFF
5069   ndbout_c("DICT_TAB_INFO in SUMA,  tabInfoPtr.sz = %d", tabInfoPtr.sz);
5070   SimplePropertiesSectionReader reader(handle.m_ptr[0],
5071 				       getSectionSegmentPool());
5072   reader.printAll(ndbout);
5073 #endif
5074   copy(b_dti_buf, tabInfoPtr);
5075   releaseSections(handle);
5076 
5077   LinearSectionPtr lptr[3];
5078   lptr[0].p = b_dti_buf;
5079   lptr[0].sz = tabInfoPtr.sz;
5080 
5081   const Uint64 gci = get_current_gci(signal);
5082   SubTableData * data = (SubTableData*)signal->getDataPtrSend();
5083   data->gci_hi         = Uint32(gci >> 32);
5084   data->gci_lo         = Uint32(gci);
5085   data->tableId        = tableId;
5086   data->requestInfo    = 0;
5087   SubTableData::setOperation(data->requestInfo,
5088 			     NdbDictionary::Event::_TE_ALTER);
5089   SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
5090   data->flags          = 0;
5091   data->changeMask     = changeMask;
5092   data->totalLen       = tabInfoPtr.sz;
5093   Ptr<Subscription> subPtr;
5094   LocalDLList<Subscription> subList(c_subscriptionPool,
5095                                     tabPtr.p->m_subscriptions);
5096 
5097   for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
5098   {
5099     if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
5100     {
5101       jam();
5102       continue;
5103       //continue in for-loop if the table is not part of
5104       //the subscription. Otherwise, send data to subscriber.
5105     }
5106 
5107     if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
5108     {
5109       jam();
5110       continue;
5111     }
5112 
5113     Ptr<Subscriber> ptr;
5114     LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5115     for(list.first(ptr); !ptr.isNull(); list.next(ptr))
5116     {
5117       jam();
5118       data->senderData= ptr.p->m_senderData;
5119       Callback c = { 0, 0 };
5120       sendFragmentedSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
5121                            SubTableData::SignalLength, JBB, lptr, 1, c);
5122     }
5123   }
5124 }
5125 
5126 void
execSUB_GCP_COMPLETE_ACK(Signal * signal)5127 Suma::execSUB_GCP_COMPLETE_ACK(Signal* signal)
5128 {
5129   jamEntry();
5130   ndbassert(signal->getNoOfSections() == 0);
5131 
5132   SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
5133   Uint32 gci_hi = ack->rep.gci_hi;
5134   Uint32 gci_lo = ack->rep.gci_lo;
5135   Uint32 senderRef  = ack->rep.senderRef;
5136   if (unlikely(signal->getLength() < SubGcpCompleteAck::SignalLength))
5137   {
5138     jam();
5139     ndbassert(!ndb_check_micro_gcp(getNodeInfo(refToNode(senderRef)).m_version));
5140     gci_lo = 0;
5141   }
5142 
5143   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
5144   m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
5145 
5146   if (ERROR_INSERTED(13037))
5147   {
5148     jam();
5149     ndbout_c("Simulating exceeding the MaxBufferedEpochs, ignoring ack");
5150     return;
5151   }
5152 
5153   if (refToBlock(senderRef) == SUMA)
5154   {
5155     jam();
5156 
5157     // Ack from other SUMA
5158     Uint32 nodeId= refToNode(senderRef);
5159     for(Uint32 i = 0; i<c_no_of_buckets; i++)
5160     {
5161       if(m_active_buckets.get(i) ||
5162 	 (m_switchover_buckets.get(i) && (check_switchover(i, gci))) ||
5163 	 (!m_switchover_buckets.get(i) && get_responsible_node(i) == nodeId))
5164       {
5165 	release_gci(signal, i, gci);
5166       }
5167     }
5168     return;
5169   }
5170 
5171   // Ack from User and not an ack from other SUMA, redistribute in nodegroup
5172 
5173   Uint32 nodeId = refToNode(senderRef);
5174   if (ERROR_INSERTED(13023))
5175   {
5176     ndbout_c("Throwing SUB_GCP_COMPLETE_ACK gci: %u/%u from %u",
5177              Uint32(gci>>32), Uint32(gci), nodeId);
5178     return;
5179   }
5180 
5181 
5182   jam();
5183   Ptr<Gcp_record> gcp;
5184   for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
5185   {
5186     if(gcp.p->m_gci == gci)
5187     {
5188       gcp.p->m_subscribers.clear(nodeId);
5189       gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
5190       if(!gcp.p->m_subscribers.isclear())
5191       {
5192 	jam();
5193 	return;
5194       }
5195       break;
5196     }
5197   }
5198 
5199   if(gcp.isNull())
5200   {
5201     g_eventLogger->warning("ACK wo/ gcp record (gci: %u/%u) ref: %.8x from: %.8x",
5202                            Uint32(gci >> 32), Uint32(gci),
5203                            senderRef, signal->getSendersBlockRef());
5204   }
5205   else
5206   {
5207     c_gcp_list.release(gcp);
5208   }
5209 
5210   CRASH_INSERTION(13011);
5211   if(ERROR_INSERTED(13012))
5212   {
5213     CLEAR_ERROR_INSERT_VALUE;
5214     ndbout_c("Don't redistribute SUB_GCP_COMPLETE_ACK");
5215     return;
5216   }
5217 
5218   ack->rep.senderRef = reference();
5219   NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
5220   sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
5221 	     SubGcpCompleteAck::SignalLength, JBB);
5222 }
5223 
5224 /**************************************************************
5225  *
5226  * Removing subscription
5227  *
5228  */
5229 
5230 void
execSUB_REMOVE_REQ(Signal * signal)5231 Suma::execSUB_REMOVE_REQ(Signal* signal)
5232 {
5233   jamEntry();
5234   DBUG_ENTER("Suma::execSUB_REMOVE_REQ");
5235 
5236   CRASH_INSERTION(13021);
5237 
5238   const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr();
5239   SubscriptionPtr subPtr;
5240   Subscription key;
5241   key.m_subscriptionId  = req.subscriptionId;
5242   key.m_subscriptionKey = req.subscriptionKey;
5243 
5244   if (c_startup.m_restart_server_node_id == RNIL)
5245   {
5246     jam();
5247 
5248     /**
5249      * We havent started syncing yet
5250      */
5251     sendSubRemoveRef(signal,  req, SubRemoveRef::NotStarted);
5252     return;
5253   }
5254 
5255   bool found = c_subscriptions.find(subPtr, key);
5256 
5257   if(!found)
5258   {
5259     jam();
5260     sendSubRemoveRef(signal, req, SubRemoveRef::NoSuchSubscription);
5261     return;
5262   }
5263 
5264   switch(subPtr.p->m_state){
5265   case Subscription::UNDEFINED:
5266     jam();
5267     ndbrequire(false);
5268   case Subscription::DEFINING:
5269     jam();
5270     sendSubRemoveRef(signal, req, SubRemoveRef::Defining);
5271     return;
5272   case Subscription::DEFINED:
5273     if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
5274     {
5275       /**
5276        * already dropped
5277        */
5278       jam();
5279       sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped);
5280       return;
5281     }
5282     break;
5283   }
5284 
5285   subPtr.p->m_options |= Subscription::MARKED_DROPPED;
5286   check_release_subscription(signal, subPtr);
5287 
5288   SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
5289   conf->senderRef            = reference();
5290   conf->senderData           = req.senderData;
5291   conf->subscriptionId       = req.subscriptionId;
5292   conf->subscriptionKey      = req.subscriptionKey;
5293 
5294   sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal,
5295              SubRemoveConf::SignalLength, JBB);
5296   return;
5297 }
5298 
5299 void
check_release_subscription(Signal * signal,Ptr<Subscription> subPtr)5300 Suma::check_release_subscription(Signal* signal, Ptr<Subscription> subPtr)
5301 {
5302   if (!subPtr.p->m_subscribers.isEmpty())
5303   {
5304     jam();
5305     return;
5306   }
5307 
5308   if (!subPtr.p->m_start_req.isEmpty())
5309   {
5310     jam();
5311     return;
5312   }
5313 
5314   if (!subPtr.p->m_stop_req.isEmpty())
5315   {
5316     jam();
5317     return;
5318   }
5319 
5320   switch(subPtr.p->m_trigger_state){
5321   case Subscription::T_UNDEFINED:
5322     jam();
5323     goto do_release;
5324   case Subscription::T_CREATING:
5325     jam();
5326     /**
5327      * Wait for completion
5328      */
5329     return;
5330   case Subscription::T_DEFINED:
5331     jam();
5332     subPtr.p->m_trigger_state = Subscription::T_DROPPING;
5333     drop_triggers(signal, subPtr);
5334     return;
5335   case Subscription::T_DROPPING:
5336     jam();
5337     /**
5338      * Wait for completion
5339      */
5340     return;
5341   case Subscription::T_ERROR:
5342     jam();
5343     /**
5344      * Wait for completion
5345      */
5346     return;
5347   }
5348   ndbrequire(false);
5349 
5350 do_release:
5351   TablePtr tabPtr;
5352   c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5353 
5354   if (tabPtr.p->m_state == Table::DROPPED)
5355   {
5356     jam();
5357     subPtr.p->m_options |= Subscription::MARKED_DROPPED;
5358   }
5359 
5360   if ((subPtr.p->m_options & Subscription::MARKED_DROPPED) == 0)
5361   {
5362     jam();
5363     return;
5364   }
5365 
5366   {
5367     LocalDLList<Subscription> list(c_subscriptionPool,
5368                                    tabPtr.p->m_subscriptions);
5369     list.remove(subPtr);
5370   }
5371 
5372   if (tabPtr.p->m_subscriptions.isEmpty())
5373   {
5374     jam();
5375     switch(tabPtr.p->m_state){
5376     case Table::UNDEFINED:
5377       ndbrequire(false);
5378     case Table::DEFINING:
5379       break;
5380     case Table::DEFINED:
5381       jam();
5382       c_tables.remove(tabPtr);
5383       // Fall through
5384     case Table::DROPPED:
5385       jam();
5386       tabPtr.p->release(* this);
5387       c_tablePool.release(tabPtr);
5388     };
5389   }
5390 
5391   c_subscriptions.release(subPtr);
5392 }
5393 
5394 void
sendSubRemoveRef(Signal * signal,const SubRemoveReq & req,Uint32 errCode)5395 Suma::sendSubRemoveRef(Signal* signal,
5396                        const SubRemoveReq& req,
5397                        Uint32 errCode)
5398 {
5399   jam();
5400   DBUG_ENTER("Suma::sendSubRemoveRef");
5401   SubRemoveRef  * ref = (SubRemoveRef *)signal->getDataPtrSend();
5402   ref->senderRef  = reference();
5403   ref->senderData = req.senderData;
5404   ref->subscriptionId = req.subscriptionId;
5405   ref->subscriptionKey = req.subscriptionKey;
5406   ref->errorCode = errCode;
5407   sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF,
5408 	     signal, SubRemoveRef::SignalLength, JBB);
5409   DBUG_VOID_RETURN;
5410 }
5411 
5412 void
release(Suma & suma)5413 Suma::Table::release(Suma & suma){
5414   jamBlock(&suma);
5415 
5416   m_state = UNDEFINED;
5417 }
5418 
5419 void
release()5420 Suma::SyncRecord::release(){
5421   jam();
5422 
5423   LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
5424   fragBuf.release();
5425 
5426   LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);
5427   attrBuf.release();
5428 
5429   LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo);
5430   boundBuf.release();
5431 }
5432 
5433 
5434 /**************************************************************
5435  *
5436  * Restarting remote node functions, master functionality
5437  * (slave does nothing special)
5438  * - triggered on INCL_NODEREQ calling startNode
5439  * - included node will issue START_ME when it's ready to start
5440  * the subscribers
5441  *
5442  */
5443 
5444 void
execSUMA_START_ME_REQ(Signal * signal)5445 Suma::execSUMA_START_ME_REQ(Signal* signal) {
5446   jamEntry();
5447 
5448   Uint32 retref = signal->getSendersBlockRef();
5449   if (c_restart.m_ref)
5450   {
5451     jam();
5452     SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5453     ref->errorCode = SumaStartMeRef::Busy;
5454     sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5455                SumaStartMeRef::SignalLength, JBB);
5456     return;
5457   }
5458 
5459   if (getNodeState().getStarted() == false)
5460   {
5461     jam();
5462     SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5463     ref->errorCode = SumaStartMeRef::NotStarted;
5464     sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5465                SumaStartMeRef::SignalLength, JBB);
5466     return;
5467   }
5468 
5469   Ptr<SubOpRecord> subOpPtr;
5470   if (c_subOpPool.seize(subOpPtr) == false)
5471   {
5472     jam();
5473     SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5474     ref->errorCode = SumaStartMeRef::Busy;
5475     sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5476                SumaStartMeRef::SignalLength, JBB);
5477     return;
5478   }
5479 
5480   subOpPtr.p->m_opType = SubOpRecord::R_START_ME_REQ;
5481 
5482   c_restart.m_abort = 0;
5483   c_restart.m_waiting_on_self = 0;
5484   c_restart.m_ref = retref;
5485   c_restart.m_max_seq = c_current_seq;
5486   c_restart.m_subOpPtrI = subOpPtr.i;
5487 
5488   DLHashTable<Subscription>::Iterator it;
5489   if (c_subscriptions.first(it))
5490   {
5491     jam();
5492 
5493     /**
5494      * We only need to handle subscriptions with seq <= c_current_seq
5495      *   all subscriptions(s) created after this, will be handled by
5496      *   starting suma directly
5497      */
5498     c_current_seq++;
5499   }
5500 
5501   copySubscription(signal, it);
5502 }
5503 
5504 void
copySubscription(Signal * signal,DLHashTable<Subscription>::Iterator it)5505 Suma::copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator it)
5506 {
5507   jam();
5508 
5509   Ptr<SubOpRecord> subOpPtr;
5510   c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5511 
5512   Ptr<Subscription> subPtr = it.curr;
5513   if (!subPtr.isNull())
5514   {
5515     jam();
5516     c_restart.m_subPtrI = subPtr.i;
5517     c_restart.m_bucket = it.bucket;
5518 
5519     LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
5520     bool empty = list.isEmpty();
5521     list.add(subOpPtr);
5522 
5523     if (!empty)
5524     {
5525       /**
5526        * Wait for lock
5527        */
5528       jam();
5529       c_restart.m_waiting_on_self = 1;
5530       return;
5531     }
5532 
5533     sendSubCreateReq(signal, subPtr);
5534   }
5535   else
5536   {
5537     jam();
5538     SumaStartMeConf* conf = (SumaStartMeConf*)signal->getDataPtrSend();
5539     conf->unused = 0;
5540     sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_CONF, signal,
5541                SumaStartMeConf::SignalLength, JBB);
5542 
5543     c_subOpPool.release(subOpPtr);
5544     c_restart.m_ref = 0;
5545     return;
5546   }
5547 }
5548 
5549 void
sendSubCreateReq(Signal * signal,Ptr<Subscription> subPtr)5550 Suma::sendSubCreateReq(Signal* signal, Ptr<Subscription> subPtr)
5551 {
5552   jam();
5553 
5554   if (c_restart.m_abort)
5555   {
5556     jam();
5557     abort_start_me(signal, subPtr, true);
5558     return;
5559   }
5560 
5561   c_restart.m_waiting_on_self = 0;
5562   SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
5563   req->senderRef        = reference();
5564   req->senderData       = subPtr.i;
5565   req->subscriptionId   = subPtr.p->m_subscriptionId;
5566   req->subscriptionKey  = subPtr.p->m_subscriptionKey;
5567   req->subscriptionType = subPtr.p->m_subscriptionType;
5568   req->tableId          = subPtr.p->m_tableId;
5569   req->schemaTransId    = 0;
5570 
5571   if (subPtr.p->m_options & Subscription::REPORT_ALL)
5572   {
5573     req->subscriptionType |= SubCreateReq::ReportAll;
5574   }
5575 
5576   if (subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)
5577   {
5578     req->subscriptionType |= SubCreateReq::ReportSubscribe;
5579   }
5580 
5581   if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
5582   {
5583     req->subscriptionType |= SubCreateReq::NoReportDDL;
5584   }
5585 
5586   if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
5587   {
5588     req->subscriptionType |= SubCreateReq::NR_Sub_Dropped;
5589     ndbout_c("copying dropped sub: %u", subPtr.i);
5590   }
5591 
5592   Ptr<Table> tabPtr;
5593   c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5594   if (tabPtr.p->m_state != Table::DROPPED)
5595   {
5596     jam();
5597     c_restart.m_waiting_on_self = 0;
5598     if (!ndbd_suma_dictlock_startme(getNodeInfo(refToNode(c_restart.m_ref)).m_version))
5599     {
5600       jam();
5601       /**
5602        * Downgrade
5603        *
5604        * In pre suma v2, SUB_CREATE_REQ::SignalLength is one greater
5605        *   but code checks length and set a default value...
5606        *   so we dont need to do anything...
5607        *   Thank you Ms. Fortuna
5608        */
5609     }
5610 
5611     sendSignal(c_restart.m_ref, GSN_SUB_CREATE_REQ, signal,
5612                SubCreateReq::SignalLength, JBB);
5613   }
5614   else
5615   {
5616     jam();
5617     ndbout_c("not copying sub %u with dropped table: %u/%u",
5618              subPtr.i,
5619              tabPtr.p->m_tableId, tabPtr.i);
5620 
5621     c_restart.m_waiting_on_self = 1;
5622     SubCreateConf * conf = (SubCreateConf *)signal->getDataPtrSend();
5623     conf->senderRef        = reference();
5624     conf->senderData       = subPtr.i;
5625     sendSignal(reference(), GSN_SUB_CREATE_CONF, signal,
5626                SubCreateConf::SignalLength, JBB);
5627   }
5628 }
5629 
5630 void
execSUB_CREATE_REF(Signal * signal)5631 Suma::execSUB_CREATE_REF(Signal* signal)
5632 {
5633   jamEntry();
5634 
5635   SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
5636   Uint32 error= ref->errorCode;
5637 
5638   {
5639     SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5640     ref->errorCode = error;
5641     sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
5642                SumaStartMeRef::SignalLength, JBB);
5643   }
5644 
5645   Ptr<Subscription> subPtr;
5646   c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5647   abort_start_me(signal, subPtr, true);
5648 }
5649 
5650 void
execSUB_CREATE_CONF(Signal * signal)5651 Suma::execSUB_CREATE_CONF(Signal* signal)
5652 {
5653   jamEntry();
5654 
5655   /**
5656    * We have lock...start all subscriber(s)
5657    */
5658   Ptr<Subscription> subPtr;
5659   c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5660 
5661   c_restart.m_waiting_on_self = 0;
5662 
5663   /**
5664    * Check if we were aborted...
5665    *  this signal is sent to self in case of DROPPED subscription...
5666    */
5667   if (c_restart.m_abort)
5668   {
5669     jam();
5670     abort_start_me(signal, subPtr, true);
5671     return;
5672   }
5673 
5674   Ptr<Table> tabPtr;
5675   c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5676 
5677   Ptr<Subscriber> ptr;
5678   if (tabPtr.p->m_state != Table::DROPPED)
5679   {
5680     jam();
5681     LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5682     list.first(ptr);
5683   }
5684   else
5685   {
5686     jam();
5687     ptr.setNull();
5688     ndbout_c("not copying subscribers on sub: %u with dropped table %u/%u",
5689              subPtr.i, tabPtr.p->m_tableId, tabPtr.i);
5690   }
5691 
5692   copySubscriber(signal, subPtr, ptr);
5693 }
5694 
5695 void
copySubscriber(Signal * signal,Ptr<Subscription> subPtr,Ptr<Subscriber> ptr)5696 Suma::copySubscriber(Signal* signal,
5697                      Ptr<Subscription> subPtr,
5698                      Ptr<Subscriber> ptr)
5699 {
5700   if (!ptr.isNull())
5701   {
5702     jam();
5703 
5704     SubStartReq* req = (SubStartReq*)signal->getDataPtrSend();
5705     req->senderRef        = reference();
5706     req->senderData       = ptr.i;
5707     req->subscriptionId   = subPtr.p->m_subscriptionId;
5708     req->subscriptionKey  = subPtr.p->m_subscriptionKey;
5709     req->part             = SubscriptionData::TableData;
5710     req->subscriberData   = ptr.p->m_senderData;
5711     req->subscriberRef    = ptr.p->m_senderRef;
5712 
5713     sendSignal(c_restart.m_ref, GSN_SUB_START_REQ,
5714                signal, SubStartReq::SignalLength, JBB);
5715     return;
5716   }
5717   else
5718   {
5719     // remove lock from this subscription
5720     Ptr<SubOpRecord> subOpPtr;
5721     c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5722     check_remove_queue(signal, subPtr, subOpPtr, true, false);
5723     check_release_subscription(signal, subPtr);
5724 
5725     DLHashTable<Subscription>::Iterator it;
5726     it.curr = subPtr;
5727     it.bucket = c_restart.m_bucket;
5728     c_subscriptions.next(it);
5729     copySubscription(signal, it);
5730   }
5731 }
5732 
5733 void
execSUB_START_CONF(Signal * signal)5734 Suma::execSUB_START_CONF(Signal* signal)
5735 {
5736   jamEntry();
5737 
5738   SubStartConf * const conf = (SubStartConf*)signal->getDataPtr();
5739 
5740   Ptr<Subscription> subPtr;
5741   c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5742 
5743   Ptr<Subscriber> ptr;
5744   c_subscriberPool.getPtr(ptr, conf->senderData);
5745 
5746   LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5747   list.next(ptr);
5748   copySubscriber(signal, subPtr, ptr);
5749 }
5750 
5751 void
execSUB_START_REF(Signal * signal)5752 Suma::execSUB_START_REF(Signal* signal)
5753 {
5754   jamEntry();
5755 
5756   SubStartRef * sig = (SubStartRef*)signal->getDataPtr();
5757   Uint32 errorCode = sig->errorCode;
5758 
5759   {
5760     SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5761     ref->errorCode = errorCode;
5762     sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
5763                SumaStartMeRef::SignalLength, JBB);
5764   }
5765 
5766   Ptr<Subscription> subPtr;
5767   c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5768 
5769   abort_start_me(signal, subPtr, true);
5770 }
5771 
5772 void
abort_start_me(Signal * signal,Ptr<Subscription> subPtr,bool lockowner)5773 Suma::abort_start_me(Signal* signal, Ptr<Subscription> subPtr,
5774                      bool lockowner)
5775 {
5776   Ptr<SubOpRecord> subOpPtr;
5777   c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5778   check_remove_queue(signal, subPtr, subOpPtr, lockowner, true);
5779   check_release_subscription(signal, subPtr);
5780 
5781   c_restart.m_ref = 0;
5782 }
5783 
5784 void
execSUMA_HANDOVER_REQ(Signal * signal)5785 Suma::execSUMA_HANDOVER_REQ(Signal* signal)
5786 {
5787   jamEntry();
5788   DBUG_ENTER("Suma::execSUMA_HANDOVER_REQ");
5789   //  Uint32 sumaRef = signal->getSendersBlockRef();
5790   const SumaHandoverReq * req = CAST_CONSTPTR(SumaHandoverReq,
5791                                               signal->getDataPtr());
5792 
5793   Uint32 gci = req->gci;
5794   Uint32 nodeId = req->nodeId;
5795   Uint32 new_gci = Uint32(m_last_complete_gci >> 32) + MAX_CONCURRENT_GCP + 1;
5796   Uint32 requestType = req->requestType;
5797   if (!ndbd_suma_stop_me(getNodeInfo(nodeId).m_version))
5798   {
5799     jam();
5800     requestType = SumaHandoverReq::RT_START_NODE;
5801   }
5802 
5803   Uint32 start_gci = (gci > new_gci ? gci : new_gci);
5804   // mark all active buckets really belonging to restarting SUMA
5805 
5806   Bucket_mask tmp;
5807   if (requestType == SumaHandoverReq::RT_START_NODE)
5808   {
5809     jam();
5810     c_alive_nodes.set(nodeId);
5811     if (DBG_3R)
5812       ndbout_c("%u c_alive_nodes.set(%u)", __LINE__, nodeId);
5813 
5814     for( Uint32 i = 0; i < c_no_of_buckets; i++)
5815     {
5816       if(get_responsible_node(i) == nodeId)
5817       {
5818         if (m_active_buckets.get(i))
5819         {
5820           // I'm running this bucket but it should really be the restarted node
5821           tmp.set(i);
5822           m_active_buckets.clear(i);
5823           m_switchover_buckets.set(i);
5824           c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
5825           c_buckets[i].m_state |= Bucket::BUCKET_HANDOVER;
5826           c_buckets[i].m_switchover_node = nodeId;
5827           ndbout_c("prepare to handover bucket: %d", i);
5828         }
5829         else if(m_switchover_buckets.get(i))
5830         {
5831           ndbout_c("dont handover bucket: %d %d", i, nodeId);
5832         }
5833       }
5834     }
5835   }
5836   else if (requestType == SumaHandoverReq::RT_STOP_NODE)
5837   {
5838     jam();
5839 
5840     for( Uint32 i = 0; i < c_no_of_buckets; i++)
5841     {
5842       NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask;
5843       nodegroup.clear(nodeId);
5844       if(get_responsible_node(i) == nodeId &&
5845          get_responsible_node(i, nodegroup) == getOwnNodeId())
5846       {
5847         // I'm will be running this bucket when nodeId shutdown
5848         jam();
5849         tmp.set(i);
5850         m_switchover_buckets.set(i);
5851         c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
5852         c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN_TO;
5853         c_buckets[i].m_switchover_node = nodeId;
5854         ndbout_c("prepare to takeover bucket: %d", i);
5855       }
5856     }
5857   }
5858   else
5859   {
5860     jam();
5861     goto ref;
5862   }
5863 
5864   {
5865     SumaHandoverConf *conf= CAST_PTR(SumaHandoverConf,signal->getDataPtrSend());
5866     tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask);
5867     conf->gci = start_gci;
5868     conf->nodeId = getOwnNodeId();
5869     conf->requestType = requestType;
5870     sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal,
5871                SumaHandoverConf::SignalLength, JBB);
5872   }
5873 
5874   DBUG_VOID_RETURN;
5875 
5876 ref:
5877   signal->theData[0] = 111;
5878   signal->theData[1] = getOwnNodeId();
5879   signal->theData[2] = nodeId;
5880   sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_REF, signal, 3, JBB);
5881   DBUG_VOID_RETURN;
5882 }
5883 
5884 // only run on all but restarting suma
5885 void
execSUMA_HANDOVER_REF(Signal * signal)5886 Suma::execSUMA_HANDOVER_REF(Signal* signal)
5887 {
5888   ndbrequire(false);
5889 }
5890 
5891 void
execSUMA_HANDOVER_CONF(Signal * signal)5892 Suma::execSUMA_HANDOVER_CONF(Signal* signal) {
5893   jamEntry();
5894   DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF");
5895 
5896   const SumaHandoverConf * conf = CAST_CONSTPTR(SumaHandoverConf,
5897                                                 signal->getDataPtr());
5898 
5899   CRASH_INSERTION(13043);
5900 
5901   Uint32 gci = conf->gci;
5902   Uint32 nodeId = conf->nodeId;
5903   Uint32 requestType = conf->requestType;
5904   Bucket_mask tmp;
5905   tmp.assign(BUCKET_MASK_SIZE, conf->theBucketMask);
5906 #ifdef HANDOVER_DEBUG
5907   ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
5908 #endif
5909 
5910   if (!ndbd_suma_stop_me(getNodeInfo(nodeId).m_version))
5911   {
5912     jam();
5913     requestType = SumaHandoverReq::RT_START_NODE;
5914   }
5915 
5916   if (requestType == SumaHandoverReq::RT_START_NODE)
5917   {
5918     jam();
5919     for (Uint32 i = 0; i < c_no_of_buckets; i++)
5920     {
5921       if (tmp.get(i))
5922       {
5923         if (DBG_3R)
5924           ndbout_c("%u : %u %u", i, get_responsible_node(i), getOwnNodeId());
5925         ndbrequire(get_responsible_node(i) == getOwnNodeId());
5926         // We should run this bucket, but _nodeId_ is
5927         c_buckets[i].m_switchover_gci = (Uint64(gci) << 32) - 1;
5928         c_buckets[i].m_state |= Bucket::BUCKET_STARTING;
5929       }
5930     }
5931 
5932     char buf[255];
5933     tmp.getText(buf);
5934     infoEvent("Suma: handover from node %u gci: %u buckets: %s (%u)",
5935               nodeId, gci, buf, c_no_of_buckets);
5936     g_eventLogger->info("Suma: handover from node %u gci: %u buckets: %s (%u)",
5937                         nodeId, gci, buf, c_no_of_buckets);
5938     m_switchover_buckets.bitOR(tmp);
5939     c_startup.m_handover_nodes.clear(nodeId);
5940     DBUG_VOID_RETURN;
5941   }
5942   else if (requestType == SumaHandoverReq::RT_STOP_NODE)
5943   {
5944     jam();
5945     for (Uint32 i = 0; i < c_no_of_buckets; i++)
5946     {
5947       if (tmp.get(i))
5948       {
5949         ndbrequire(get_responsible_node(i) == getOwnNodeId());
5950         // We should run this bucket, but _nodeId_ is
5951         c_buckets[i].m_switchover_node = getOwnNodeId();
5952         c_buckets[i].m_switchover_gci = (Uint64(gci) << 32) - 1;
5953         c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN;
5954       }
5955     }
5956 
5957     char buf[255];
5958     tmp.getText(buf);
5959     infoEvent("Suma: handover to node %u gci: %u buckets: %s (%u)",
5960               nodeId, gci, buf, c_no_of_buckets);
5961     g_eventLogger->info("Suma: handover to node %u gci: %u buckets: %s (%u)",
5962                         nodeId, gci, buf, c_no_of_buckets);
5963     m_switchover_buckets.bitOR(tmp);
5964     c_startup.m_handover_nodes.clear(nodeId);
5965     DBUG_VOID_RETURN;
5966   }
5967 }
5968 
5969 void
execSTOP_ME_REQ(Signal * signal)5970 Suma::execSTOP_ME_REQ(Signal* signal)
5971 {
5972   jam();
5973   StopMeReq req = * CAST_CONSTPTR(StopMeReq, signal->getDataPtr());
5974 
5975   ndbrequire(refToNode(req.senderRef) == getOwnNodeId());
5976   ndbrequire(c_shutdown.m_wait_handover == false);
5977   c_shutdown.m_wait_handover = true;
5978   c_shutdown.m_senderRef = req.senderRef;
5979   c_shutdown.m_senderData = req.senderData;
5980 
5981   for (Uint32 i = c_nodes_in_nodegroup_mask.find(0);
5982        i != c_nodes_in_nodegroup_mask.NotFound ;
5983        i = c_nodes_in_nodegroup_mask.find(i + 1))
5984   {
5985     /**
5986      * Check that all SUMA nodes support graceful shutdown...
5987      *   and it's too late to stop it...
5988      * Shutdown instead...
5989      */
5990     if (!ndbd_suma_stop_me(getNodeInfo(i).m_version))
5991     {
5992       jam();
5993       char buf[255];
5994       BaseString::snprintf(buf, sizeof(buf),
5995 			   "Not all versions support graceful shutdown (suma)."
5996 			   " Shutdown directly instead");
5997       progError(__LINE__,
5998 		NDBD_EXIT_GRACEFUL_SHUTDOWN_ERROR,
5999 		buf);
6000       ndbrequire(false);
6001     }
6002   }
6003   send_handover_req(signal, SumaHandoverReq::RT_STOP_NODE);
6004 }
6005 
#ifdef NOT_USED
/**
 * Print a Suma::Page_pos (page id, position within the page and max gci)
 * to an NdbOut stream. Only compiled when NOT_USED is defined.
 */
static
NdbOut&
operator<<(NdbOut & out, const Suma::Page_pos & pos)
{
  out << "[ Page_pos:";
  out << " m_page_id: " << pos.m_page_id;
  out << " m_page_pos: " << pos.m_page_pos;
  out << " m_max_gci: " << pos.m_max_gci;
  out << " ]";
  return out;
}
#endif
6019 
6020 Uint32*
get_buffer_ptr(Signal * signal,Uint32 buck,Uint64 gci,Uint32 sz)6021 Suma::get_buffer_ptr(Signal* signal, Uint32 buck, Uint64 gci, Uint32 sz)
6022 {
6023   sz += 1; // len
6024   Bucket* bucket= c_buckets+buck;
6025   Page_pos pos= bucket->m_buffer_head;
6026 
6027   Buffer_page* page = 0;
6028   Uint32 *ptr = 0;
6029 
6030   if (likely(pos.m_page_id != RNIL))
6031   {
6032     page= c_page_pool.getPtr(pos.m_page_id);
6033     ptr= page->m_data + pos.m_page_pos;
6034   }
6035 
6036   const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022));
6037 
6038   pos.m_page_pos += sz;
6039   pos.m_last_gci = gci;
6040   Uint64 max = pos.m_max_gci > gci ? pos.m_max_gci : gci;
6041 
6042   if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS))
6043   {
6044     pos.m_max_gci = max;
6045     bucket->m_buffer_head = pos;
6046     * ptr++ = (0x8000 << 16) | sz; // Same gci
6047     return ptr;
6048   }
6049   else if(pos.m_page_pos + Buffer_page::GCI_SZ32 <= Buffer_page::DATA_WORDS)
6050   {
6051 loop:
6052     pos.m_max_gci = max;
6053     pos.m_page_pos += Buffer_page::GCI_SZ32;
6054     bucket->m_buffer_head = pos;
6055     * ptr++ = (sz + Buffer_page::GCI_SZ32);
6056     * ptr++ = (Uint32)(gci >> 32);
6057     * ptr++ = (Uint32)(gci & 0xFFFFFFFF);
6058     return ptr;
6059   }
6060   else
6061   {
6062     /**
6063      * new page
6064      * 1) save header on last page
6065      * 2) seize new page
6066      */
6067     Uint32 next;
6068     if(unlikely((next= seize_page()) == RNIL))
6069     {
6070       /**
6071        * Out of buffer
6072        */
6073       out_of_buffer(signal);
6074       return 0;
6075     }
6076 
6077     if(likely(pos.m_page_id != RNIL))
6078     {
6079       page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
6080       page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
6081       page->m_words_used = pos.m_page_pos - sz;
6082       page->m_next_page= next;
6083       ndbassert(pos.m_max_gci != 0);
6084     }
6085     else
6086     {
6087       bucket->m_buffer_tail = next;
6088     }
6089 
6090     memset(&pos, 0, sizeof(pos));
6091     pos.m_page_id = next;
6092     pos.m_page_pos = sz;
6093     pos.m_last_gci = gci;
6094 
6095     page= c_page_pool.getPtr(pos.m_page_id);
6096     page->m_next_page= RNIL;
6097     ptr= page->m_data;
6098     goto loop; //
6099   }
6100 }
6101 
6102 void
out_of_buffer(Signal * signal)6103 Suma::out_of_buffer(Signal* signal)
6104 {
6105   if(m_out_of_buffer_gci)
6106   {
6107     return;
6108   }
6109 
6110   m_out_of_buffer_gci = m_last_complete_gci - 1;
6111   infoEvent("Out of event buffer: nodefailure will cause event failures");
6112   m_missing_data = false;
6113   out_of_buffer_release(signal, 0);
6114 }
6115 
6116 void
out_of_buffer_release(Signal * signal,Uint32 buck)6117 Suma::out_of_buffer_release(Signal* signal, Uint32 buck)
6118 {
6119   Bucket* bucket= c_buckets+buck;
6120   Uint32 tail= bucket->m_buffer_tail;
6121 
6122   if(tail != RNIL)
6123   {
6124     Buffer_page* page= c_page_pool.getPtr(tail);
6125     bucket->m_buffer_tail = page->m_next_page;
6126     free_page(tail, page);
6127     signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
6128     signal->theData[1] = buck;
6129     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
6130     return;
6131   }
6132 
6133   /**
6134    * Clear head
6135    */
6136   bucket->m_buffer_head.m_page_id = RNIL;
6137   bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
6138 
6139   buck++;
6140   if(buck != c_no_of_buckets)
6141   {
6142     signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
6143     signal->theData[1] = buck;
6144     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
6145     return;
6146   }
6147 
6148   /**
6149    * Finished will all release
6150    *   prepare for inclusion
6151    */
6152   m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci
6153     ? m_max_seen_gci : m_last_complete_gci;
6154   m_missing_data = false;
6155 }
6156 
6157 Uint32
seize_page()6158 Suma::seize_page()
6159 {
6160   if (ERROR_INSERTED(13038))
6161   {
6162     jam();
6163     CLEAR_ERROR_INSERT_VALUE;
6164     ndbout_c("Simulating out of event buffer");
6165     m_out_of_buffer_gci = m_max_seen_gci;
6166   }
6167   if(unlikely(m_out_of_buffer_gci))
6168   {
6169     return RNIL;
6170   }
6171 loop:
6172   Ptr<Page_chunk> ptr;
6173   Uint32 ref= m_first_free_page;
6174   if(likely(ref != RNIL))
6175   {
6176     m_first_free_page = (c_page_pool.getPtr(ref))->m_next_page;
6177     Uint32 chunk = (c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
6178     c_page_chunk_pool.getPtr(ptr, chunk);
6179     ndbassert(ptr.p->m_free);
6180     ptr.p->m_free--;
6181     return ref;
6182   }
6183 
6184   if(!c_page_chunk_pool.seize(ptr))
6185     return RNIL;
6186 
6187   Uint32 count = 16;
6188   m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &ref, &count, 1);
6189   if (count == 0)
6190     return RNIL;
6191 
6192   ndbout_c("alloc_chunk(%d %d) - ", ref, count);
6193 
6194   m_first_free_page = ptr.p->m_page_id = ref;
6195   ptr.p->m_size = count;
6196   ptr.p->m_free = count;
6197 
6198   Buffer_page* page;
6199   LINT_INIT(page);
6200   for(Uint32 i = 0; i<count; i++)
6201   {
6202     page = c_page_pool.getPtr(ref);
6203     page->m_page_state= SUMA_SEQUENCE;
6204     page->m_page_chunk_ptr_i = ptr.i;
6205     page->m_next_page = ++ref;
6206   }
6207   page->m_next_page = RNIL;
6208 
6209   goto loop;
6210 }
6211 
6212 void
free_page(Uint32 page_id,Buffer_page * page)6213 Suma::free_page(Uint32 page_id, Buffer_page* page)
6214 {
6215   Ptr<Page_chunk> ptr;
6216   ndbrequire(page->m_page_state == SUMA_SEQUENCE);
6217 
6218   Uint32 chunk= page->m_page_chunk_ptr_i;
6219 
6220   c_page_chunk_pool.getPtr(ptr, chunk);
6221 
6222   ptr.p->m_free ++;
6223   page->m_next_page = m_first_free_page;
6224   ndbrequire(ptr.p->m_free <= ptr.p->m_size);
6225 
6226   m_first_free_page = page_id;
6227 }
6228 
6229 void
release_gci(Signal * signal,Uint32 buck,Uint64 gci)6230 Suma::release_gci(Signal* signal, Uint32 buck, Uint64 gci)
6231 {
6232   Bucket* bucket= c_buckets+buck;
6233   Uint32 tail= bucket->m_buffer_tail;
6234   Page_pos head= bucket->m_buffer_head;
6235   Uint64 max_acked = bucket->m_max_acked_gci;
6236 
6237   const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND;
6238   if(unlikely(bucket->m_state & mask))
6239   {
6240     jam();
6241     ndbout_c("release_gci(%d, %u/%u) 0x%x-> node failure -> abort",
6242              buck, Uint32(gci >> 32), Uint32(gci), bucket->m_state);
6243     return;
6244   }
6245 
6246   bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci);
6247   if(unlikely(tail == RNIL))
6248   {
6249     return;
6250   }
6251 
6252   if(tail == head.m_page_id)
6253   {
6254     if(gci >= head.m_max_gci)
6255     {
6256       jam();
6257       if (ERROR_INSERTED(13034))
6258       {
6259         jam();
6260         SET_ERROR_INSERT_VALUE(13035);
6261         return;
6262       }
6263       if (ERROR_INSERTED(13035))
6264       {
6265         CLEAR_ERROR_INSERT_VALUE;
6266         NodeReceiverGroup rg(CMVMI, c_nodes_in_nodegroup_mask);
6267         rg.m_nodes.clear(getOwnNodeId());
6268         signal->theData[0] = 9999;
6269         sendSignal(rg, GSN_NDB_TAMPER, signal, 1, JBA);
6270         return;
6271       }
6272       head.m_page_pos = 0;
6273       head.m_max_gci = gci;
6274       head.m_last_gci = 0;
6275       bucket->m_buffer_head = head;
6276     }
6277     return;
6278   }
6279   else
6280   {
6281     jam();
6282     Buffer_page* page= c_page_pool.getPtr(tail);
6283     Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
6284     Uint32 next_page = page->m_next_page;
6285 
6286     ndbassert(max_gci != 0);
6287 
6288     if(gci >= max_gci)
6289     {
6290       jam();
6291       free_page(tail, page);
6292 
6293       bucket->m_buffer_tail = next_page;
6294       signal->theData[0] = SumaContinueB::RELEASE_GCI;
6295       signal->theData[1] = buck;
6296       signal->theData[2] = (Uint32)(gci >> 32);
6297       signal->theData[3] = (Uint32)(gci & 0xFFFFFFFF);
6298       sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 4, JBB);
6299       return;
6300     }
6301     else
6302     {
6303       //ndbout_c("do nothing...");
6304     }
6305   }
6306 }
6307 
6308 static Uint32 g_cnt = 0;
6309 
6310 void
start_resend(Signal * signal,Uint32 buck)6311 Suma::start_resend(Signal* signal, Uint32 buck)
6312 {
6313   printf("start_resend(%d, ", buck);
6314 
6315   /**
6316    * Resend from m_max_acked_gci + 1 until max_gci + 1
6317    */
6318   Bucket* bucket= c_buckets + buck;
6319   Page_pos pos= bucket->m_buffer_head;
6320 
6321   if(m_out_of_buffer_gci)
6322   {
6323     Ptr<Gcp_record> gcp;
6324     c_gcp_list.last(gcp);
6325     signal->theData[0] = NDB_LE_SubscriptionStatus;
6326     signal->theData[1] = 2; // INCONSISTENT;
6327     signal->theData[2] = 0; // Not used
6328     signal->theData[3] = (Uint32) pos.m_max_gci;
6329     signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
6330     sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
6331     m_missing_data = true;
6332     return;
6333   }
6334 
6335   if(pos.m_page_id == RNIL)
6336   {
6337     jam();
6338     m_active_buckets.set(buck);
6339     m_gcp_complete_rep_count ++;
6340     ndbout_c("empty bucket(RNIL) -> active max_acked: %u/%u max_gci: %u/%u",
6341 	     Uint32(bucket->m_max_acked_gci >> 32),
6342 	     Uint32(bucket->m_max_acked_gci),
6343 	     Uint32(pos.m_max_gci >> 32),
6344 	     Uint32(pos.m_max_gci));
6345     return;
6346   }
6347 
6348   Uint64 min= bucket->m_max_acked_gci + 1;
6349   Uint64 max = m_max_seen_gci;
6350 
6351   ndbrequire(max <= m_max_seen_gci);
6352 
6353   if(min > max)
6354   {
6355     ndbrequire(pos.m_page_id == bucket->m_buffer_tail);
6356     m_active_buckets.set(buck);
6357     m_gcp_complete_rep_count ++;
6358     ndbout_c("empty bucket (%u/%u %u/%u) -> active",
6359              Uint32(min >> 32), Uint32(min),
6360              Uint32(max >> 32), Uint32(max));
6361     return;
6362   }
6363 
6364   g_cnt = 0;
6365   bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND);
6366   bucket->m_switchover_node = get_responsible_node(buck);
6367   bucket->m_switchover_gci = max;
6368 
6369   m_switchover_buckets.set(buck);
6370 
6371   signal->theData[0] = SumaContinueB::RESEND_BUCKET;
6372   signal->theData[1] = buck;
6373   signal->theData[2] = (Uint32)(min >> 32);
6374   signal->theData[3] = 0;
6375   signal->theData[4] = 0;
6376   signal->theData[5] = (Uint32)(min & 0xFFFFFFFF);
6377   signal->theData[6] = 0;
6378   sendSignal(reference(), GSN_CONTINUEB, signal, 7, JBB);
6379 
6380   ndbout_c("min: %u/%u - max: %u/%u) page: %d",
6381 	   Uint32(min >> 32), Uint32(min), Uint32(max >> 32), Uint32(max),
6382 	   bucket->m_buffer_tail);
6383   ndbrequire(max >= min);
6384 }
6385 
6386 void
resend_bucket(Signal * signal,Uint32 buck,Uint64 min_gci,Uint32 pos,Uint64 last_gci)6387 Suma::resend_bucket(Signal* signal, Uint32 buck, Uint64 min_gci,
6388 		    Uint32 pos, Uint64 last_gci)
6389 {
6390   Bucket* bucket= c_buckets+buck;
6391   Uint32 tail= bucket->m_buffer_tail;
6392 
6393   Buffer_page* page= c_page_pool.getPtr(tail);
6394   Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
6395   Uint32 next_page = page->m_next_page;
6396   Uint32 *ptr = page->m_data + pos;
6397   Uint32 *end = page->m_data + page->m_words_used;
6398   bool delay = false;
6399 
6400   ndbrequire(tail != RNIL);
6401 
6402   if(tail == bucket->m_buffer_head.m_page_id)
6403   {
6404     max_gci= bucket->m_buffer_head.m_max_gci;
6405     end= page->m_data + bucket->m_buffer_head.m_page_pos;
6406     next_page= RNIL;
6407 
6408     if(ptr == end)
6409     {
6410       delay = true;
6411       goto next;
6412     }
6413   }
6414   else if(pos == 0 && min_gci > max_gci)
6415   {
6416     free_page(tail, page);
6417     tail = bucket->m_buffer_tail = next_page;
6418     goto next;
6419   }
6420 
6421 #if 0
6422   for(Uint32 i = 0; i<page->m_words_used; i++)
6423   {
6424     printf("%.8x ", page->m_data[i]);
6425     if(((i + 1) % 8) == 0)
6426       printf("\n");
6427   }
6428   printf("\n");
6429 #endif
6430 
6431   while(ptr < end)
6432   {
6433     Uint32 *src = ptr;
6434     Uint32 tmp = * src++;
6435     Uint32 sz = tmp & 0xFFFF;
6436 
6437     ptr += sz;
6438 
6439     if(! (tmp & (0x8000 << 16)))
6440     {
6441       ndbrequire(sz >= Buffer_page::GCI_SZ32);
6442       sz -= Buffer_page::GCI_SZ32;
6443       Uint32 last_gci_hi = * src++;
6444       Uint32 last_gci_lo = * src++;
6445       last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32);
6446     }
6447     else
6448     {
6449       ndbrequire(ptr - sz > page->m_data);
6450     }
6451 
6452     if(last_gci < min_gci)
6453     {
6454       continue;
6455     }
6456 
6457     ndbrequire(sz);
6458     sz --; // remove *len* part of sz
6459 
6460     if(sz == 0)
6461     {
6462       SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
6463       rep->gci_hi = (Uint32)(last_gci >> 32);
6464       rep->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF);
6465       rep->flags = (m_missing_data)
6466                    ? SubGcpCompleteRep::MISSING_DATA
6467                    : 0;
6468       rep->senderRef  = reference();
6469       rep->gcp_complete_rep_count = 1;
6470 
6471       if (ERROR_INSERTED(13036))
6472       {
6473         jam();
6474         CLEAR_ERROR_INSERT_VALUE;
6475         ndbout_c("Simulating out of event buffer at node failure");
6476         rep->flags |= SubGcpCompleteRep::MISSING_DATA;
6477       }
6478 
6479       char buf[255];
6480       c_subscriber_nodes.getText(buf);
6481       if (g_cnt)
6482       {
6483         ndbout_c("resending GCI: %u/%u rows: %d -> %s",
6484                  Uint32(last_gci >> 32), Uint32(last_gci), g_cnt, buf);
6485       }
6486       g_cnt = 0;
6487 
6488       NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
6489       sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
6490 		 SubGcpCompleteRep::SignalLength, JBB);
6491     }
6492     else
6493     {
6494       const uint buffer_header_sz = 6;
6495       g_cnt++;
6496       Uint32 subPtrI = * src++ ;
6497       Uint32 schemaVersion = * src++;
6498       Uint32 event = * src >> 16;
6499       Uint32 sz_1 = (* src ++) & 0xFFFF;
6500       Uint32 any_value = * src++;
6501       Uint32 transId1 = * src++;
6502       Uint32 transId2 = * src++;
6503 
6504       ndbassert(sz - buffer_header_sz >= sz_1);
6505 
6506       LinearSectionPtr ptr[3];
6507       const Uint32 nptr= reformat(signal, ptr,
6508 				  src, sz_1,
6509 				  src + sz_1, sz - buffer_header_sz - sz_1);
6510       Uint32 ptrLen= 0;
6511       for(Uint32 i =0; i < nptr; i++)
6512         ptrLen+= ptr[i].sz;
6513 
6514       /**
6515        * Signal to subscriber(s)
6516        */
6517       Ptr<Subscription> subPtr;
6518       c_subscriptionPool.getPtr(subPtr, subPtrI);
6519       Ptr<Table> tabPtr;
6520       c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
6521       Uint32 table = subPtr.p->m_tableId;
6522       if (table_version_major(tabPtr.p->m_schemaVersion) ==
6523           table_version_major(schemaVersion))
6524       {
6525 	SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
6526 	data->gci_hi         = (Uint32)(last_gci >> 32);
6527 	data->gci_lo         = (Uint32)(last_gci & 0xFFFFFFFF);
6528 	data->tableId        = table;
6529 	data->requestInfo    = 0;
6530 	SubTableData::setOperation(data->requestInfo, event);
6531 	data->flags          = 0;
6532 	data->anyValue       = any_value;
6533 	data->totalLen       = ptrLen;
6534         data->transId1       = transId1;
6535         data->transId2       = transId2;
6536 
6537 	{
6538           LocalDLList<Subscriber> list(c_subscriberPool,
6539                                        subPtr.p->m_subscribers);
6540           SubscriberPtr subbPtr;
6541           for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
6542           {
6543             data->senderData = subbPtr.p->m_senderData;
6544             sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
6545                        SubTableData::SignalLengthWithTransId, JBB, ptr, nptr);
6546           }
6547         }
6548       }
6549     }
6550 
6551     break;
6552   }
6553 
6554   if(ptr == end && (tail != bucket->m_buffer_head.m_page_id))
6555   {
6556     /**
6557      * release...
6558      */
6559     free_page(tail, page);
6560     tail = bucket->m_buffer_tail = next_page;
6561     pos = 0;
6562     last_gci = 0;
6563   }
6564   else
6565   {
6566     pos = Uint32(ptr - page->m_data);
6567   }
6568 
6569 next:
6570   if(tail == RNIL)
6571   {
6572     bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND;
6573     ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER));
6574     ndbout_c("resend done...");
6575     return;
6576   }
6577 
6578   signal->theData[0] = SumaContinueB::RESEND_BUCKET;
6579   signal->theData[1] = buck;
6580   signal->theData[2] = (Uint32)(min_gci >> 32);
6581   signal->theData[3] = pos;
6582   signal->theData[4] = (Uint32)(last_gci >> 32);
6583   signal->theData[5] = (Uint32)(min_gci & 0xFFFFFFFF);
6584   signal->theData[6] = (Uint32)(last_gci & 0xFFFFFFFF);
6585   if(!delay)
6586     sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 7, JBB);
6587   else
6588     sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 10, 7);
6589 }
6590 
6591 void
execGCP_PREPARE(Signal * signal)6592 Suma::execGCP_PREPARE(Signal *signal)
6593 {
6594   jamEntry();
6595   const GCPPrepare *prep = (const GCPPrepare *)signal->getDataPtr();
6596   m_current_gci = prep->gci_lo | (Uint64(prep->gci_hi) << 32);
6597 }
6598 
6599 Uint64
get_current_gci(Signal *)6600 Suma::get_current_gci(Signal*)
6601 {
6602   return m_current_gci;
6603 }
6604 
6605 void
execCREATE_NODEGROUP_IMPL_REQ(Signal * signal)6606 Suma::execCREATE_NODEGROUP_IMPL_REQ(Signal* signal)
6607 {
6608   CreateNodegroupImplReq reqCopy = *(CreateNodegroupImplReq*)
6609     signal->getDataPtr();
6610   CreateNodegroupImplReq *req = &reqCopy;
6611 
6612   Uint32 err = 0;
6613   Uint32 rt = req->requestType;
6614 
6615   NdbNodeBitmask tmp;
6616   for (Uint32 i = 0; i<NDB_ARRAY_SIZE(req->nodes) && req->nodes[i]; i++)
6617   {
6618     tmp.set(req->nodes[i]);
6619   }
6620   Uint32 cnt = tmp.count();
6621   Uint32 group = req->nodegroupId;
6622 
6623   switch(rt){
6624   case CreateNodegroupImplReq::RT_ABORT:
6625     jam();
6626     break;
6627   case CreateNodegroupImplReq::RT_PARSE:
6628     jam();
6629     break;
6630   case CreateNodegroupImplReq::RT_PREPARE:
6631     jam();
6632     break;
6633   case CreateNodegroupImplReq::RT_COMMIT:
6634     jam();
6635     break;
6636   case CreateNodegroupImplReq::RT_COMPLETE:
6637     jam();
6638     CRASH_INSERTION(13043);
6639 
6640     Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo;
6641     ndbrequire(gci > m_last_complete_gci);
6642 
6643     Uint32 state = 0;
6644     if (c_nodeGroup != RNIL)
6645     {
6646       jam();
6647       NdbNodeBitmask check = tmp;
6648       check.bitAND(c_nodes_in_nodegroup_mask);
6649       ndbrequire(check.isclear());
6650       ndbrequire(c_nodeGroup != group);
6651       ndbrequire(cnt == c_nodes_in_nodegroup_mask.count());
6652       state = Bucket::BUCKET_CREATED_OTHER;
6653     }
6654     else if (tmp.get(getOwnNodeId()))
6655     {
6656       jam();
6657       c_nodeGroup = group;
6658       c_nodes_in_nodegroup_mask.assign(tmp);
6659       fix_nodegroup();
6660       state = Bucket::BUCKET_CREATED_SELF;
6661     }
6662     if (state != 0)
6663     {
6664       for (Uint32 i = 0; i<c_no_of_buckets; i++)
6665       {
6666         jam();
6667         m_switchover_buckets.set(i);
6668         c_buckets[i].m_switchover_gci = gci - 1; // start from gci
6669         c_buckets[i].m_state = state | (c_no_of_buckets << 8);
6670       }
6671     }
6672   }
6673 
6674   {
6675     CreateNodegroupImplConf* conf =
6676       (CreateNodegroupImplConf*)signal->getDataPtrSend();
6677     conf->senderRef = reference();
6678     conf->senderData = req->senderData;
6679     sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_CONF, signal,
6680                CreateNodegroupImplConf::SignalLength, JBB);
6681   }
6682   return;
6683 
6684 //error:
6685   CreateNodegroupImplRef *ref =
6686     (CreateNodegroupImplRef*)signal->getDataPtrSend();
6687   ref->senderRef = reference();
6688   ref->senderData = req->senderData;
6689   ref->errorCode = err;
6690   sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_REF, signal,
6691              CreateNodegroupImplRef::SignalLength, JBB);
6692   return;
6693 }
6694 
6695 void
execDROP_NODEGROUP_IMPL_REQ(Signal * signal)6696 Suma::execDROP_NODEGROUP_IMPL_REQ(Signal* signal)
6697 {
6698   DropNodegroupImplReq reqCopy = *(DropNodegroupImplReq*)
6699     signal->getDataPtr();
6700   DropNodegroupImplReq *req = &reqCopy;
6701 
6702   Uint32 err = 0;
6703   Uint32 rt = req->requestType;
6704   Uint32 group = req->nodegroupId;
6705 
6706   switch(rt){
6707   case DropNodegroupImplReq::RT_ABORT:
6708     jam();
6709     break;
6710   case DropNodegroupImplReq::RT_PARSE:
6711     jam();
6712     break;
6713   case DropNodegroupImplReq::RT_PREPARE:
6714     jam();
6715     break;
6716   case DropNodegroupImplReq::RT_COMMIT:
6717     jam();
6718     break;
6719   case DropNodegroupImplReq::RT_COMPLETE:
6720     jam();
6721     CRASH_INSERTION(13043);
6722 
6723     Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo;
6724     ndbrequire(gci > m_last_complete_gci);
6725 
6726     Uint32 state;
6727     if (c_nodeGroup != group)
6728     {
6729       jam();
6730       state = Bucket::BUCKET_DROPPED_OTHER;
6731       break;
6732     }
6733     else
6734     {
6735       jam();
6736       state = Bucket::BUCKET_DROPPED_SELF;
6737     }
6738 
6739     for (Uint32 i = 0; i<c_no_of_buckets; i++)
6740     {
6741       jam();
6742       m_switchover_buckets.set(i);
6743       if (c_buckets[i].m_state != 0)
6744       {
6745         jamLine(c_buckets[i].m_state);
6746         ndbout_c("c_buckets[%u].m_state: %u", i, c_buckets[i].m_state);
6747       }
6748       ndbrequire(c_buckets[i].m_state == 0); // XXX todo
6749       c_buckets[i].m_switchover_gci = gci - 1; // start from gci
6750       c_buckets[i].m_state = state | (c_no_of_buckets << 8);
6751     }
6752     break;
6753   }
6754 
6755   {
6756     DropNodegroupImplConf* conf =
6757       (DropNodegroupImplConf*)signal->getDataPtrSend();
6758     conf->senderRef = reference();
6759     conf->senderData = req->senderData;
6760     sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_CONF, signal,
6761                DropNodegroupImplConf::SignalLength, JBB);
6762   }
6763   return;
6764 
6765 //error:
6766   DropNodegroupImplRef *ref =
6767     (DropNodegroupImplRef*)signal->getDataPtrSend();
6768   ref->senderRef = reference();
6769   ref->senderData = req->senderData;
6770   ref->errorCode = err;
6771   sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_REF, signal,
6772              DropNodegroupImplRef::SignalLength, JBB);
6773   return;
6774 }
6775 
6776 template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);
6777 
6778