1 /*
2 Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef NdbEventOperationImpl_H
26 #define NdbEventOperationImpl_H
27
28 #include <NdbEventOperation.hpp>
29 #include <signaldata/SumaImpl.hpp>
30 #include <NdbRecAttr.hpp>
31 #include <AttributeHeader.hpp>
32 #include <UtilBuffer.hpp>
33 #include <Vector.hpp>
34 #include <NdbMutex.h>
35 #include <NdbTick.h>
36
37 #include "my_pointer_arithmetic.h"
38
39 #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
40 //#define EVENT_DEBUG
41 #ifdef EVENT_DEBUG
42 #define DBUG_ENTER_EVENT(A) DBUG_ENTER(A)
43 #define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
44 #define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
45 #define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
46 #define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
47 #else
48 #define DBUG_ENTER_EVENT(A)
49 #define DBUG_RETURN_EVENT(A) return(A)
50 #define DBUG_VOID_RETURN_EVENT return
51 #define DBUG_PRINT_EVENT(A,B)
52 #define DBUG_DUMP_EVENT(A,B,C)
53 #endif
54
55 #include <ndb_logevent.h>
56 typedef enum ndb_logevent_event_buffer_status_report_reason ReportReason;
57
58 class NdbEventOperationImpl;
59 class EpochData;
60
class EventBufData
{
public:
  /**
   * The raw event data as received. Both union members alias the
   * same allocation: 'sdata' views it as the SubTableData header,
   * 'memory' as plain words.
   */
  union {
    SubTableData *sdata;
    Uint32 *memory;
  };
  LinearSectionPtr ptr[3];           // The (up to) three data sections of the event
  NdbEventOperationImpl *m_event_op; // The event operation this data was received for

  /*
   * Blobs are stored in blob list (m_next_blob) where each entry
   * is list of parts (m_next). TODO order by part number
   *
   * Data item lists keep track of item count and sum(sz) and
   * these include both main items and blob parts.
   */

  EventBufData *m_next; // Next wrt to global order or Next blob part
  EventBufData *m_next_blob; // First part in next blob

  EventBufData *m_next_hash; // Next in per-GCI hash
  Uint32 m_pkhash; // PK hash (without op) for fast compare

  // Constructs an empty item; 'memory' NULL implies no data attached yet.
  EventBufData()
    : memory(NULL),
      m_event_op(NULL), m_next(NULL), m_next_blob(NULL)
  {}

  Uint32 get_count() const;  // item count (main items and blob parts)
  Uint32 get_size() const;   // sum of data sizes (main items and blob parts)
  Uint64 getGCI() const;     // epoch (GCI) this event data belongs to
};
94
95
96 /**
97 * The MonotonicEpoch class provides a monotonic increasing epoch
98 * identifier - Even across an initial restart which may start a
99 * new sequence of GCIs from 0/0.
100 * Several garbage collection mechanism in the EventBuffer relies
101 * on the monotonicity of the GCI being used as an 'expiry stamp'
102 * for when the object can be permanently deleted.
103 */
104 class MonotonicEpoch
105 {
106 public:
107 static const MonotonicEpoch min;
108 static const MonotonicEpoch max;
109
MonotonicEpoch()110 MonotonicEpoch()
111 : m_seq(0), m_epoch(0) {}
112
MonotonicEpoch(Uint32 seq,Uint64 epoch)113 MonotonicEpoch(Uint32 seq, Uint64 epoch)
114 : m_seq(seq), m_epoch(epoch) {}
115
operator ==(const MonotonicEpoch & other) const116 bool operator == (const MonotonicEpoch& other) const
117 { return m_epoch == other.m_epoch && m_seq == other.m_seq; }
operator !=(const MonotonicEpoch & other) const118 bool operator != (const MonotonicEpoch& other) const
119 { return m_epoch != other.m_epoch || m_seq != other.m_seq; }
operator <(const MonotonicEpoch & other) const120 bool operator < (const MonotonicEpoch& other) const
121 { return m_seq < other.m_seq || (m_seq == other.m_seq && m_epoch < other.m_epoch); }
operator <=(const MonotonicEpoch & other) const122 bool operator <= (const MonotonicEpoch& other) const
123 { return m_seq < other.m_seq || (m_seq == other.m_seq && m_epoch <= other.m_epoch); }
operator >(const MonotonicEpoch & other) const124 bool operator > (const MonotonicEpoch& other) const
125 { return m_seq > other.m_seq || (m_seq == other.m_seq && m_epoch > other.m_epoch); }
operator >=(const MonotonicEpoch & other) const126 bool operator >= (const MonotonicEpoch& other) const
127 { return m_seq > other.m_seq || (m_seq == other.m_seq && m_epoch >= other.m_epoch); }
128
getGCI() const129 Uint64 getGCI() const { return m_epoch; }
130
131 // 'operator <<' is allowed to access privat members
132 friend NdbOut& operator<<(NdbOut& out, const MonotonicEpoch& gci);
133
134 private:
135 Uint32 m_seq;
136 Uint64 m_epoch;
137 };
138
139 /**
140 * All memory allocation for events are done from memory blocks.
141 * Each memory block is tagged with an 'expiry-epoch', which holds
142 * the highest epoch known upto the point where the block got full.
143 *
144 * No freeing of objects allocted from the memory block is required.
145 * Instead we free the entire block when the client has consumed the
146 * last event with an epoch >= the 'expiry-epoch' of the memory block.
147 */
class EventMemoryBlock
{
public:
  // 'size' is the total byte size of the allocation holding this
  // block; the usable payload is what remains after the header.
  EventMemoryBlock(Uint32 size)
    : m_size(data_size(size))
  {
    init();
  }

