1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef SUMA_H
26 #define SUMA_H
27 
28 #include <ndb_limits.h>
29 #include <SimulatedBlock.hpp>
30 
31 #include <NodeBitmask.hpp>
32 
33 #include <IntrusiveList.hpp>
34 #include <KeyTable.hpp>
35 #include <DataBuffer.hpp>
36 #include <SignalCounter.hpp>
37 #include <AttributeHeader.hpp>
38 #include <AttributeList.hpp>
39 
40 #include <signaldata/UtilSequence.hpp>
41 #include <signaldata/SumaImpl.hpp>
42 #include <ndbapi/NdbDictionary.hpp>
43 #include <NdbTick.h>
44 
45 #define JAM_FILE_ID 469
46 
47 
48 class Suma : public SimulatedBlock {
49   BLOCK_DEFINES(Suma);
50 public:
51   Suma(Block_context& ctx);
52   virtual ~Suma();
53 
54   /**
55    * Private interface
56    */
57   void execSUB_CREATE_REQ(Signal* signal);
58   void execSUB_REMOVE_REQ(Signal* signal);
59 
60   void execSUB_START_REQ(Signal* signal);
61   void execSUB_STOP_REQ(Signal* signal);
62 
63   void execSUB_SYNC_REQ(Signal* signal);
64   void execSUB_ABORT_SYNC_REQ(Signal* signal);
65 
66  /**
67    * Dict interface
68    */
69   void execGET_TABINFOREF(Signal* signal);
70   void execGET_TABINFO_CONF(Signal* signal);
71 
72   void execGET_TABLEID_CONF(Signal* signal);
73   void execGET_TABLEID_REF(Signal* signal);
74 
75   void execDROP_TAB_CONF(Signal* signal);
76   void execALTER_TAB_REQ(Signal* signal);
77   void execCREATE_TAB_CONF(Signal* signal);
78 
79   void execDICT_LOCK_REF(Signal*);
80   void execDICT_LOCK_CONF(Signal*);
81 
82   /**
83    * Scan interface
84    */
85   void execSCAN_HBREP(Signal* signal);
86   void execSCAN_FRAGREF(Signal* signal);
87   void execSCAN_FRAGCONF(Signal* signal);
88   void execTRANSID_AI(Signal* signal);
89   void execKEYINFO20(Signal* signal);
90   void execSUB_SYNC_CONTINUE_REF(Signal* signal);
91   void execSUB_SYNC_CONTINUE_CONF(Signal* signal);
92 
93   /**
94    * Trigger logging
95    */
96   void execTRIG_ATTRINFO(Signal* signal);
97   void execFIRE_TRIG_ORD(Signal* signal);
98   void execFIRE_TRIG_ORD_L(Signal* signal);
99   void execSUB_GCP_COMPLETE_REP(Signal* signal);
100 
101   /**
102    * DIH signals
103    */
104   void execDIH_SCAN_TAB_REF(Signal* signal);
105   void execDIH_SCAN_TAB_CONF(Signal* signal);
106   void execDIH_SCAN_GET_NODES_REF(Signal* signal);
107   void execDIH_SCAN_GET_NODES_CONF(Signal* signal);
108   void execCHECKNODEGROUPSCONF(Signal *signal);
109   void execGCP_PREPARE(Signal *signal);
110 
111   /**
112    * Trigger administration
113    */
114   void execCREATE_TRIG_IMPL_REF(Signal* signal);
115   void execCREATE_TRIG_IMPL_CONF(Signal* signal);
116   void execDROP_TRIG_IMPL_REF(Signal* signal);
117   void execDROP_TRIG_IMPL_CONF(Signal* signal);
118 
119   /**
120    * continueb
121    */
122   void execCONTINUEB(Signal* signal);
123 
124   void execCREATE_NODEGROUP_IMPL_REQ(Signal*);
125   void execDROP_NODEGROUP_IMPL_REQ(Signal*);
126 public:
127 
128   void suma_ndbrequire(bool v);
129 
130   // wl4391_todo big enough for now
131   // Keep m_fragDesc within 32 bit,
132   // m_dummy is used to pass value.
133   union FragmentDescriptor {
134     struct  {
135       Uint16 m_fragmentNo;
136       Uint8 m_lqhInstanceKey;
137       Uint8 m_nodeId;
138     } m_fragDesc;
139     Uint32 m_dummy;
140   };
141 
142   /**
143    * Used when sending SCAN_FRAG
144    */
145   union AttributeDescriptor {
146     struct {
147       Uint16 attrId;
148       Uint16 unused;
149     } m_attrDesc;
150     Uint32 m_dummy;
151   };
152 
153   struct Subscriber {
SubscriberSuma::Subscriber154     Subscriber() {}
155     Uint32 m_senderRef;
156     Uint32 m_senderData;
157     Uint32 nextList;
158 
159     union { Uint32 nextPool; Uint32 prevList; };
160   };
161   typedef Ptr<Subscriber> SubscriberPtr;
162 
163   struct Table;
164   friend struct Table;
165   typedef Ptr<Table> TablePtr;
166 
167   struct SyncRecord {
SyncRecordSuma::SyncRecord168     SyncRecord(Suma& s, DataBuffer<15>::DataBufferPool & p)
169       : suma(s)
170 #ifdef ERROR_INSERT
171 	, cerrorInsert(s.cerrorInsert)
172 #endif
173     {}
174 
175     void release();
176 
177     Uint32 m_senderRef;
178     Uint32 m_senderData;
179 
180     Uint32 m_subscriptionPtrI;
181     Uint32 m_error;
182     Uint32 m_requestInfo;
183 
184     Uint32 m_frag_cnt; // only scan this many fragments...
185     Uint32 m_frag_id;  // only scan this specific fragment...
186     Uint32 m_tableId;  // redundant...
187 
188     /**
189      * Fragments
190      */
191     Uint32 m_scan_cookie;
192     DataBuffer<15>::Head m_fragments;  // Fragment descriptors
193 
194     /**
195      * Sync data
196      */
197     Uint32 m_currentFragment;       // Index in tabPtr.p->m_fragments
198     Uint32 m_currentNoOfAttributes; // No of attributes for current table
199     DataBuffer<15>::Head m_attributeList; // Attribute if other than default
200     DataBuffer<15>::Head m_boundInfo;  // For range scan
201 
202     /**
203      * Current row
204      * (assumes max 1 concurrent frag scan / syncrecord for LM_Exclusive)
205      */
206     Uint32 m_sourceInstance;
207     Uint32 m_headersSection;
208     Uint32 m_dataSection;
209 
210 
211     void startScan(Signal*);
212     void nextScan(Signal*);
213     bool getNextFragment(TablePtr * tab, FragmentDescriptor * fd);
214     void completeScan(Signal*, int error= 0);
215 
216     Suma & suma;
217 #ifdef ERROR_INSERT
218     UintR &cerrorInsert;
219 #endif
numberSuma::SyncRecord220     BlockNumber number() const { return suma.number(); }
jamBufferSuma::SyncRecord221     EmulatedJamBuffer *jamBuffer() const { return suma.jamBuffer(); }
progErrorSuma::SyncRecord222     void progError(int line, int cause, const char * extra) {
223       suma.progError(line, cause, extra);
224     }
225 
226     Uint32 prevList; Uint32 ptrI;
227     union { Uint32 nextPool; Uint32 nextList; };
228   };
229   friend struct SyncRecord;
230 
231   struct SubOpRecord
232   {
SubOpRecordSuma::SubOpRecord233     SubOpRecord() {}
234 
235     enum OpType
236     {
237       R_SUB_START_REQ,
238       R_SUB_STOP_REQ,
239       R_START_ME_REQ,
240       R_API_FAIL_REQ,
241       R_SUB_ABORT_START_REQ
242     };
243 
244     Uint32 m_opType;
245     Uint32 m_subPtrI;
246     Uint32 m_senderRef;
247     Uint32 m_senderData;
248     Uint32 m_subscriberRef;
249     Uint32 m_subscriberData;
250 
251     Uint32 nextList;
252     union {
253       Uint32 prevList;
254       Uint32 nextPool;
255     };
256   };
257   friend struct SubOpRecord;
258 
259   struct Subscription
260   {
261     Uint32 m_seq_no;
262     Uint32 m_subscriptionId;
263     Uint32 m_subscriptionKey;
264     Uint32 m_subscriptionType;
265     Uint32 m_schemaTransId;
266     Uint16 m_options;
267 
268     enum Options {
269       REPORT_ALL       = 0x1,
270       REPORT_SUBSCRIBE = 0x2,
271       MARKED_DROPPED   = 0x4,
272       NO_REPORT_DDL    = 0x8
273     };
274 
275     enum State {
276       UNDEFINED,
277       DEFINED,
278       DEFINING
279     };
280 
281     enum TriggerState {
282       T_UNDEFINED,
283       T_CREATING,
284       T_DEFINED,
285       T_DROPPING,
286       T_ERROR
287     };
288 
289     State m_state;
290     TriggerState m_trigger_state;
291 
292     DLList<Subscriber>::Head m_subscribers;
293     DLFifoList<SubOpRecord>::Head m_create_req;
294     DLFifoList<SubOpRecord>::Head m_start_req;
295     DLFifoList<SubOpRecord>::Head m_stop_req;
296     DLList<SyncRecord>::Head m_syncRecords;
297 
298     Uint32 m_errorCode;
299     Uint32 m_outstanding_trigger;
300     Uint32 m_triggers[3];
301 
302     Uint32 nextList, prevList;
303     Uint32 nextHash;
304     union { Uint32 prevHash; Uint32 nextPool; };
305 
hashValueSuma::Subscription306     Uint32 hashValue() const {
307       return m_subscriptionId + m_subscriptionKey;
308     }
309 
equalSuma::Subscription310     bool equal(const Subscription & s) const {
311       return
312 	m_subscriptionId == s.m_subscriptionId &&
313 	m_subscriptionKey == s.m_subscriptionKey;
314     }
315     /**
316      * The following holds the tables included
317      * in the subscription.
318      */
319     Uint32 m_tableId;
320     Uint32 m_table_ptrI;
321   };
322   typedef Ptr<Subscription> SubscriptionPtr;
323 
324   struct Table {
TableSuma::Table325     Table() { m_tableId = ~0; }
326     void release(Suma&);
327 
328     DLList<Subscription>::Head m_subscriptions;
329 
330     enum State {
331       UNDEFINED,
332       DEFINING,
333       DEFINED,
334       DROPPED
335     };
336     State m_state;
337 
338     Uint32 m_ptrI;
339 
340     bool parseTable(SegmentedSectionPtr ptr, Suma &suma);
341     /**
342      * Create triggers
343      */
344     void createAttributeMask(AttributeMask&, Suma &suma);
345 
346     union { Uint32 m_tableId; Uint32 key; };
347     Uint32 m_schemaVersion;
348 
349     Uint32 m_error;
350 
351     Uint32 m_noOfAttributes;
352 
353     /**
354      * Hash table stuff
355      */
356     Uint32 nextHash;
357     union { Uint32 prevHash; Uint32 nextPool; };
hashValueSuma::Table358     Uint32 hashValue() const {
359       return m_tableId;
360     }
equalSuma::Table361     bool equal(const Table& rec) const {
362       return m_tableId == rec.m_tableId;
363     }
364 
365     // copy from Subscription
366     Uint32 m_schemaTransId;
367   };
368 
369   /**
370    *
371    */
372 
373   /**
374    * Lists
375    */
376   KeyTable<Table> c_tables;
377   DLHashTable<Subscription> c_subscriptions;
378 
379   /**
380    * Pools
381    */
382   ArrayPool<Subscriber> c_subscriberPool;
383   ArrayPool<Table> c_tablePool;
384   ArrayPool<Subscription> c_subscriptionPool;
385   ArrayPool<SyncRecord> c_syncPool;
386   DataBuffer<15>::DataBufferPool c_dataBufferPool;
387   ArrayPool<SubOpRecord> c_subOpPool;
388 
389   Uint32 c_maxBufferedEpochs;
390 
391   NodeBitmask c_failedApiNodes;
392   Uint32 c_failedApiNodesState[MAX_NODES];
393 
394   /**
395    * Functions
396    */
397   bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
398 
399   void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
400 
401   void sendSubCreateRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
402   void sendSubStartRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
403   void sendSubStopRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
404   void report_sub_stop_conf(Signal* signal,
405                             Ptr<SubOpRecord> subOpPtr,
406                             Ptr<Subscriber> ptr,
407                             bool report,
408                             LocalDLList<Subscriber>& list);
409 
410   void sendSubSyncRef(Signal* signal, Uint32 errorCode);
411   void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
412 			Uint32 errorCode);
413   void sendSubStopReq(Signal* signal, bool unlock= false);
414 
415   void completeSubRemove(SubscriptionPtr subPtr);
416 
417   void send_sub_start_stop_event(Signal *signal,
418                                  Ptr<Subscriber> ptr,
419                                  NdbDictionary::Event::_TableEvent event,
420                                  bool report,
421                                  LocalDLList<Subscriber>& list);
422 
423   Uint32 getFirstGCI(Signal* signal);
424 
425   void create_triggers(Signal*, Ptr<Subscription>);
426   void drop_triggers(Signal*, Ptr<Subscription>);
427   void drop_triggers_complete(Signal*, Ptr<Subscription>);
428 
429   bool check_sub_start(Uint32 subscriberRef);
430   void report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr);
431   void report_sub_start_ref(Signal* signal, Ptr<Subscription> subPtr, Uint32);
432 
433   void sub_stop_req(Signal*);
434   void check_remove_queue(Signal*, Ptr<Subscription>,
435                           Ptr<SubOpRecord>,bool,bool);
436   void check_release_subscription(Signal* signal, Ptr<Subscription>);
437   void get_tabinfo_ref_release(Signal*, Ptr<Table>);
438 
439   /**
440    * Public interface
441    */
442   void execCREATE_SUBSCRIPTION_REQ(Signal* signal);
443   void execDROP_SUBSCRIPTION_REQ(Signal* signal);
444 
445   void execSTART_SUBSCRIPTION_REQ(Signal* signal);
446   void execSTOP_SUBSCRIPTION_REQ(Signal* signal);
447 
448   void execSYNC_SUBSCRIPTION_REQ(Signal* signal);
449   void execABORT_SYNC_REQ(Signal* signal);
450 
451   /**
452    * Framework signals
453    */
454 
455   void getNodeGroupMembers(Signal* signal);
456   void execREAD_CONFIG_REQ(Signal* signal);
457 
458   void execSTTOR(Signal* signal);
459   void sendSTTORRY(Signal*);
460   void execNDB_STTOR(Signal* signal);
461   void execDUMP_STATE_ORD(Signal* signal);
462   void execDBINFO_SCANREQ(Signal* signal);
463   void execREAD_NODESCONF(Signal* signal);
464   void execNODE_FAILREP(Signal* signal);
465   void execINCL_NODEREQ(Signal* signal);
466   void execSIGNAL_DROPPED_REP(Signal* signal);
467   void execAPI_START_REP(Signal* signal);
468   void execAPI_FAILREQ(Signal* signal) ;
469 
470   void api_fail_gci_list(Signal*, Uint32 node);
471   void api_fail_subscriber_list(Signal*, Uint32 node);
472   void api_fail_subscription(Signal*);
473   void api_fail_block_cleanup(Signal* signal, Uint32 failedNode);
474   void api_fail_block_cleanup_callback(Signal* signal,
475                                        Uint32 failedNodeId,
476                                        Uint32 elementsCleaned);
477 
478   void execSUB_GCP_COMPLETE_ACK(Signal* signal);
479 
480   /**
481    * Controller interface
482    */
483   void execSUB_CREATE_REF(Signal* signal);
484   void execSUB_CREATE_CONF(Signal* signal);
485 
486   void execSUB_DROP_REF(Signal* signal);
487   void execSUB_DROP_CONF(Signal* signal);
488 
489   void execSUB_START_REF(Signal* signal);
490   void execSUB_START_CONF(Signal* signal);
491 
492   void execSUB_ABORT_SYNC_REF(Signal* signal);
493   void execSUB_ABORT_SYNC_CONF(Signal* signal);
494 
495   void execSUMA_START_ME_REQ(Signal* signal);
496   void execSUMA_START_ME_REF(Signal* signal);
497   void execSUMA_START_ME_CONF(Signal* signal);
498 
499   void execSTOP_ME_REQ(Signal*);
500 
501   void copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator);
502   void sendSubCreateReq(Signal* signal, Ptr<Subscription>);
503   void copySubscriber(Signal*, Ptr<Subscription>, Ptr<Subscriber>);
504   void abort_start_me(Signal*, Ptr<Subscription>, bool lockowner);
505 
506   void execSUMA_HANDOVER_REQ(Signal* signal);
507   void execSUMA_HANDOVER_REF(Signal* signal);
508   void execSUMA_HANDOVER_CONF(Signal* signal);
509 
510   /**
511    * Subscription generation interface
512    */
513   void createSequence(Signal* signal);
514   void createSequenceReply(Signal* signal,
515 			   UtilSequenceConf* conf,
516 			   UtilSequenceRef* ref);
517   void execUTIL_SEQUENCE_CONF(Signal* signal);
518   void execUTIL_SEQUENCE_REF(Signal* signal);
519   void execCREATE_SUBID_REQ(Signal* signal);
520 
521   /**
522    * for Suma that is restarting another
523    */
524 
525   // for LQH transporter overload check
getSubscriberNodes() const526   const NodeBitmask& getSubscriberNodes() const { return c_subscriber_nodes; }
527 
528 protected:
529   virtual bool getParam(const char * param, Uint32 * retVal);
530 
531 private:
532   /**
533    * Variables
534    */
535   NodeId c_masterNodeId;
536   NdbNodeBitmask c_alive_nodes;
537 
538   /**
539    * for restarting Suma not to start sending data too early
540    */
541 
542   struct Startup
543   {
544     Uint32 m_wait_handover_timeout_ms; // Max time to wait in phase 101 for API nodes to connect
545     bool m_wait_handover;
546     NDB_TICKS m_wait_handover_expire;
547     NDB_TICKS m_wait_handover_message_expire;
548     bool m_forced_disconnect_attempted;
549     Uint32 m_restart_server_node_id;
550     NdbNodeBitmask m_handover_nodes;
551   } c_startup;
552 
553   /**
554    * for graceful shutdown
555    */
556   struct Shutdown
557   {
558     bool m_wait_handover;
559     Uint32 m_senderRef;
560     Uint32 m_senderData;
561   } c_shutdown;
562 
563   struct Restart
564   {
565     Uint16 m_abort;
566     Uint16 m_waiting_on_self;
567     Uint32 m_ref;
568     Uint32 m_max_seq;
569     Uint32 m_subPtrI;
570     Uint32 m_subOpPtrI;
571     Uint32 m_bucket; // In c_subscribers hashtable
572   } c_restart;
573 
574   Uint32 c_current_seq; // Sequence no on subscription(s)
575   Uint32 c_outstanding_drop_trig_req;
576 
577   NodeBitmask c_connected_nodes;  // (NODE/API) START REP / (API/NODE) FAIL REQ
578   NodeBitmask c_subscriber_nodes; //
579 
580   /**
581    * for all Suma's to keep track of other Suma's in Node group
582    */
583   Uint32 c_nodeGroup;
584   Uint32 c_noNodesInGroup;
585   Uint32 c_nodesInGroup[MAX_REPLICAS];
586   NdbNodeBitmask c_nodes_in_nodegroup_mask;  // NodeId's of nodes in nodegroup
587 
588   void send_dict_lock_req(Signal* signal, Uint32 state);
589   void send_dict_unlock_ord(Signal* signal, Uint32 state);
590   void send_start_me_req(Signal* signal);
591   void check_start_handover(Signal* signal);
592   void check_wait_handover_timeout(Signal* signal);
593   void check_wait_handover_message(NDB_TICKS now);
594   void send_handover_req(Signal* signal, Uint32 type);
595 
596   void calculate_sub_data_stream(Uint16 bucket, Uint16 buckets, Uint16 replicas);
597   Uint16 get_sub_data_stream(Uint16 bucket) const;
598   Uint32 get_responsible_node(Uint32 B) const;
599   Uint32 get_responsible_node(Uint32 B, const NdbNodeBitmask& mask) const;
600   bool check_switchover(Uint32 bucket, Uint64 gci);
601 
602   void fix_nodegroup();
603 
604 public:
605   struct Page_pos
606   {
607     Uint32 m_page_id;
608     Uint32 m_page_pos;
609     Uint64 m_max_gci;   // max gci on page
610     Uint64 m_last_gci;  // last gci on page
611   };
612 private:
613 
614   struct Bucket
615   {
616     enum {
617       BUCKET_STARTING = 0x1  // On starting node
618       ,BUCKET_HANDOVER = 0x2 // On running node
619       ,BUCKET_TAKEOVER = 0x4 // On takeing over node
620       ,BUCKET_RESEND   = 0x8 // On takeing over node
621       ,BUCKET_CREATED_SELF  = 0x10 // New nodegroup (me)
622       ,BUCKET_CREATED_OTHER = 0x20 // New nodegroup (not me)
623       ,BUCKET_CREATED_MASK  = (BUCKET_CREATED_SELF | BUCKET_CREATED_OTHER)
624       ,BUCKET_DROPPED_SELF  = 0x40 // New nodegroup (me) uses hi 8 bit for cnt
625       ,BUCKET_DROPPED_OTHER = 0x80 // New nodegroup (not me)
626       ,BUCKET_DROPPED_MASK  = (BUCKET_DROPPED_SELF | BUCKET_DROPPED_OTHER)
627       ,BUCKET_SHUTDOWN = 0x100 // Graceful shutdown
628       ,BUCKET_SHUTDOWN_TO = 0x200 // Graceful shutdown
629     };
630     Uint16 m_state;
631     Uint16 m_switchover_node;
632     Uint16 m_nodes[MAX_REPLICAS];
633     Uint16 m_sub_data_stream;
634     Uint32 m_buffer_tail;   // Page
635     Uint64 m_switchover_gci;
636     Uint64 m_max_acked_gci;
637     Page_pos m_buffer_head;
638   };
639 
640   struct Buffer_page
641   {
642     STATIC_CONST( DATA_WORDS = 8192 - 10);
643     STATIC_CONST( GCI_SZ32 = 2 );
644 
645     Uint32 _tupdata1;
646     Uint32 _tupdata2;
647     Uint32 _tupdata3;
648     Uint32 _tupdata4;
649     Uint32 m_page_state;     // Used by TUP buddy algorithm
650     Uint32 m_page_chunk_ptr_i;
651     Uint32 m_next_page;
652     Uint32 m_words_used;     //
653     Uint32 m_max_gci_hi;     //
654     Uint32 m_max_gci_lo;     //
655     Uint32 m_data[DATA_WORDS];
656   };
657 
658   STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4*3*2*1!
659   Uint32 c_no_of_buckets;
660   struct Bucket c_buckets[NO_OF_BUCKETS];
661   Uint32 c_subscriber_per_node[MAX_NODES];
662 
663   STATIC_CONST( BUCKET_MASK_SIZE = (((NO_OF_BUCKETS+31)>> 5)) );
664   typedef Bitmask<BUCKET_MASK_SIZE> Bucket_mask;
665   Bucket_mask m_active_buckets;
666   Bucket_mask m_switchover_buckets;
667 
668   void init_buffers();
669   Uint32* get_buffer_ptr(Signal*, Uint32 buck, Uint64 gci, Uint32 sz);
670   Uint32 seize_page();
671   void free_page(Uint32 page_id, Buffer_page* page);
672   void out_of_buffer(Signal*);
673   void out_of_buffer_release(Signal* signal, Uint32 buck);
674 
675   void start_resend(Signal*, Uint32 bucket);
676   void resend_bucket(Signal*, Uint32 bucket, Uint64 gci,
677 		     Uint32 page_pos, Uint64 last_gci);
678   void release_gci(Signal*, Uint32 bucket, Uint64 gci);
679 
680   Uint64 get_current_gci(Signal*);
681 
682   void checkMaxBufferedEpochs(Signal *signal);
683 
684   Uint64 m_max_seen_gci;      // FIRE_TRIG_ORD
685   Uint64 m_max_sent_gci;      // FIRE_TRIG_ORD -> send
686   Uint64 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
687   Uint64 m_out_of_buffer_gci;
688   Uint32 m_gcp_complete_rep_count;
689   bool m_missing_data;
690 
691   struct Gcp_record
692   {
693     Uint64 m_gci;
694     NodeBitmask m_subscribers;
695     union {
696       Uint32 nextPool;
697       Uint32 nextList;
698     };
699     Uint32 prevList;
700   };
701   ArrayPool<Gcp_record> c_gcp_pool;
702   DLCFifoList<Gcp_record> c_gcp_list;
703 
704   struct Page_chunk
705   {
706     STATIC_CONST( CHUNK_PAGE_SIZE = 32768 );
707     STATIC_CONST( PAGES_PER_CHUNK = 16 );
708 
709     Uint32 m_page_id;
710     Uint32 m_size;
711     Uint32 m_free;
712     union {
713       Uint32 nextPool;
714       Uint32 nextList;
715     };
716     Uint32 prevList;
717   };
718 
719   Uint32 m_first_free_page;
720   ArrayPool<Page_chunk> c_page_chunk_pool;
721   ArrayPool<Buffer_page> c_page_pool;
722 
723 #ifdef VM_TRACE
724   Uint64 m_gcp_monitor;
725 #endif
726 
727   struct SubGcpCompleteCounter
728   {
729     Uint64 m_gci;
730     Uint32 m_cnt;
731   };
732 
733   Uint32 m_gcp_rep_cnt;
734   Uint32 m_min_gcp_rep_counter_index;
735   Uint32 m_max_gcp_rep_counter_index;
736   struct SubGcpCompleteCounter m_gcp_rep_counter[10];
737 
738   /* Buffer used in Suma::execALTER_TAB_REQ(). */
739   Uint32 b_dti_buf[MAX_WORDS_META_FILE];
740   Uint64 m_current_gci;
741 
742   Uint32 m_startphase;
743   Uint32 m_typeOfStart;
744 
745   void sendScanSubTableData(Signal* signal, Ptr<SyncRecord>, Uint32);
746 };
747 
748 inline
749 Uint16
get_sub_data_stream(Uint16 bucket) const750 Suma::get_sub_data_stream(Uint16 bucket) const
751 {
752   ndbassert(bucket < NO_OF_BUCKETS);
753   const Bucket* ptr= c_buckets + bucket;
754   return ptr->m_sub_data_stream;
755 }
756 
757 #undef JAM_FILE_ID
758 
759 #endif
760