1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef SUMA_H
26 #define SUMA_H
27
28 #include <ndb_limits.h>
29 #include <SimulatedBlock.hpp>
30
31 #include <NodeBitmask.hpp>
32
33 #include <IntrusiveList.hpp>
34 #include <KeyTable.hpp>
35 #include <DataBuffer.hpp>
36 #include <SignalCounter.hpp>
37 #include <AttributeHeader.hpp>
38 #include <AttributeList.hpp>
39
40 #include <signaldata/UtilSequence.hpp>
41 #include <signaldata/SumaImpl.hpp>
42 #include <ndbapi/NdbDictionary.hpp>
43 #include <NdbTick.h>
44
45 #define JAM_FILE_ID 469
46
47
48 class Suma : public SimulatedBlock {
49 BLOCK_DEFINES(Suma);
50 public:
51 Suma(Block_context& ctx);
52 virtual ~Suma();
53
54 /**
55 * Private interface
56 */
57 void execSUB_CREATE_REQ(Signal* signal);
58 void execSUB_REMOVE_REQ(Signal* signal);
59
60 void execSUB_START_REQ(Signal* signal);
61 void execSUB_STOP_REQ(Signal* signal);
62
63 void execSUB_SYNC_REQ(Signal* signal);
64 void execSUB_ABORT_SYNC_REQ(Signal* signal);
65
66 /**
67 * Dict interface
68 */
69 void execGET_TABINFOREF(Signal* signal);
70 void execGET_TABINFO_CONF(Signal* signal);
71
72 void execGET_TABLEID_CONF(Signal* signal);
73 void execGET_TABLEID_REF(Signal* signal);
74
75 void execDROP_TAB_CONF(Signal* signal);
76 void execALTER_TAB_REQ(Signal* signal);
77 void execCREATE_TAB_CONF(Signal* signal);
78
79 void execDICT_LOCK_REF(Signal*);
80 void execDICT_LOCK_CONF(Signal*);
81
82 /**
83 * Scan interface
84 */
85 void execSCAN_HBREP(Signal* signal);
86 void execSCAN_FRAGREF(Signal* signal);
87 void execSCAN_FRAGCONF(Signal* signal);
88 void execTRANSID_AI(Signal* signal);
89 void execKEYINFO20(Signal* signal);
90 void execSUB_SYNC_CONTINUE_REF(Signal* signal);
91 void execSUB_SYNC_CONTINUE_CONF(Signal* signal);
92
93 /**
94 * Trigger logging
95 */
96 void execTRIG_ATTRINFO(Signal* signal);
97 void execFIRE_TRIG_ORD(Signal* signal);
98 void execFIRE_TRIG_ORD_L(Signal* signal);
99 void execSUB_GCP_COMPLETE_REP(Signal* signal);
100
101 /**
102 * DIH signals
103 */
104 void execDIH_SCAN_TAB_REF(Signal* signal);
105 void execDIH_SCAN_TAB_CONF(Signal* signal);
106 void execDIH_SCAN_GET_NODES_REF(Signal* signal);
107 void execDIH_SCAN_GET_NODES_CONF(Signal* signal);
108 void execCHECKNODEGROUPSCONF(Signal *signal);
109 void execGCP_PREPARE(Signal *signal);
110
111 /**
112 * Trigger administration
113 */
114 void execCREATE_TRIG_IMPL_REF(Signal* signal);
115 void execCREATE_TRIG_IMPL_CONF(Signal* signal);
116 void execDROP_TRIG_IMPL_REF(Signal* signal);
117 void execDROP_TRIG_IMPL_CONF(Signal* signal);
118
119 /**
120 * continueb
121 */
122 void execCONTINUEB(Signal* signal);
123
124 void execCREATE_NODEGROUP_IMPL_REQ(Signal*);
125 void execDROP_NODEGROUP_IMPL_REQ(Signal*);
126 public:
127
128 void suma_ndbrequire(bool v);
129
130 // wl4391_todo big enough for now
131 // Keep m_fragDesc within 32 bit,
132 // m_dummy is used to pass value.
133 union FragmentDescriptor {
134 struct {
135 Uint16 m_fragmentNo;
136 Uint8 m_lqhInstanceKey;
137 Uint8 m_nodeId;
138 } m_fragDesc;
139 Uint32 m_dummy;
140 };
141
142 /**
143 * Used when sending SCAN_FRAG
144 */
145 union AttributeDescriptor {
146 struct {
147 Uint16 attrId;
148 Uint16 unused;
149 } m_attrDesc;
150 Uint32 m_dummy;
151 };
152
153 struct Subscriber {
SubscriberSuma::Subscriber154 Subscriber() {}
155 Uint32 m_senderRef;
156 Uint32 m_senderData;
157 Uint32 nextList;
158
159 union { Uint32 nextPool; Uint32 prevList; };
160 };
161 typedef Ptr<Subscriber> SubscriberPtr;
162
163 struct Table;
164 friend struct Table;
165 typedef Ptr<Table> TablePtr;
166
167 struct SyncRecord {
SyncRecordSuma::SyncRecord168 SyncRecord(Suma& s, DataBuffer<15>::DataBufferPool & p)
169 : suma(s)
170 #ifdef ERROR_INSERT
171 , cerrorInsert(s.cerrorInsert)
172 #endif
173 {}
174
175 void release();
176
177 Uint32 m_senderRef;
178 Uint32 m_senderData;
179
180 Uint32 m_subscriptionPtrI;
181 Uint32 m_error;
182 Uint32 m_requestInfo;
183
184 Uint32 m_frag_cnt; // only scan this many fragments...
185 Uint32 m_frag_id; // only scan this specific fragment...
186 Uint32 m_tableId; // redundant...
187
188 /**
189 * Fragments
190 */
191 Uint32 m_scan_cookie;
192 DataBuffer<15>::Head m_fragments; // Fragment descriptors
193
194 /**
195 * Sync data
196 */
197 Uint32 m_currentFragment; // Index in tabPtr.p->m_fragments
198 Uint32 m_currentNoOfAttributes; // No of attributes for current table
199 DataBuffer<15>::Head m_attributeList; // Attribute if other than default
200 DataBuffer<15>::Head m_boundInfo; // For range scan
201
202 /**
203 * Current row
204 * (assumes max 1 concurrent frag scan / syncrecord for LM_Exclusive)
205 */
206 Uint32 m_sourceInstance;
207 Uint32 m_headersSection;
208 Uint32 m_dataSection;
209
210
211 void startScan(Signal*);
212 void nextScan(Signal*);
213 bool getNextFragment(TablePtr * tab, FragmentDescriptor * fd);
214 void completeScan(Signal*, int error= 0);
215
216 Suma & suma;
217 #ifdef ERROR_INSERT
218 UintR &cerrorInsert;
219 #endif
numberSuma::SyncRecord220 BlockNumber number() const { return suma.number(); }
jamBufferSuma::SyncRecord221 EmulatedJamBuffer *jamBuffer() const { return suma.jamBuffer(); }
progErrorSuma::SyncRecord222 void progError(int line, int cause, const char * extra) {
223 suma.progError(line, cause, extra);
224 }
225
226 Uint32 prevList; Uint32 ptrI;
227 union { Uint32 nextPool; Uint32 nextList; };
228 };
229 friend struct SyncRecord;
230
231 struct SubOpRecord
232 {
SubOpRecordSuma::SubOpRecord233 SubOpRecord() {}
234
235 enum OpType
236 {
237 R_SUB_START_REQ,
238 R_SUB_STOP_REQ,
239 R_START_ME_REQ,
240 R_API_FAIL_REQ,
241 R_SUB_ABORT_START_REQ
242 };
243
244 Uint32 m_opType;
245 Uint32 m_subPtrI;
246 Uint32 m_senderRef;
247 Uint32 m_senderData;
248 Uint32 m_subscriberRef;
249 Uint32 m_subscriberData;
250
251 Uint32 nextList;
252 union {
253 Uint32 prevList;
254 Uint32 nextPool;
255 };
256 };
257 friend struct SubOpRecord;
258
259 struct Subscription
260 {
261 Uint32 m_seq_no;
262 Uint32 m_subscriptionId;
263 Uint32 m_subscriptionKey;
264 Uint32 m_subscriptionType;
265 Uint32 m_schemaTransId;
266 Uint16 m_options;
267
268 enum Options {
269 REPORT_ALL = 0x1,
270 REPORT_SUBSCRIBE = 0x2,
271 MARKED_DROPPED = 0x4,
272 NO_REPORT_DDL = 0x8
273 };
274
275 enum State {
276 UNDEFINED,
277 DEFINED,
278 DEFINING
279 };
280
281 enum TriggerState {
282 T_UNDEFINED,
283 T_CREATING,
284 T_DEFINED,
285 T_DROPPING,
286 T_ERROR
287 };
288
289 State m_state;
290 TriggerState m_trigger_state;
291
292 DLList<Subscriber>::Head m_subscribers;
293 DLFifoList<SubOpRecord>::Head m_create_req;
294 DLFifoList<SubOpRecord>::Head m_start_req;
295 DLFifoList<SubOpRecord>::Head m_stop_req;
296 DLList<SyncRecord>::Head m_syncRecords;
297
298 Uint32 m_errorCode;
299 Uint32 m_outstanding_trigger;
300 Uint32 m_triggers[3];
301
302 Uint32 nextList, prevList;
303 Uint32 nextHash;
304 union { Uint32 prevHash; Uint32 nextPool; };
305
hashValueSuma::Subscription306 Uint32 hashValue() const {
307 return m_subscriptionId + m_subscriptionKey;
308 }
309
equalSuma::Subscription310 bool equal(const Subscription & s) const {
311 return
312 m_subscriptionId == s.m_subscriptionId &&
313 m_subscriptionKey == s.m_subscriptionKey;
314 }
315 /**
316 * The following holds the tables included
317 * in the subscription.
318 */
319 Uint32 m_tableId;
320 Uint32 m_table_ptrI;
321 };
322 typedef Ptr<Subscription> SubscriptionPtr;
323
324 struct Table {
TableSuma::Table325 Table() { m_tableId = ~0; }
326 void release(Suma&);
327
328 DLList<Subscription>::Head m_subscriptions;
329
330 enum State {
331 UNDEFINED,
332 DEFINING,
333 DEFINED,
334 DROPPED
335 };
336 State m_state;
337
338 Uint32 m_ptrI;
339
340 bool parseTable(SegmentedSectionPtr ptr, Suma &suma);
341 /**
342 * Create triggers
343 */
344 void createAttributeMask(AttributeMask&, Suma &suma);
345
346 union { Uint32 m_tableId; Uint32 key; };
347 Uint32 m_schemaVersion;
348
349 Uint32 m_error;
350
351 Uint32 m_noOfAttributes;
352
353 /**
354 * Hash table stuff
355 */
356 Uint32 nextHash;
357 union { Uint32 prevHash; Uint32 nextPool; };
hashValueSuma::Table358 Uint32 hashValue() const {
359 return m_tableId;
360 }
equalSuma::Table361 bool equal(const Table& rec) const {
362 return m_tableId == rec.m_tableId;
363 }
364
365 // copy from Subscription
366 Uint32 m_schemaTransId;
367 };
368
369 /**
370 *
371 */
372
373 /**
374 * Lists
375 */
376 KeyTable<Table> c_tables;
377 DLHashTable<Subscription> c_subscriptions;
378
379 /**
380 * Pools
381 */
382 ArrayPool<Subscriber> c_subscriberPool;
383 ArrayPool<Table> c_tablePool;
384 ArrayPool<Subscription> c_subscriptionPool;
385 ArrayPool<SyncRecord> c_syncPool;
386 DataBuffer<15>::DataBufferPool c_dataBufferPool;
387 ArrayPool<SubOpRecord> c_subOpPool;
388
389 Uint32 c_maxBufferedEpochs;
390
391 NodeBitmask c_failedApiNodes;
392 Uint32 c_failedApiNodesState[MAX_NODES];
393
394 /**
395 * Functions
396 */
397 bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
398
399 void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
400
401 void sendSubCreateRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
402 void sendSubStartRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
403 void sendSubStopRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
404 void report_sub_stop_conf(Signal* signal,
405 Ptr<SubOpRecord> subOpPtr,
406 Ptr<Subscriber> ptr,
407 bool report,
408 LocalDLList<Subscriber>& list);
409
410 void sendSubSyncRef(Signal* signal, Uint32 errorCode);
411 void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
412 Uint32 errorCode);
413 void sendSubStopReq(Signal* signal, bool unlock= false);
414
415 void completeSubRemove(SubscriptionPtr subPtr);
416
417 void send_sub_start_stop_event(Signal *signal,
418 Ptr<Subscriber> ptr,
419 NdbDictionary::Event::_TableEvent event,
420 bool report,
421 LocalDLList<Subscriber>& list);
422
423 Uint32 getFirstGCI(Signal* signal);
424
425 void create_triggers(Signal*, Ptr<Subscription>);
426 void drop_triggers(Signal*, Ptr<Subscription>);
427 void drop_triggers_complete(Signal*, Ptr<Subscription>);
428
429 bool check_sub_start(Uint32 subscriberRef);
430 void report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr);
431 void report_sub_start_ref(Signal* signal, Ptr<Subscription> subPtr, Uint32);
432
433 void sub_stop_req(Signal*);
434 void check_remove_queue(Signal*, Ptr<Subscription>,
435 Ptr<SubOpRecord>,bool,bool);
436 void check_release_subscription(Signal* signal, Ptr<Subscription>);
437 void get_tabinfo_ref_release(Signal*, Ptr<Table>);
438
439 /**
440 * Public interface
441 */
442 void execCREATE_SUBSCRIPTION_REQ(Signal* signal);
443 void execDROP_SUBSCRIPTION_REQ(Signal* signal);
444
445 void execSTART_SUBSCRIPTION_REQ(Signal* signal);
446 void execSTOP_SUBSCRIPTION_REQ(Signal* signal);
447
448 void execSYNC_SUBSCRIPTION_REQ(Signal* signal);
449 void execABORT_SYNC_REQ(Signal* signal);
450
451 /**
452 * Framework signals
453 */
454
455 void getNodeGroupMembers(Signal* signal);
456 void execREAD_CONFIG_REQ(Signal* signal);
457
458 void execSTTOR(Signal* signal);
459 void sendSTTORRY(Signal*);
460 void execNDB_STTOR(Signal* signal);
461 void execDUMP_STATE_ORD(Signal* signal);
462 void execDBINFO_SCANREQ(Signal* signal);
463 void execREAD_NODESCONF(Signal* signal);
464 void execNODE_FAILREP(Signal* signal);
465 void execINCL_NODEREQ(Signal* signal);
466 void execSIGNAL_DROPPED_REP(Signal* signal);
467 void execAPI_START_REP(Signal* signal);
468 void execAPI_FAILREQ(Signal* signal) ;
469
470 void api_fail_gci_list(Signal*, Uint32 node);
471 void api_fail_subscriber_list(Signal*, Uint32 node);
472 void api_fail_subscription(Signal*);
473 void api_fail_block_cleanup(Signal* signal, Uint32 failedNode);
474 void api_fail_block_cleanup_callback(Signal* signal,
475 Uint32 failedNodeId,
476 Uint32 elementsCleaned);
477
478 void execSUB_GCP_COMPLETE_ACK(Signal* signal);
479
480 /**
481 * Controller interface
482 */
483 void execSUB_CREATE_REF(Signal* signal);
484 void execSUB_CREATE_CONF(Signal* signal);
485
486 void execSUB_DROP_REF(Signal* signal);
487 void execSUB_DROP_CONF(Signal* signal);
488
489 void execSUB_START_REF(Signal* signal);
490 void execSUB_START_CONF(Signal* signal);
491
492 void execSUB_ABORT_SYNC_REF(Signal* signal);
493 void execSUB_ABORT_SYNC_CONF(Signal* signal);
494
495 void execSUMA_START_ME_REQ(Signal* signal);
496 void execSUMA_START_ME_REF(Signal* signal);
497 void execSUMA_START_ME_CONF(Signal* signal);
498
499 void execSTOP_ME_REQ(Signal*);
500
501 void copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator);
502 void sendSubCreateReq(Signal* signal, Ptr<Subscription>);
503 void copySubscriber(Signal*, Ptr<Subscription>, Ptr<Subscriber>);
504 void abort_start_me(Signal*, Ptr<Subscription>, bool lockowner);
505
506 void execSUMA_HANDOVER_REQ(Signal* signal);
507 void execSUMA_HANDOVER_REF(Signal* signal);
508 void execSUMA_HANDOVER_CONF(Signal* signal);
509
510 /**
511 * Subscription generation interface
512 */
513 void createSequence(Signal* signal);
514 void createSequenceReply(Signal* signal,
515 UtilSequenceConf* conf,
516 UtilSequenceRef* ref);
517 void execUTIL_SEQUENCE_CONF(Signal* signal);
518 void execUTIL_SEQUENCE_REF(Signal* signal);
519 void execCREATE_SUBID_REQ(Signal* signal);
520
521 /**
522 * for Suma that is restarting another
523 */
524
525 // for LQH transporter overload check
getSubscriberNodes() const526 const NodeBitmask& getSubscriberNodes() const { return c_subscriber_nodes; }
527
528 protected:
529 virtual bool getParam(const char * param, Uint32 * retVal);
530
531 private:
532 /**
533 * Variables
534 */
535 NodeId c_masterNodeId;
536 NdbNodeBitmask c_alive_nodes;
537
538 /**
539 * for restarting Suma not to start sending data too early
540 */
541
542 struct Startup
543 {
544 Uint32 m_wait_handover_timeout_ms; // Max time to wait in phase 101 for API nodes to connect
545 bool m_wait_handover;
546 NDB_TICKS m_wait_handover_expire;
547 NDB_TICKS m_wait_handover_message_expire;
548 bool m_forced_disconnect_attempted;
549 Uint32 m_restart_server_node_id;
550 NdbNodeBitmask m_handover_nodes;
551 } c_startup;
552
553 /**
554 * for graceful shutdown
555 */
556 struct Shutdown
557 {
558 bool m_wait_handover;
559 Uint32 m_senderRef;
560 Uint32 m_senderData;
561 } c_shutdown;
562
563 struct Restart
564 {
565 Uint16 m_abort;
566 Uint16 m_waiting_on_self;
567 Uint32 m_ref;
568 Uint32 m_max_seq;
569 Uint32 m_subPtrI;
570 Uint32 m_subOpPtrI;
571 Uint32 m_bucket; // In c_subscribers hashtable
572 } c_restart;
573
574 Uint32 c_current_seq; // Sequence no on subscription(s)
575 Uint32 c_outstanding_drop_trig_req;
576
577 NodeBitmask c_connected_nodes; // (NODE/API) START REP / (API/NODE) FAIL REQ
578 NodeBitmask c_subscriber_nodes; //
579
580 /**
581 * for all Suma's to keep track of other Suma's in Node group
582 */
583 Uint32 c_nodeGroup;
584 Uint32 c_noNodesInGroup;
585 Uint32 c_nodesInGroup[MAX_REPLICAS];
586 NdbNodeBitmask c_nodes_in_nodegroup_mask; // NodeId's of nodes in nodegroup
587
588 void send_dict_lock_req(Signal* signal, Uint32 state);
589 void send_dict_unlock_ord(Signal* signal, Uint32 state);
590 void send_start_me_req(Signal* signal);
591 void check_start_handover(Signal* signal);
592 void check_wait_handover_timeout(Signal* signal);
593 void check_wait_handover_message(NDB_TICKS now);
594 void send_handover_req(Signal* signal, Uint32 type);
595
596 void calculate_sub_data_stream(Uint16 bucket, Uint16 buckets, Uint16 replicas);
597 Uint16 get_sub_data_stream(Uint16 bucket) const;
598 Uint32 get_responsible_node(Uint32 B) const;
599 Uint32 get_responsible_node(Uint32 B, const NdbNodeBitmask& mask) const;
600 bool check_switchover(Uint32 bucket, Uint64 gci);
601
602 void fix_nodegroup();
603
604 public:
605 struct Page_pos
606 {
607 Uint32 m_page_id;
608 Uint32 m_page_pos;
609 Uint64 m_max_gci; // max gci on page
610 Uint64 m_last_gci; // last gci on page
611 };
612 private:
613
614 struct Bucket
615 {
616 enum {
617 BUCKET_STARTING = 0x1 // On starting node
618 ,BUCKET_HANDOVER = 0x2 // On running node
619 ,BUCKET_TAKEOVER = 0x4 // On takeing over node
620 ,BUCKET_RESEND = 0x8 // On takeing over node
621 ,BUCKET_CREATED_SELF = 0x10 // New nodegroup (me)
622 ,BUCKET_CREATED_OTHER = 0x20 // New nodegroup (not me)
623 ,BUCKET_CREATED_MASK = (BUCKET_CREATED_SELF | BUCKET_CREATED_OTHER)
624 ,BUCKET_DROPPED_SELF = 0x40 // New nodegroup (me) uses hi 8 bit for cnt
625 ,BUCKET_DROPPED_OTHER = 0x80 // New nodegroup (not me)
626 ,BUCKET_DROPPED_MASK = (BUCKET_DROPPED_SELF | BUCKET_DROPPED_OTHER)
627 ,BUCKET_SHUTDOWN = 0x100 // Graceful shutdown
628 ,BUCKET_SHUTDOWN_TO = 0x200 // Graceful shutdown
629 };
630 Uint16 m_state;
631 Uint16 m_switchover_node;
632 Uint16 m_nodes[MAX_REPLICAS];
633 Uint16 m_sub_data_stream;
634 Uint32 m_buffer_tail; // Page
635 Uint64 m_switchover_gci;
636 Uint64 m_max_acked_gci;
637 Page_pos m_buffer_head;
638 };
639
640 struct Buffer_page
641 {
642 STATIC_CONST( DATA_WORDS = 8192 - 10);
643 STATIC_CONST( GCI_SZ32 = 2 );
644
645 Uint32 _tupdata1;
646 Uint32 _tupdata2;
647 Uint32 _tupdata3;
648 Uint32 _tupdata4;
649 Uint32 m_page_state; // Used by TUP buddy algorithm
650 Uint32 m_page_chunk_ptr_i;
651 Uint32 m_next_page;
652 Uint32 m_words_used; //
653 Uint32 m_max_gci_hi; //
654 Uint32 m_max_gci_lo; //
655 Uint32 m_data[DATA_WORDS];
656 };
657
658 STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4*3*2*1!
659 Uint32 c_no_of_buckets;
660 struct Bucket c_buckets[NO_OF_BUCKETS];
661 Uint32 c_subscriber_per_node[MAX_NODES];
662
663 STATIC_CONST( BUCKET_MASK_SIZE = (((NO_OF_BUCKETS+31)>> 5)) );
664 typedef Bitmask<BUCKET_MASK_SIZE> Bucket_mask;
665 Bucket_mask m_active_buckets;
666 Bucket_mask m_switchover_buckets;
667
668 void init_buffers();
669 Uint32* get_buffer_ptr(Signal*, Uint32 buck, Uint64 gci, Uint32 sz);
670 Uint32 seize_page();
671 void free_page(Uint32 page_id, Buffer_page* page);
672 void out_of_buffer(Signal*);
673 void out_of_buffer_release(Signal* signal, Uint32 buck);
674
675 void start_resend(Signal*, Uint32 bucket);
676 void resend_bucket(Signal*, Uint32 bucket, Uint64 gci,
677 Uint32 page_pos, Uint64 last_gci);
678 void release_gci(Signal*, Uint32 bucket, Uint64 gci);
679
680 Uint64 get_current_gci(Signal*);
681
682 void checkMaxBufferedEpochs(Signal *signal);
683
684 Uint64 m_max_seen_gci; // FIRE_TRIG_ORD
685 Uint64 m_max_sent_gci; // FIRE_TRIG_ORD -> send
686 Uint64 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
687 Uint64 m_out_of_buffer_gci;
688 Uint32 m_gcp_complete_rep_count;
689 bool m_missing_data;
690
691 struct Gcp_record
692 {
693 Uint64 m_gci;
694 NodeBitmask m_subscribers;
695 union {
696 Uint32 nextPool;
697 Uint32 nextList;
698 };
699 Uint32 prevList;
700 };
701 ArrayPool<Gcp_record> c_gcp_pool;
702 DLCFifoList<Gcp_record> c_gcp_list;
703
704 struct Page_chunk
705 {
706 STATIC_CONST( CHUNK_PAGE_SIZE = 32768 );
707 STATIC_CONST( PAGES_PER_CHUNK = 16 );
708
709 Uint32 m_page_id;
710 Uint32 m_size;
711 Uint32 m_free;
712 union {
713 Uint32 nextPool;
714 Uint32 nextList;
715 };
716 Uint32 prevList;
717 };
718
719 Uint32 m_first_free_page;
720 ArrayPool<Page_chunk> c_page_chunk_pool;
721 ArrayPool<Buffer_page> c_page_pool;
722
723 #ifdef VM_TRACE
724 Uint64 m_gcp_monitor;
725 #endif
726
727 struct SubGcpCompleteCounter
728 {
729 Uint64 m_gci;
730 Uint32 m_cnt;
731 };
732
733 Uint32 m_gcp_rep_cnt;
734 Uint32 m_min_gcp_rep_counter_index;
735 Uint32 m_max_gcp_rep_counter_index;
736 struct SubGcpCompleteCounter m_gcp_rep_counter[10];
737
738 /* Buffer used in Suma::execALTER_TAB_REQ(). */
739 Uint32 b_dti_buf[MAX_WORDS_META_FILE];
740 Uint64 m_current_gci;
741
742 Uint32 m_startphase;
743 Uint32 m_typeOfStart;
744
745 void sendScanSubTableData(Signal* signal, Ptr<SyncRecord>, Uint32);
746 };
747
748 inline
749 Uint16
get_sub_data_stream(Uint16 bucket) const750 Suma::get_sub_data_stream(Uint16 bucket) const
751 {
752 ndbassert(bucket < NO_OF_BUCKETS);
753 const Bucket* ptr= c_buckets + bucket;
754 return ptr->m_sub_data_stream;
755 }
756
757 #undef JAM_FILE_ID
758
759 #endif
760