  // Reset the block to an empty state, ready for (re)use.
  void init()
  {
    /**
     * Alloc must start from an aligned memory addr, add padding if required.
     * Assumes that EventMemoryBlock itself is correctly aligned.
     */
    const Uint32 data_offs = my_offsetof(EventMemoryBlock, m_data);
    const Uint32 pad = ALIGN_SIZE(data_offs) - data_offs;
    m_used = pad;
    m_expiry_epoch = MonotonicEpoch::max;  // Not expirable until set
    m_next = NULL;
  }

  // Invalidate contents before the block is freed or recycled.
  void destruct()
  {
#ifndef NDEBUG
    // Shredd the memory if debugging
    memset(m_data, 0x11, m_size);
    m_used = 0;
    m_expiry_epoch = MonotonicEpoch::min;
#endif
  }

  // Allocate a chunk of memory from this MemoryBlock
  // Returns NULL if 'size' bytes are not available.
  void* alloc(unsigned size)
  {
    if (unlikely(m_used + size > m_size))
      return NULL;

    char *mem = m_data + m_used;
    m_used += ALIGN_SIZE(size); //Keep alignment for next object
    return (void*)mem;
  }

  // Get remaining free memory from block
  Uint32 get_free() const
  {
    return (m_size - m_used);
  }

  // Get total usable memory size from block (if empty)
  Uint32 get_size() const
  {
    return m_size;
  }

  // Get total size of block as once allocated
  Uint32 alloced_size() const
  {
    return m_size + my_offsetof(EventMemoryBlock, m_data);
  }

  const Uint32 m_size; // Number of bytes available to allocate from m_data
  Uint32 m_used; // Offset of next free position

  /**
   * Highest epoch of any object allocated memory from this block.
   * Entire block expires when all epoch <= expiry_epoch are consumed.
   */
  MonotonicEpoch m_expiry_epoch;

  EventMemoryBlock *m_next; // Next memory block

private:
  // Declared as [1] but the enclosing allocation is over-sized so
  // that m_data effectively extends to m_size bytes.
  char m_data[1];

  // Calculates usable size of m_data given total size 'full_sz'
  Uint32 data_size(Uint32 full_sz)
  {
    return full_sz - my_offsetof(EventMemoryBlock, m_data);
  }
};
229
230
// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class EventBufData_hash
{
public:
  struct Pos { // search result
    Uint32 index; // index into hash array
    EventBufData* data; // non-zero if found
    Uint32 pkhash; // PK hash
  };

  // Compute PK hash for the event data sections in 'ptr'.
  static Uint32 getpkhash(NdbEventOperationImpl* op, const LinearSectionPtr ptr[3]);
  // Compare two event data items for PK equality.
  static bool getpkequal(NdbEventOperationImpl* op, const LinearSectionPtr ptr1[3], const LinearSectionPtr ptr2[3]);

  // Look up existing data with same (op, PK); fills 'hpos' for a
  // subsequent append() - hpos.data is non-zero only on a match.
  void search(Pos& hpos, NdbEventOperationImpl* op, const LinearSectionPtr ptr[3]);
  // Insert 'data' at the bucket located by a preceding search().
  void append(Pos& hpos, EventBufData* data);

  enum { GCI_EVENT_HASH_SIZE = 101 };  // number of hash buckets
  EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
};
251
252 inline
append(Pos & hpos,EventBufData * data)253 void EventBufData_hash::append(Pos& hpos, EventBufData* data)
254 {
255 data->m_next_hash = m_hash[hpos.index];
256 m_hash[hpos.index] = data;
257 }
258
259 /**
260 * The Gci_container creates a collection of EventBufData and
261 * the NdbEventOperationImpl receiving an event withing this
262 * specific epoch. Once 'completed', an 'EpochData' is created from
263 * the Gci_container, representing a more static view of the
264 * epoch ready to be consumed by the client.
265 */
struct Gci_op //A helper
{
  NdbEventOperationImpl* op;   // event operation that received event(s) in this epoch
  Uint32 event_types;          // bitmap of event types received by 'op'
  Uint32 cumulative_any_value;// Merged for table/epoch events
};
272
class Gci_container
{
public:
  Gci_container(NdbEventBuffer* event_buffer)
    : m_event_buffer(event_buffer),
      m_state(0),
      m_gcp_complete_rep_count(0),
      m_gcp_complete_rep_sub_data_streams(),
      m_gci(0),
      m_head(NULL), m_tail(NULL),
      m_gci_op_list(NULL),
      m_gci_op_count(0),
      m_gci_op_alloc(0)
  {
    // m_data_hash is a plain array of pointers, zero-fill is a valid init
    bzero(&m_data_hash, sizeof(m_data_hash));
  }

  // Reset to an empty container, keeping the owning event buffer.
  void clear()
  {
    assert(m_event_buffer != NULL);
    m_state = 0;
    m_gcp_complete_rep_count = 0;
    m_gcp_complete_rep_sub_data_streams.clear();
    m_gci = 0;
    m_head = m_tail = NULL;
    bzero(&m_data_hash, sizeof(m_data_hash));

    m_gci_op_list = NULL;
    m_gci_op_count = 0;
    m_gci_op_alloc = 0;
  }

  bool is_empty() const
  { return (m_head == NULL); }

  enum State
  {
    GC_COMPLETE = 0x1 // GCI is complete, but waiting for out of order
    ,GC_INCONSISTENT = 0x2 // GCI might be missing event data
    ,GC_CHANGE_CNT = 0x4 // Change m_total_buckets
    ,GC_OUT_OF_MEMORY = 0x8 // Not enough event buffer memory to buffer data
  };

  NdbEventBuffer *const m_event_buffer; //Owner

  Uint16 m_state;           // Bitmask of State flags
  Uint16 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
  Bitmask<(MAX_SUB_DATA_STREAMS+31)/32> m_gcp_complete_rep_sub_data_streams;
  Uint64 m_gci; // GCI

  EventBufData *m_head, *m_tail;  // Event data received within this epoch
  EventBufData_hash m_data_hash;  // For merge: locate data by (op, PK)

  Gci_op *m_gci_op_list;
  Uint32 m_gci_op_count; //Current size of gci_op_list[]
  Uint32 m_gci_op_alloc; //Items allocated in gci_op_list[]

  bool hasError() const
  { return (m_state & (GC_OUT_OF_MEMORY | GC_INCONSISTENT)); }

  // get number of EventBufData in this Gci_container (For debug)
  Uint32 count_event_data() const;

  // add Gci_op to container for this Gci
  void add_gci_op(Gci_op g);

  // append data and insert data into Gci_op list with add_gci_op
  void append_data(EventBufData *data);

  // Create an EpochData containing the Gci_op and event data added above.
  // This effectively 'completes' the epoch represented by this Gci_container
  EpochData* createEpochData(Uint64 gci);
};
346
347 struct Gci_container_pod
348 {
349 char data[sizeof(Gci_container)];
350 };
351
352
353 /**
354 * An EpochData is created from a Gci_container when it contains a complete
355 * epoch. It contains all EventBufData received within this epoch, and
356 * a list of all NdbEventOperationImpl which received an event.
357 * (Except exceptional events)
358 *
359 * m_error shows the error identified when receiveing an epoch:
360 * a buffer overflow at the sender (ndb suma) or receiver (event buffer).
361 * This error information is a duplicate, same info is available in
362 * the dummy EventBufData. The reason to store the duplicate is to remove
363 * the need to search the EventBufData by isConsistent(Uint64 &) to find
364 * whether an inconsistency has occurred in the epoch stream.
365 * This method is kept for backward compatibility.
366 */
class EpochData
{
public:
  // Takes (non-owning) pointers to the gci_op list and event data
  // chain built up by the Gci_container this epoch was created from.
  EpochData(MonotonicEpoch gci,
            Gci_op *gci_op_list, Uint32 count,
            EventBufData *data)
    : m_gci(gci),
      m_error(0),
      m_gci_op_count(count),
      m_gci_op_list(gci_op_list),
      m_data(data),
      m_next(NULL)
  {}
  ~EpochData() {}

  // get number of EventBufData in EpochDataList (For debug)
  Uint32 count_event_data() const;

  const MonotonicEpoch m_gci;  // The epoch this data belongs to
  Uint32 m_error;              // Error detected when receiving the epoch, 0 if none
  Uint32 const m_gci_op_count;
  Gci_op* const m_gci_op_list; //All event_op receiving an event
  EventBufData* m_data; //All event data within epoch
  EpochData *m_next; //Next completed epoch
};
392
393
394 /**
395 * A list of EpochData in increasing GCI order is prepared for the
396 * client to consume. Actually it is a 'list of lists'.
397 * - The EpochDataList presents a list of epoch which has completed.
398 * - Within each epoch the client can navigate the EventBufData
399 * valid for this specific Epoch.
400 */
401 class EpochDataList
402 {
403 public:
EpochDataList()404 EpochDataList()
405 : m_head(NULL), m_tail(NULL) {}
406
407 // Gci list is cleared to an empty state.
clear()408 void clear()
409 { m_head = m_tail = NULL; }
410
is_empty() const411 bool is_empty() const
412 { return (m_head == NULL); }
413
414 // append EpochData to list
append(EpochData * epoch)415 void append(EpochData *epoch)
416 {
417 if (m_tail)
418 m_tail->m_next = epoch;
419 else
420 {
421 assert(m_head == NULL);
422 m_head = epoch;
423 }
424 m_tail = epoch;
425 }
426
427 // append list to another
append_list(EpochDataList * list)428 void append_list(EpochDataList *list)
429 {
430 if (m_tail)
431 m_tail->m_next= list->m_head;
432 else
433 m_head= list->m_head;
434 m_tail= list->m_tail;
435
436 list->m_head = list->m_tail = NULL;
437 }
438
first_epoch() const439 EpochData *first_epoch() const
440 { return m_head;}
441
442 // advance list head to next EpochData
next_epoch()443 EpochData *next_epoch()
444 {
445 m_head = m_head->m_next;
446 if (m_head == NULL)
447 m_tail = NULL;
448
449 return m_head;
450 }
451
452 // find first event data to be delivered.
get_first_event_data() const453 EventBufData *get_first_event_data() const
454 {
455 EpochData *epoch = m_head;
456 while (epoch != NULL)
457 {
458 if (epoch->m_data != NULL)
459 return epoch->m_data;
460 epoch = epoch->m_next;
461 }
462 return NULL;
463 }
464
465 // get and consume first EventData
consume_first_event_data()466 EventBufData *consume_first_event_data()
467 {
468 EpochData *epoch = m_head;
469 if (epoch != NULL)
470 {
471 EventBufData *data = epoch->m_data;
472 if (data != NULL)
473 m_head->m_data = data->m_next;
474
475 return data;
476 }
477 return NULL;
478 }
479
480 // get number of EventBufData in EpochDataList (For debug)
481 Uint32 count_event_data() const;
482
483 //private:
484 EpochData *m_head, *m_tail;
485 };
486
487
/**
 * Implementation behind the public NdbEventOperation facade.
 * Receives event data from the kernel (receiver thread) and hands
 * decoded attribute values to the user thread.
 */
class NdbEventOperationImpl : public NdbEventOperation {
public:
  NdbEventOperationImpl(NdbEventOperation &f,
                        Ndb *theNdb,
                        const char* eventName);
  NdbEventOperationImpl(Ndb *theNdb,
                        NdbEventImpl& evnt);
  void init(NdbEventImpl& evnt);
  NdbEventOperationImpl(NdbEventOperationImpl&); //unimplemented
  NdbEventOperationImpl& operator=(const NdbEventOperationImpl&); //unimplemented
  ~NdbEventOperationImpl();

  NdbEventOperation::State getState();

  int execute();
  int execute_nolock();
  int stop();
  // Register interest in a column; 'aValue' receives data (before/after
  // image selected by n).
  NdbRecAttr *getValue(const char *colName, char *aValue, int n);
  NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
  NdbBlob *getBlobHandle(const char *colName, int n);
  NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
  Uint32 get_blob_part_no(bool hasDist);
  int readBlobParts(char* buf, NdbBlob* blob,
                    Uint32 part, Uint32 count, Uint16* lenLoc);
  int receive_event();
  bool tableNameChanged() const;
  bool tableFrmChanged() const;
  bool tableFragmentationChanged() const;
  bool tableRangeListChanged() const;
  Uint64 getGCI() const;
  Uint32 getAnyValue() const;
  bool isErrorEpoch(NdbDictionary::Event::TableEvent *error_type);
  bool isEmptyEpoch();
  Uint64 getLatestGCI();
  Uint64 getTransId() const;
  bool execSUB_TABLE_DATA(const NdbApiSignal * signal,
                          const LinearSectionPtr ptr[3]);

  NdbDictionary::Event::TableEvent getEventType2();

  void print();
  void printAll();

  NdbEventOperation *m_facade;   // the public-API object wrapping this impl
  Uint32 m_magic_number;         // validity check, see NDB_EVENT_OP_MAGIC_NUMBER

  const NdbError & getNdbError() const;
  NdbError m_error;

  Ndb *m_ndb;
  NdbEventImpl *m_eventImpl;

  // [0]/[1] hold the after/before images of received attributes
  NdbRecAttr *theFirstPkAttrs[2];
  NdbRecAttr *theCurrentPkAttrs[2];
  NdbRecAttr *theFirstDataAttrs[2];
  NdbRecAttr *theCurrentDataAttrs[2];

  NdbBlob* theBlobList;
  NdbEventOperationImpl* theBlobOpList; // in main op, list of blob ops
  NdbEventOperationImpl* theMainOp; // in blob op, the main op
  int theBlobVersion; // in blob op, NDB_BLOB_V1 or NDB_BLOB_V2

  NdbEventOperation::State m_state; /* note connection to mi_type */
  Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
                   * else same as in EventImpl
                   */
  Uint32 m_eventId;
  Uint32 m_oid;

  /*
    when parsed gci > m_stop_gci it is safe to drop operation
    as kernel will not have any more references
  */
  MonotonicEpoch m_stop_gci;

  /*
    m_ref_count keeps track of outstanding references to an event
    operation impl object. To make sure that the object is not
    deleted too early.

    If on dropEventOperation there are still references to an
    object it is queued for delete in NdbEventBuffer::m_dropped_ev_op

    the following references exists for a _non_ blob event op:
    * user reference
      - add    - NdbEventBuffer::createEventOperation
      - remove - NdbEventBuffer::dropEventOperation
    * kernel reference
      - add    - execute_nolock
      - remove - TE_STOP, TE_CLUSTER_FAILURE
    * blob reference
      - add    - execute_nolock on blob event
      - remove - TE_STOP, TE_CLUSTER_FAILURE on blob event
    * gci reference
      - add    - insertDataL/add_gci_op
      - remove - NdbEventBuffer::deleteUsedEventOperations

    the following references exists for a blob event op:
    * kernel reference
      - add    - execute_nolock
      - remove - TE_STOP, TE_CLUSTER_FAILURE
  */

  int m_ref_count;
  bool m_mergeEvents;

  EventBufData *m_data_item;  // event data currently being consumed

  void *m_custom_data;
  int m_has_error;

  Uint32 m_fragmentId;
  UtilBuffer m_buffer;

  // Bit mask for what has changed in a table (for TE_ALTER event)
  Uint32 m_change_mask;

#ifdef VM_TRACE
  Uint32 m_data_done_count;
  Uint32 m_data_count;
#endif

  // managed by the ndb object
  NdbEventOperationImpl *m_next;
  NdbEventOperationImpl *m_prev;

  // Used for allowing empty updates be passed to the user
  bool m_allow_empty_update;

private:
  void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz);
};
620
621
/**
 * Manages the buffering/discarding of received event data when the
 * event buffer approaches its memory limit. Implements the 4-state
 * machine documented below, introducing 'gaps' of discarded epochs
 * while memory is exhausted.
 */
class EventBufferManager {
public:
  EventBufferManager(const Ndb* const m_ndb);
  ~EventBufferManager() {}

private:

  const Ndb* const m_ndb;
  /* Last epoch that will be buffered completely before
   * the beginning of a gap.
   */
  Uint64 m_pre_gap_epoch;

  /* The epoch where the gap begins. The received event data for this epoch
   * will be thrown. Gcp-completion of this epoch will add a dummy event
   * data and a dummy gci-ops list denoting the problem causing the gap.
   */
  Uint64 m_begin_gap_epoch;

  /* This is the last epoch that will NOT be buffered during the gap period.
   * From the next epoch (post-gap epoch), all event data will be
   * completely buffered.
   */
  Uint64 m_end_gap_epoch;

  // Epochs newer than this will be discarded when event buffer
  // is used up.
  Uint64 m_max_buffered_epoch;

  /* Since no buffering will take place during a gap, m_max_buffered_epoch
   * will not be updated. Therefore, use m_max_received_epoch to
   * find end_gap_epoch when memory becomes available again.
   * Epochs newer than this will be buffered.
   */
  Uint64 m_max_received_epoch;

  /* After the max_alloc limit is hit, the % of event buffer memory
   * that should be available before resuming buffering:
   * min 1, max 99, default 20.
   */
  unsigned m_free_percent;

  enum {
    EBM_COMPLETELY_BUFFERING,
    EBM_PARTIALLY_DISCARDING,
    EBM_COMPLETELY_DISCARDING,
    EBM_PARTIALLY_BUFFERING
  } m_event_buffer_manager_state;

  /**
   * Event buffer manager has 4 states :
   * COMPLETELY_BUFFERING :
   *  all received event data are buffered.
   * Entry condition:
   *  m_pre_gap_epoch = 0 && m_begin_gap_epoch = 0 && m_end_gap_epoch = 0.
   *
   * PARTIALLY_DISCARDING :
   *  event data upto epochs m_pre_gap_epoch are buffered,
   *  others are discarded.
   * Entry condition:
   *  m_pre_gap_epoch > 0 && m_begin_gap = 0 && m_end_gap_epoch = 0.
   *
   * COMPLETELY_DISCARDING :
   *  all received epoch event data are discarded.
   * Entry condition:
   *  m_pre_gap_epoch > 0 && m_begin_gap_epoch > 0 && m_end_gap_epoch = 0.
   *
   * PARTIALLY_BUFFERING :
   *  all received event data <= m_end_gap are discarded, others are buffered.
   * Entry condition:
   *  m_pre_gap_epoch > 0 && m_begin_gap_epoch > 0 && m_end_gap_epoch > 0.
   *
   * Transitions :
   * COMPLETELY_BUFFERING -> PARTIALLY_DISCARDING :
   *  memory is completely used up at the reception of SUB_TABLE_DATA,
   * Action: m_pre_gap_epoch is set with m_max_buffered_epoch.
   *  ==> An incoming new epoch, which is larger than the
   *      m_max_buffered_epoch can NOT be an m_pre_gap_epoch.
   *
   * PARTIALLY_DISCARDING -> COMPLETELY_DISCARDING :
   *  epoch next to m_pre_gap_epoch, has gcp-completed,
   * Action: set m_begin_gap_epoch with the gcp_completing epoch
   *  (marking the beginning of a gap).
   * The reason to have an m_begin_gap in addition to m_pre_gap is:
   *  The gci of the epoch next to m_pre_gap is needed for creating the
   *  exceptional epoch. We reuse the code in complete_bucket that will
   *  create the exceptional epoch. Complete_bucket is called only when
   *  an epoch is gcp-completing.
   *
   * COMPLETELY_DISCARDING -> PARTIALLY_BUFFERING :
   *  m_free_percent of the event buffer becomes available at the
   *  reception of SUB_TABLE_DATA.
   * Action : set m_end_gap_epoch with max_received_epoch
   *  (cannot use m_max_buffered_epoch since it has not been updated
   *  since PARTIALLY_DISCARDING).
   *
   * PARTIALLY_BUFFERING -> COMPLETELY_BUFFERING :
   *  epoch next to m_end_gap_epoch (post-gap epoch) has buffered
   *  completely and gcp_completed.
   * Action : reset m_pre_gap_epoch, m_begin_gap_epoch and m_end_gap_epoch.
   */

  // State predicates corresponding to the entry conditions above.
  bool isCompletelyBuffering();
  bool isPartiallyDiscarding();
  bool isCompletelyDiscarding();
  bool isPartiallyBuffering();
  bool isInDiscardingState();

public:
  unsigned get_eventbuffer_free_percent();
  void set_eventbuffer_free_percent(unsigned free);

  void onBufferingEpoch(Uint64 received_epoch); // update m_max_buffered_epoch

  /* Execute the state machine by checking the buffer manager state
   * and performing the correct transition according to buffer availability:
   * Returned value indicates whether reportStatus() is necessary.
   * Transitions CB -> PD and CD -> PB and updating m_max_received epoch
   * are performed here.
   */
  ReportReason onEventDataReceived(Uint32 memory_usage_percent, Uint64 received_epoch);

  // Check whether the received event data can be discarded.
  // Discard-criteria : m_pre_gap_epoch < received_epoch <= m_end_gap_epoch.
  bool isEventDataToBeDiscarded(Uint64 received_epoch);

  /* Execute the state machine by checking the buffer manager state
   * and performing the correct transition according to gcp_completion.
   * Transitions PD -> CD and PB -> CB are performed here.
   * Check whether the received gcp_completing epoch can mark the beginning
   * of a gap (qualifies as m_begin_gap_epoch) or
   * the gap is ended and the transition to COMPLETE_BUFFERING can be performed.
   * The former case sets gap_begins to true.
   */
  ReportReason onEpochCompleted(Uint64 completed_epoch, bool& gap_begins);

  // Check whether the received SUB_GCP_COMPLETE can be discarded
  // Discard-criteria: m_begin_gap_epoch < completed_epoch <= m_end_gap_epoch
  bool isGcpCompleteToBeDiscarded(Uint64 completed_epoch);
};
762
763 class NdbEventBuffer {
764 public:
765 NdbEventBuffer(Ndb*);
766 ~NdbEventBuffer();
767
768 Uint32 m_total_buckets;
769 Uint16 m_min_gci_index;
770 Uint16 m_max_gci_index;
771 Vector<Uint64> m_known_gci;
772 Vector<Gci_container_pod> m_active_gci;
773 STATIC_CONST( ACTIVE_GCI_DIRECTORY_SIZE = 4 );
774 STATIC_CONST( ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1 );
775
776 NdbEventOperation *createEventOperation(const char* eventName,
777 NdbError &);
778 NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt,
779 NdbError &);
780 void dropEventOperation(NdbEventOperation *);
781 static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
782
add_drop_lock()783 void add_drop_lock() { NdbMutex_Lock(m_add_drop_mutex); }
add_drop_unlock()784 void add_drop_unlock() { NdbMutex_Unlock(m_add_drop_mutex); }
lock()785 void lock() { NdbMutex_Lock(m_mutex); }
trylock()786 bool trylock() { return NdbMutex_Trylock(m_mutex) == 0; }
unlock()787 void unlock() { NdbMutex_Unlock(m_mutex); }
788
789 void add_op();
790 void remove_op();
791 void init_gci_containers();
792
793 // accessed from the "receive thread"
794 int insertDataL(NdbEventOperationImpl *op,
795 const SubTableData * const sdata, Uint32 len,
796 const LinearSectionPtr ptr[3]);
797 void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const, Uint32 len,
798 int complete_cluster_failure= 0);
799 void execSUB_START_CONF(const SubStartConf * const, Uint32 len);
800 void execSUB_STOP_CONF(const SubStopConf * const, Uint32 len);
801 void execSUB_STOP_REF(const SubStopRef * const, Uint32 len);
802
803 void complete_outof_order_gcis();
804
805 void report_node_failure_completed(Uint32 node_id);
806
807 // used by user thread
808 Uint64 getLatestGCI();
809 Uint32 getEventId(int bufferId);
810 Uint64 getHighestQueuedEpoch();
811 void setEventBufferQueueEmptyEpoch(bool queue_empty_epoch);
812
813 int pollEvents(Uint64 *HighestQueuedEpoch= NULL);
814 int flushIncompleteEvents(Uint64 gci);
815
816 void remove_consumed_memory(MonotonicEpoch consumedGci);
817 void remove_consumed_epoch_data(MonotonicEpoch consumedGci);
818
819 /* Remove all resources related to specified epoch
820 * after it has been completely consumed.
821 */
822 void remove_consumed(MonotonicEpoch consumedGci);
823
824 // Count the buffered epochs (in event queue and completed list).
825 Uint32 count_buffered_epochs() const;
826
827 /* Consume and discard all completed events.
828 * Memory related to discarded events are released.
829 */
830 void consume_all();
831
832 // Check if event data belongs to an exceptional epoch, such as,
833 // an inconsistent, out-of-memory or empty epoch.
834 bool is_exceptional_epoch(EventBufData *data);
835
836 // Consume current EventData and dequeue next for consumption
837 EventBufData *nextEventData();
838
839 // Dequeue event data from event queue and give it for consumption.
840 NdbEventOperation *nextEvent2();
841 bool isConsistent(Uint64& gci);
842 bool isConsistentGCI(Uint64 gci);
843
844 NdbEventOperationImpl* getEpochEventOperations(Uint32* iter,
845 Uint32* event_types,
846 Uint32* cumulative_any_value);
847 void deleteUsedEventOperations(MonotonicEpoch last_consumed_gci);
848
849 EventBufData *move_data();
850
851 // routines to copy/merge events
852 EventBufData* alloc_data();
853 int alloc_mem(EventBufData* data,
854 const LinearSectionPtr ptr[3]);
855 int copy_data(const SubTableData * const sdata, Uint32 len,
856 const LinearSectionPtr ptr[3],
857 EventBufData* data);
858 int merge_data(const SubTableData * const sdata, Uint32 len,
859 const LinearSectionPtr ptr[3],
860 EventBufData* data);
861 int get_main_data(Gci_container* bucket,
862 EventBufData_hash::Pos& hpos,
863 EventBufData* blob_data);
864 void add_blob_data(Gci_container* bucket,
865 EventBufData* main_data,
866 EventBufData* blob_data);
867
868 void *alloc(Uint32 sz);
869 Uint32 get_free_data_sz() const;
870 Uint32 get_used_data_sz() const;
871
872 //Must report status if buffer manager state is changed
873 void reportStatus(ReportReason reason = NO_REPORT);
874
875 //Get event buffer memory usage statistics
876 void get_event_buffer_memory_usage(Ndb::EventBufferMemoryUsage& usage);
877
878 // Global Mutex used for some things
879 static NdbMutex *p_add_drop_mutex;
880
881 #ifdef VM_TRACE
882 const char *m_latest_command;
883 Uint64 m_flush_gci;
884 #endif
885
886 Ndb *m_ndb;
887
888 // Gci are monotonic increasing while the cluster is not restarted.
889 // A restart will start a new generation of epochs which also inc:
890 Uint32 m_epoch_generation;
891
892 // "latest gci" variables updated in receiver thread
893 Uint64 m_latestGCI; // latest GCI completed in order
894 Uint64 m_latest_complete_GCI; // latest complete GCI (in case of outof order)
895 Uint64 m_highest_sub_gcp_complete_GCI; // highest gci seen in api
  // "latest gci" variables updated in user thread
  MonotonicEpoch m_latest_poll_GCI; // latest gci handed over to user thread
  Uint64 m_latest_consumed_epoch; // latest epoch consumed by user thread

  /**
   * m_buffered_epochs = #completed epochs - #completely consumed epochs.
   * Updated in receiver thread when an epoch completes.
   * User thread updates it when an epoch is completely consumed.
   * Owned by receiver thread and user thread update needs mutex.
   */
  Uint32 m_buffered_epochs;

  bool m_failure_detected; // marker that event operations have failure events

  bool m_startup_hack;
  bool m_prevent_nodegroup_change;

  // Guards state shared between receiver and user threads
  // (e.g. user-thread updates of m_buffered_epochs, see above)
  NdbMutex *m_mutex;

  // receive thread: completed epochs not yet handed over to the user thread
  EpochDataList m_complete_data;

  // user thread: epochs handed over for consumption
  EpochDataList m_event_queue;
  const EventBufData *m_current_data;

  unsigned m_total_alloc; // total allocated memory

  // ceiling for total allocated memory, 0 means unlimited
  unsigned m_max_alloc;

  // Crash when OS memory allocation for event buffer fails
  void crashMemAllocError(const char *error_text);

  EventBufferManager m_event_buffer_manager; // managing buffer memory usage

  unsigned get_eventbuffer_free_percent();
  void set_eventbuffer_free_percent(unsigned free);

  // thresholds to report status
  unsigned m_free_thresh, m_min_free_thresh, m_max_free_thresh;
  unsigned m_gci_slip_thresh;
  NDB_TICKS m_last_log_time; // Limit frequency of event buffer status reports

  NdbError m_error; // last error recorded for this event buffer

private:
  // Insert one received event (SUB_TABLE_DATA) into the given event
  // operation's buffers; oid_ref receives the operation's object id.
  void insert_event(NdbEventOperationImpl* impl,
                    SubTableData &data,
                    const LinearSectionPtr *ptr,
                    Uint32 &oid_ref);

  // Grow the in-use memory block list (allocating or recycling a block)
  EventMemoryBlock* expand_memory_blocks();
  void complete_memory_block(MonotonicEpoch highest_epoch);

  /*
    List of Memory blocks in use in increasing 'epoch-expiry' order.
    Thus, allocation is always from 'tail' and we release
    expired blocks from 'head'.
  */
  EventMemoryBlock *m_mem_block_head;
  EventMemoryBlock *m_mem_block_tail;

  /*
    List of free memory blocks available for recycle and its size
    (Included in ::get_free_data_sz())
  */
  EventMemoryBlock *m_mem_block_free;
  Uint32 m_mem_block_free_sz; //Total size of above

  bool m_queue_empty_epoch;

  /*
    dropped event operations (dropEventOperation) that have not yet
    been deleted because of outstanding m_ref_count

    check for delete is done on occasions when the ref_count may have
    changed by calling deleteUsedEventOperations:
    - nextEvent - each time the user has completed processing a gci
  */
  NdbEventOperationImpl *m_dropped_ev_op;

  Uint32 m_active_op_count;
  // NOTE(review): presumably serializes create/drop of event operations
  // against m_dropped_ev_op bookkeeping — confirm with callers
  NdbMutex *m_add_drop_mutex;
find_bucket(Uint64 gci)981 inline Gci_container* find_bucket(Uint64 gci){
982 Uint32 pos = (Uint32)(gci & ACTIVE_GCI_MASK);
983 Gci_container *bucket= ((Gci_container*)(m_active_gci.getBase())) + pos;
984 if(likely(gci == bucket->m_gci))
985 return bucket;
986
987 return find_bucket_chained(gci);
988 }
#ifdef VM_TRACE
  void verify_known_gci(bool allowempty);
#endif
  // Slow-path bucket lookup used when find_bucket()'s direct probe misses
  Gci_container* find_bucket_chained(Uint64 gci);
  void complete_bucket(Gci_container*);
  bool find_max_known_gci(Uint64 * res) const;
  void resize_known_gci();

  Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_alive_node_bit_mask;
  Uint16 m_sub_data_streams[MAX_SUB_DATA_STREAMS];

  void handle_change_nodegroup(const SubGcpCompleteRep*);

  // Map a sub data stream id to its slot number in m_sub_data_streams
  Uint16 find_sub_data_stream_number(Uint16 sub_data_stream);
  void crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
                                             const SubGcpCompleteRep * const rep,
                                             Uint32 replen,
                                             Uint32 remcnt,
                                             Uint32 repcnt) const;
public:
  // Create an epoch with only an exceptional event and an empty gci_op list.
  EpochData* create_empty_exceptional_epoch(Uint64 gci, Uint32 type);

  void set_total_buckets(Uint32);
};
1015
1016 inline
1017 NdbEventOperationImpl*
getEventOperationImpl(NdbEventOperation * tOp)1018 NdbEventBuffer::getEventOperationImpl(NdbEventOperation* tOp)
1019 {
1020 return &tOp->m_impl;
1021 }
1022
1023 inline void
receive_data(NdbRecAttr * r,const Uint32 * data,Uint32 sz)1024 NdbEventOperationImpl::receive_data(NdbRecAttr *r,
1025 const Uint32 *data,
1026 Uint32 sz)
1027 {
1028 r->receive_data(data,sz);
1029 #if 0
1030 if (sz)
1031 {
1032 assert((r->attrSize() * r->arraySize() + 3) >> 2 == sz);
1033 r->theNULLind= 0;
1034 memcpy(r->aRef(), data, 4 * sz);
1035 return;
1036 }
1037 r->theNULLind= 1;
1038 #endif
1039 }
1040
1041 inline bool
isCompletelyBuffering()1042 EventBufferManager::isCompletelyBuffering()
1043 {
1044 if (m_event_buffer_manager_state == EBM_COMPLETELY_BUFFERING)
1045 {
1046 assert(m_pre_gap_epoch == 0 && m_begin_gap_epoch == 0 &&
1047 m_end_gap_epoch == 0);
1048 return true;
1049 }
1050 return false;
1051 }
1052
1053 inline bool
isPartiallyDiscarding()1054 EventBufferManager::isPartiallyDiscarding()
1055 {
1056 if (m_event_buffer_manager_state == EBM_PARTIALLY_DISCARDING)
1057 {
1058 assert(m_pre_gap_epoch > 0 && m_begin_gap_epoch == 0 &&
1059 m_end_gap_epoch == 0);
1060 return true;
1061 }
1062 return false;
1063 }
1064
1065 inline bool
isCompletelyDiscarding()1066 EventBufferManager::isCompletelyDiscarding()
1067 {
1068 if (m_event_buffer_manager_state == EBM_COMPLETELY_DISCARDING)
1069 {
1070 assert(m_pre_gap_epoch > 0 && m_begin_gap_epoch > 0 &&
1071 m_end_gap_epoch == 0);
1072 return true;
1073 }
1074 return false;
1075 }
1076
1077 inline bool
isPartiallyBuffering()1078 EventBufferManager::isPartiallyBuffering()
1079 {
1080 if (m_event_buffer_manager_state == EBM_PARTIALLY_BUFFERING)
1081 {
1082 assert(m_pre_gap_epoch > 0 && m_begin_gap_epoch > 0 &&
1083 m_end_gap_epoch > 0);
1084 return true;
1085 }
1086 return false;
1087 }
1088
1089 inline bool
isInDiscardingState()1090 EventBufferManager::isInDiscardingState()
1091 {
1092 return (m_event_buffer_manager_state != EBM_COMPLETELY_BUFFERING);
1093 }
1094
1095 #endif
1096