/*
   Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/

#ifndef NdbEventOperationImpl_H
#define NdbEventOperationImpl_H

#include <NdbEventOperation.hpp>
#include <signaldata/SumaImpl.hpp>
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#include <UtilBuffer.hpp>
#include <Vector.hpp>
#include <NdbMutex.h>
#include <NdbTick.h>

#include "my_pointer_arithmetic.h"

#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
//#define EVENT_DEBUG
#ifdef EVENT_DEBUG
#define DBUG_ENTER_EVENT(A) DBUG_ENTER(A)
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
#define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
#define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
#define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
#else
#define DBUG_ENTER_EVENT(A)
#define DBUG_RETURN_EVENT(A) return(A)
#define DBUG_VOID_RETURN_EVENT return
#define DBUG_PRINT_EVENT(A,B)
#define DBUG_DUMP_EVENT(A,B,C)
#endif

#include <ndb_logevent.h>
typedef enum ndb_logevent_event_buffer_status_report_reason ReportReason;

class NdbEventOperationImpl;
class EpochData;

class EventBufData
{
public:
  union {
    SubTableData *sdata;
    Uint32 *memory;
  };
  LinearSectionPtr ptr[3];
  NdbEventOperationImpl *m_event_op;

  /*
   * Blobs are stored in a blob list (m_next_blob) where each entry
   * is a list of parts (m_next).  TODO order by part number
   *
   * Data item lists keep track of the item count and sum(sz), and
   * these include both main items and blob parts.
   */

  EventBufData *m_next; // Next in global order, or next blob part
  EventBufData *m_next_blob; // First part in next blob

  EventBufData *m_next_hash; // Next in per-GCI hash
  Uint32 m_pkhash; // PK hash (without op) for fast compare

  EventBufData()
    : memory(NULL),
      m_event_op(NULL), m_next(NULL), m_next_blob(NULL)
  {}

  Uint32 get_count() const;
  Uint32 get_size() const;
  Uint64 getGCI() const;
};

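/*
 * Illustrative sketch (not part of the API): traversing the two-level
 * blob list described above. m_next_blob links the first part of each
 * blob, while m_next chains the remaining parts of one blob. The
 * function name is hypothetical; the layout follows the comment above.
 */
#if 0
static void blob_list_traversal_example(EventBufData *main)
{
  for (EventBufData *blob = main->m_next_blob;
       blob != NULL;
       blob = blob->m_next_blob)          // next blob in the blob list
  {
    for (EventBufData *part = blob;
         part != NULL;
         part = part->m_next)             // next part of this blob
    {
      // process one blob part (parts are not ordered by part number)
    }
  }
}
#endif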

/**
 * The MonotonicEpoch class provides a monotonically increasing epoch
 * identifier - even across an initial restart, which may start a
 * new sequence of GCIs from 0/0.
 * Several garbage collection mechanisms in the EventBuffer rely
 * on the monotonicity of the GCI being used as an 'expiry stamp'
 * for when the object can be permanently deleted.
 * An illustrative ordering sketch follows the class definition.
 */
class MonotonicEpoch
{
public:
  static const MonotonicEpoch min;
  static const MonotonicEpoch max;

  MonotonicEpoch()
    : m_seq(0), m_epoch(0) {}

  MonotonicEpoch(Uint32 seq, Uint64 epoch)
    : m_seq(seq), m_epoch(epoch) {}

  bool operator == (const MonotonicEpoch& other) const
  { return m_epoch == other.m_epoch && m_seq == other.m_seq; }
  bool operator != (const MonotonicEpoch& other) const
  { return m_epoch != other.m_epoch || m_seq != other.m_seq; }
  bool operator <  (const MonotonicEpoch& other) const
  { return m_seq < other.m_seq || (m_seq == other.m_seq && m_epoch < other.m_epoch); }
  bool operator <= (const MonotonicEpoch& other) const
  { return m_seq < other.m_seq || (m_seq == other.m_seq && m_epoch <= other.m_epoch); }
  bool operator >  (const MonotonicEpoch& other) const
  { return m_seq > other.m_seq || (m_seq == other.m_seq && m_epoch > other.m_epoch); }
  bool operator >= (const MonotonicEpoch& other) const
  { return m_seq > other.m_seq || (m_seq == other.m_seq && m_epoch >= other.m_epoch); }

  Uint64 getGCI() const { return m_epoch; }

  // 'operator <<' is allowed to access private members
  friend NdbOut& operator<<(NdbOut& out, const MonotonicEpoch& gci);

private:
  Uint32  m_seq;
  Uint64  m_epoch;
};

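/*
 * Illustrative sketch (not part of the API): the comparison operators
 * order epochs lexicographically on (m_seq, m_epoch), so an epoch from
 * a later GCI sequence always compares greater, even when the restarted
 * cluster begins a new GCI sequence from 0/0. The function name and
 * the numeric values below are hypothetical.
 */
#if 0
static void monotonic_epoch_ordering_example()
{
  const MonotonicEpoch before_restart(0, 1000);  // seq 0, high GCI
  const MonotonicEpoch after_restart(1, 1);      // seq 1, GCI restarted
  assert(before_restart < after_restart);        // the sequence dominates
  assert(MonotonicEpoch::min <= before_restart);
  assert(after_restart <= MonotonicEpoch::max);
}
#endif
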
/**
 * All memory allocation for events is done from memory blocks.
 * Each memory block is tagged with an 'expiry-epoch', which holds
 * the highest epoch known up to the point where the block got full.
 *
 * No freeing of objects allocated from the memory block is required.
 * Instead we free the entire block when the client has consumed the
 * last event with an epoch >= the 'expiry-epoch' of the memory block.
 * An illustrative usage sketch follows the class definition.
 */
class EventMemoryBlock
{
public:
  EventMemoryBlock(Uint32 size)
    : m_size(data_size(size))
  {
    init();
  }

  void init()
  {
    /**
     * Alloc must start from an aligned memory addr, add padding if required.
     * Assumes that EventMemoryBlock itself is correctly aligned.
     */
    const Uint32 data_offs =  my_offsetof(EventMemoryBlock, m_data);
    const Uint32 pad = ALIGN_SIZE(data_offs) - data_offs;
    m_used = pad;
    m_expiry_epoch = MonotonicEpoch::max;
    m_next = NULL;
  }

  void destruct()
  {
#ifndef NDEBUG
    // Shred the memory if debugging
    memset(m_data, 0x11, m_size);
    m_used = 0;
    m_expiry_epoch = MonotonicEpoch::min;
#endif
  }

  // Allocate a chunk of memory from this MemoryBlock
  void* alloc(unsigned size)
  {
    if (unlikely(m_used + size > m_size))
      return NULL;

    char *mem = m_data + m_used;
    m_used += ALIGN_SIZE(size);  //Keep alignment for next object
    return (void*)mem;
  }

  // Get remaining free memory from block
  Uint32 get_free() const
  {
    return (m_size - m_used);
  }

  // Get total usable memory size from block (if empty)
  Uint32 get_size() const
  {
    return m_size;
  }

  // Get total size of block as once allocated
  Uint32 alloced_size() const
  {
    return m_size + my_offsetof(EventMemoryBlock, m_data);
  }

  const Uint32      m_size;   // Number of bytes available to allocate from m_data
  Uint32            m_used;   // Offset of next free position

  /**
   * Highest epoch of any object allocated memory from this block.
   * The entire block expires when all epochs <= expiry_epoch are consumed.
   */
  MonotonicEpoch m_expiry_epoch;

  EventMemoryBlock *m_next;   // Next memory block

private:
  char m_data[1];

  // Calculates usable size of m_data given total size 'full_sz'
  Uint32 data_size(Uint32 full_sz)
  {
    return full_sz - my_offsetof(EventMemoryBlock, m_data);
  }
};

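/*
 * Illustrative sketch (not part of the API): objects are bump-allocated
 * from a block until it is full; the owner then stamps the block with
 * the highest epoch allocated from it, and the whole block is recycled
 * once every epoch up to that stamp has been consumed. The function
 * name and the epoch value below are hypothetical.
 */
#if 0
static void memory_block_usage_example(EventMemoryBlock *block)
{
  void *obj = block->alloc(128);   // bump-allocate 128 bytes
  if (obj == NULL)
  {
    // Block is full: stamp it with the highest epoch it contains;
    // allocation then continues from a fresh block (owner's job).
    block->m_expiry_epoch = MonotonicEpoch(0, 1000);
    return;
  }
  // No per-object free: the entire block is released by the owner when
  // the client has consumed all epochs <= block->m_expiry_epoch.
}
#endif
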
// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class EventBufData_hash
{
public:
  struct Pos { // search result
    Uint32 index;       // index into hash array
    EventBufData* data; // non-zero if found
    Uint32 pkhash;      // PK hash
  };

  static Uint32 getpkhash(NdbEventOperationImpl* op, const LinearSectionPtr ptr[3]);
  static bool getpkequal(NdbEventOperationImpl* op, const LinearSectionPtr ptr1[3], const LinearSectionPtr ptr2[3]);

  void search(Pos& hpos, NdbEventOperationImpl* op, const LinearSectionPtr ptr[3]);
  void append(Pos& hpos, EventBufData* data);

  enum { GCI_EVENT_HASH_SIZE = 101 };
  EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
};

inline
void EventBufData_hash::append(Pos& hpos, EventBufData* data)
{
  data->m_next_hash = m_hash[hpos.index];
  m_hash[hpos.index] = data;
}

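/*
 * Illustrative sketch (not part of the API): the intended usage pattern
 * is search() followed by append() when no match is found, reusing the
 * hash position (and PK hash) computed by the search. The function name
 * is hypothetical.
 */
#if 0
static void hash_usage_example(EventBufData_hash &hash,
                               NdbEventOperationImpl *op,
                               EventBufData *fresh,
                               const LinearSectionPtr ptr[3])
{
  EventBufData_hash::Pos hpos;
  hash.search(hpos, op, ptr);     // fills index and pkhash; data if found
  if (hpos.data == NULL)          // no existing event for this PK
  {
    fresh->m_pkhash = hpos.pkhash;
    hash.append(hpos, fresh);     // prepend at the hashed bucket
  }
  // else: merge 'fresh' into hpos.data (see NdbEventBuffer::merge_data)
}
#endif
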
/**
 * The Gci_container collects the EventBufData and
 * the NdbEventOperationImpl receiving an event within this
 * specific epoch. Once 'completed', an 'EpochData' is created from
 * the Gci_container, representing a more static view of the
 * epoch ready to be consumed by the client.
 */
struct Gci_op  //A helper
{
  NdbEventOperationImpl* op;
  Uint32 event_types;
  Uint32 cumulative_any_value;// Merged for table/epoch events
};

class Gci_container
{
public:
  Gci_container(NdbEventBuffer* event_buffer)
  : m_event_buffer(event_buffer),
    m_state(0),
    m_gcp_complete_rep_count(0),
    m_gcp_complete_rep_sub_data_streams(),
    m_gci(0),
    m_head(NULL), m_tail(NULL),
    m_gci_op_list(NULL),
    m_gci_op_count(0),
    m_gci_op_alloc(0)
  {
    bzero(&m_data_hash, sizeof(m_data_hash));
  }

  void clear()
  {
    assert(m_event_buffer != NULL);
    m_state = 0;
    m_gcp_complete_rep_count = 0;
    m_gcp_complete_rep_sub_data_streams.clear();
    m_gci = 0;
    m_head = m_tail = NULL;
    bzero(&m_data_hash, sizeof(m_data_hash));

    m_gci_op_list = NULL;
    m_gci_op_count = 0;
    m_gci_op_alloc = 0;
  }

  bool is_empty() const
  { return (m_head == NULL); }

  enum State
  {
    GC_COMPLETE       = 0x1 // GCI is complete, but waiting for out of order
    ,GC_INCONSISTENT  = 0x2  // GCI might be missing event data
    ,GC_CHANGE_CNT    = 0x4  // Change m_total_buckets
    ,GC_OUT_OF_MEMORY = 0x8 // Not enough event buffer memory to buffer data
  };

  NdbEventBuffer *const m_event_buffer;  //Owner

  Uint16 m_state;
  Uint16 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
  Bitmask<(MAX_SUB_DATA_STREAMS+31)/32> m_gcp_complete_rep_sub_data_streams;
  Uint64 m_gci;                    // GCI

  EventBufData *m_head, *m_tail;
  EventBufData_hash m_data_hash;

  Gci_op *m_gci_op_list;
  Uint32 m_gci_op_count;   //Current size of gci_op_list[]
  Uint32 m_gci_op_alloc;   //Items allocated in gci_op_list[]

  bool hasError() const
  { return (m_state & (GC_OUT_OF_MEMORY | GC_INCONSISTENT)); }

  // get number of EventBufData in this Gci_container (For debug)
  Uint32 count_event_data() const;

  // add Gci_op to container for this Gci
  void add_gci_op(Gci_op g);

  // append data and insert data into Gci_op list with add_gci_op
  void append_data(EventBufData *data);

  // Create an EpochData containing the Gci_op and event data added above.
  // This effectively 'completes' the epoch represented by this Gci_container
  EpochData* createEpochData(Uint64 gci);
};

struct Gci_container_pod
{
  char data[sizeof(Gci_container)];
};

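/*
 * Illustrative sketch (not part of the API): the receiver-side life of
 * a Gci_container. Event data is appended while the epoch is open, and
 * a static EpochData snapshot is produced once the last
 * SUB_GCP_COMPLETE_REP has arrived. The function name and the exact
 * completion check are hypothetical simplifications.
 */
#if 0
static EpochData* gci_container_completion_example(Gci_container &bucket,
                                                   EventBufData *data)
{
  bucket.append_data(data);     // also records the Gci_op entry
  if (bucket.m_gcp_complete_rep_count == 0 &&   // all reports received
      !bucket.hasError())
    return bucket.createEpochData(bucket.m_gci); // epoch completed
  return NULL;                  // still waiting for more reports
}
#endif
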
/**
 * An EpochData is created from a Gci_container when it contains a complete
 * epoch. It contains all EventBufData received within this epoch, and
 * a list of all NdbEventOperationImpl which received an event.
 * (Except exceptional events)
 *
 * m_error shows the error identified when receiving an epoch:
 *  a buffer overflow at the sender (ndb suma) or receiver (event buffer).
 *  This error information is a duplicate; the same info is available in
 *  the dummy EventBufData. The reason to store the duplicate is to remove
 *  the need to search the EventBufData by isConsistent(Uint64 &) to find
 *  whether an inconsistency has occurred in the epoch stream.
 *  This method is kept for backward compatibility.
 */
class EpochData
{
public:
  EpochData(MonotonicEpoch gci,
            Gci_op *gci_op_list, Uint32 count,
            EventBufData *data)
    : m_gci(gci),
      m_error(0),
      m_gci_op_count(count),
      m_gci_op_list(gci_op_list),
      m_data(data),
      m_next(NULL)
    {}
  ~EpochData() {}

  // get number of EventBufData in EpochDataList (For debug)
  Uint32 count_event_data() const;

  const MonotonicEpoch m_gci;
  Uint32 m_error;
  Uint32 const m_gci_op_count;
  Gci_op* const m_gci_op_list;  //All event_op receiving an event
  EventBufData* m_data;         //All event data within epoch
  EpochData *m_next;            //Next completed epoch
};


/**
 * A list of EpochData in increasing GCI order is prepared for the
 * client to consume. Actually it is a 'list of lists'.
 *  - The EpochDataList presents a list of epochs which have completed.
 *  - Within each epoch the client can navigate the EventBufData
 *    valid for this specific Epoch.
 */
class EpochDataList
{
public:
  EpochDataList()
    : m_head(NULL), m_tail(NULL) {}

  // Gci list is cleared to an empty state.
  void clear()
  { m_head = m_tail = NULL; }

  bool is_empty() const
  { return (m_head == NULL); }

  // append EpochData to list
  void append(EpochData *epoch)
  {
    if (m_tail)
      m_tail->m_next = epoch;
    else
    {
      assert(m_head == NULL);
      m_head = epoch;
    }
    m_tail = epoch;
  }

  // append list to another
  void append_list(EpochDataList *list)
  {
    if (m_tail)
      m_tail->m_next= list->m_head;
    else
      m_head= list->m_head;
    m_tail= list->m_tail;

    list->m_head = list->m_tail = NULL;
  }

  EpochData *first_epoch() const
  { return m_head;}

  // advance list head to next EpochData
  EpochData *next_epoch()
  {
    m_head = m_head->m_next;
    if (m_head == NULL)
      m_tail = NULL;

    return m_head;
  }

  // find first event data to be delivered.
  EventBufData *get_first_event_data() const
  {
    EpochData *epoch = m_head;
    while (epoch != NULL)
    {
      if (epoch->m_data != NULL)
        return epoch->m_data;
      epoch = epoch->m_next;
    }
    return NULL;
  }

  // get and consume first EventData
  EventBufData *consume_first_event_data()
  {
    EpochData *epoch = m_head;
    if (epoch != NULL)
    {
      EventBufData *data = epoch->m_data;
      if (data != NULL)
        m_head->m_data = data->m_next;

      return data;
    }
    return NULL;
  }

  // get number of EventBufData in EpochDataList (For debug)
  Uint32 count_event_data() const;

//private:
  EpochData *m_head, *m_tail;
};

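/*
 * Illustrative sketch (not part of the API): a consumer walks the
 * 'list of lists' epoch by epoch, draining each epoch's event data
 * before advancing the list head. The function name is hypothetical.
 */
#if 0
static void epoch_list_consumption_example(EpochDataList &list)
{
  for (EpochData *epoch = list.first_epoch();
       epoch != NULL;
       epoch = list.next_epoch())          // advance to next epoch
  {
    for (EventBufData *data = epoch->m_data;
         data != NULL;
         data = data->m_next)              // next event within the epoch
    {
      // deliver 'data' to the NdbEventOperation that received it
    }
  }
}
#endif
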
class NdbEventOperationImpl : public NdbEventOperation {
public:
  NdbEventOperationImpl(NdbEventOperation &f,
			Ndb *theNdb,
			const char* eventName);
  NdbEventOperationImpl(Ndb *theNdb,
			NdbEventImpl& evnt);
  void init(NdbEventImpl& evnt);
  NdbEventOperationImpl(NdbEventOperationImpl&); //unimplemented
  NdbEventOperationImpl& operator=(const NdbEventOperationImpl&); //unimplemented
  ~NdbEventOperationImpl();

  NdbEventOperation::State getState();

  int execute();
  int execute_nolock();
  int stop();
  NdbRecAttr *getValue(const char *colName, char *aValue, int n);
  NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
  NdbBlob *getBlobHandle(const char *colName, int n);
  NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
  Uint32 get_blob_part_no(bool hasDist);
  int readBlobParts(char* buf, NdbBlob* blob,
                    Uint32 part, Uint32 count, Uint16* lenLoc);
  int receive_event();
  bool tableNameChanged() const;
  bool tableFrmChanged() const;
  bool tableFragmentationChanged() const;
  bool tableRangeListChanged() const;
  Uint64 getGCI() const;
  Uint32 getAnyValue() const;
  bool isErrorEpoch(NdbDictionary::Event::TableEvent *error_type);
  bool isEmptyEpoch();
  Uint64 getLatestGCI();
  Uint64 getTransId() const;
  bool execSUB_TABLE_DATA(const NdbApiSignal * signal,
                          const LinearSectionPtr ptr[3]);

  NdbDictionary::Event::TableEvent getEventType2();

  void print();
  void printAll();

  NdbEventOperation *m_facade;
  Uint32 m_magic_number;

  const NdbError & getNdbError() const;
  NdbError m_error;

  Ndb *m_ndb;
  NdbEventImpl *m_eventImpl;

  NdbRecAttr *theFirstPkAttrs[2];
  NdbRecAttr *theCurrentPkAttrs[2];
  NdbRecAttr *theFirstDataAttrs[2];
  NdbRecAttr *theCurrentDataAttrs[2];

  NdbBlob* theBlobList;
  NdbEventOperationImpl* theBlobOpList; // in main op, list of blob ops
  NdbEventOperationImpl* theMainOp; // in blob op, the main op
  int theBlobVersion; // in blob op, NDB_BLOB_V1 or NDB_BLOB_V2

  NdbEventOperation::State m_state; /* note connection to mi_type */
  Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
		   * else same as in EventImpl
		   */
  Uint32 m_eventId;
  Uint32 m_oid;

  /*
    When the parsed gci > m_stop_gci it is safe to drop the operation,
    as the kernel will not have any more references.
  */
  MonotonicEpoch m_stop_gci;

  /*
    m_ref_count keeps track of outstanding references to an event
    operation impl object, to make sure that the object is not
    deleted too early.

    If on dropEventOperation there are still references to an
    object, it is queued for delete in NdbEventBuffer::m_dropped_ev_op.

    The following references exist for a _non_ blob event op:
    * user reference
    - add    - NdbEventBuffer::createEventOperation
    - remove - NdbEventBuffer::dropEventOperation
    * kernel reference
    - add    - execute_nolock
    - remove - TE_STOP, TE_CLUSTER_FAILURE
    * blob reference
    - add    - execute_nolock on blob event
    - remove - TE_STOP, TE_CLUSTER_FAILURE on blob event
    * gci reference
    - add    - insertDataL/add_gci_op
    - remove - NdbEventBuffer::deleteUsedEventOperations

    The following references exist for a blob event op:
    * kernel reference
    - add    - execute_nolock
    - remove - TE_STOP, TE_CLUSTER_FAILURE
   */

  int m_ref_count;
  bool m_mergeEvents;

  EventBufData *m_data_item;

  void *m_custom_data;
  int m_has_error;

  Uint32 m_fragmentId;
  UtilBuffer m_buffer;

  // Bit mask for what has changed in a table (for TE_ALTER event)
  Uint32 m_change_mask;

#ifdef VM_TRACE
  Uint32 m_data_done_count;
  Uint32 m_data_count;
#endif

  // managed by the ndb object
  NdbEventOperationImpl *m_next;
  NdbEventOperationImpl *m_prev;

  // Used for allowing empty updates to be passed to the user
  bool m_allow_empty_update;

private:
  void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz);
};

class EventBufferManager {
public:
  EventBufferManager(const Ndb* const m_ndb);
  ~EventBufferManager() {}

private:

  const Ndb* const m_ndb;
  /* Last epoch that will be buffered completely before
   * the beginning of a gap.
   */
  Uint64 m_pre_gap_epoch;

  /* The epoch where the gap begins. The received event data for this epoch
   * will be discarded. Gcp-completion of this epoch will add a dummy event
   * data and a dummy gci-ops list denoting the problem causing the gap.
   */
  Uint64 m_begin_gap_epoch;

  /* This is the last epoch that will NOT be buffered during the gap period.
   * From the next epoch (post-gap epoch), all event data will be
   * completely buffered.
   */
  Uint64 m_end_gap_epoch;

  // Epochs newer than this will be discarded when the event buffer
  // is used up.
  Uint64 m_max_buffered_epoch;

  /* Since no buffering will take place during a gap, m_max_buffered_epoch
   * will not be updated. Therefore, use m_max_received_epoch to
   * find end_gap_epoch when memory becomes available again.
   * Epochs newer than this will be buffered.
   */
  Uint64 m_max_received_epoch;

  /* After the max_alloc limit is hit, the % of event buffer memory
   * that should be available before resuming buffering:
   * min 1, max 99, default 20.
   */
  unsigned m_free_percent;

  enum {
    EBM_COMPLETELY_BUFFERING,
    EBM_PARTIALLY_DISCARDING,
    EBM_COMPLETELY_DISCARDING,
    EBM_PARTIALLY_BUFFERING
  } m_event_buffer_manager_state;

  /**
   * The event buffer manager has 4 states:
   * COMPLETELY_BUFFERING :
   *  all received event data is buffered.
   *  Entry condition:
   *   m_pre_gap_epoch == 0 && m_begin_gap_epoch == 0 && m_end_gap_epoch == 0.
   *
   * PARTIALLY_DISCARDING :
   *  event data up to and including m_pre_gap_epoch is buffered,
   *  the rest is discarded.
   *  Entry condition:
   *   m_pre_gap_epoch > 0 && m_begin_gap_epoch == 0 && m_end_gap_epoch == 0.
   *
   * COMPLETELY_DISCARDING :
   *  all received epoch event data is discarded.
   *  Entry condition:
   *   m_pre_gap_epoch > 0 && m_begin_gap_epoch > 0 && m_end_gap_epoch == 0.
   *
   * PARTIALLY_BUFFERING :
   *  all received event data <= m_end_gap_epoch is discarded, the rest
   *  is buffered.
   *  Entry condition:
   *   m_pre_gap_epoch > 0 && m_begin_gap_epoch > 0 && m_end_gap_epoch > 0.
   *
   * Transitions :
   * COMPLETELY_BUFFERING -> PARTIALLY_DISCARDING :
   *  memory is completely used up at the reception of SUB_TABLE_DATA.
   *  Action: m_pre_gap_epoch is set to m_max_buffered_epoch.
   *   ==> An incoming new epoch, which is larger than
   *       m_max_buffered_epoch, can NOT be an m_pre_gap_epoch.
   *
   * PARTIALLY_DISCARDING -> COMPLETELY_DISCARDING :
   *  the epoch next to m_pre_gap_epoch has gcp-completed.
   *  Action: set m_begin_gap_epoch to the gcp-completing epoch
   *  (marking the beginning of a gap).
   *  The reason to have an m_begin_gap in addition to m_pre_gap is:
   *  the gci of the epoch next to m_pre_gap is needed for creating the
   *  exceptional epoch. We reuse the code in complete_bucket that will
   *  create the exceptional epoch. Complete_bucket is called only when
   *  an epoch is gcp-completing.
   *
   * COMPLETELY_DISCARDING -> PARTIALLY_BUFFERING :
   *  m_free_percent of the event buffer becomes available at the
   *  reception of SUB_TABLE_DATA.
   *  Action: set m_end_gap_epoch to m_max_received_epoch
   *  (cannot use m_max_buffered_epoch since it has not been updated
   *  since PARTIALLY_DISCARDING).
   *
   * PARTIALLY_BUFFERING -> COMPLETELY_BUFFERING :
   *  the epoch next to m_end_gap_epoch (post-gap epoch) has been buffered
   *  completely and has gcp-completed.
   *  Action: reset m_pre_gap_epoch, m_begin_gap_epoch and m_end_gap_epoch.
   *
   * An illustrative usage sketch follows the class definition.
   */

  bool isCompletelyBuffering();
  bool isPartiallyDiscarding();
  bool isCompletelyDiscarding();
  bool isPartiallyBuffering();
  bool isInDiscardingState();

public:
  unsigned get_eventbuffer_free_percent();
  void set_eventbuffer_free_percent(unsigned free);

  void onBufferingEpoch(Uint64 received_epoch); // update m_max_buffered_epoch

  /* Execute the state machine by checking the buffer manager state
   * and performing the correct transition according to buffer availability.
   * The returned value indicates whether reportStatus() is necessary.
   * Transitions CB -> PD and CD -> PB, and the update of
   * m_max_received_epoch, are performed here.
   */
  ReportReason onEventDataReceived(Uint32 memory_usage_percent, Uint64 received_epoch);

  // Check whether the received event data can be discarded.
  // Discard-criteria : m_pre_gap_epoch < received_epoch <= m_end_gap_epoch.
  bool isEventDataToBeDiscarded(Uint64 received_epoch);

  /* Execute the state machine by checking the buffer manager state
   * and performing the correct transition according to gcp-completion.
   * Transitions PD -> CD and PB -> CB are performed here.
   * Check whether the received gcp-completing epoch can mark the beginning
   * of a gap (qualifies as m_begin_gap_epoch) or whether the gap has ended
   * and the transition to COMPLETELY_BUFFERING can be performed.
   * The former case sets gap_begins to true.
   */
  ReportReason onEpochCompleted(Uint64 completed_epoch, bool& gap_begins);

  // Check whether the received SUB_GCP_COMPLETE can be discarded.
  // Discard-criteria: m_begin_gap_epoch < completed_epoch <= m_end_gap_epoch
  bool isGcpCompleteToBeDiscarded(Uint64 completed_epoch);
};

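/*
 * Illustrative sketch (not part of the API): how the receiver thread
 * might drive the buffer manager state machine around the two hooks
 * above. The function name and local variable names are hypothetical.
 */
#if 0
static void buffer_manager_usage_example(EventBufferManager &mgr,
                                         Uint32 usage_percent,
                                         Uint64 epoch)
{
  // On SUB_TABLE_DATA: may transition CB -> PD or CD -> PB.
  ReportReason r1 = mgr.onEventDataReceived(usage_percent, epoch);
  if (mgr.isEventDataToBeDiscarded(epoch))
    return;  // inside the gap: drop this event data

  // On SUB_GCP_COMPLETE_REP: may transition PD -> CD or PB -> CB.
  bool gap_begins = false;
  ReportReason r2 = mgr.onEpochCompleted(epoch, gap_begins);
  if (gap_begins)
  {
    // create the exceptional epoch denoting the gap for this GCI
  }
  (void)r1; (void)r2;  // callers pass these on to reportStatus()
}
#endif
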
class NdbEventBuffer {
public:
  NdbEventBuffer(Ndb*);
  ~NdbEventBuffer();

  Uint32 m_total_buckets;
  Uint16 m_min_gci_index;
  Uint16 m_max_gci_index;
  Vector<Uint64> m_known_gci;
  Vector<Gci_container_pod> m_active_gci;
  STATIC_CONST( ACTIVE_GCI_DIRECTORY_SIZE = 4 );
  STATIC_CONST( ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1 );

  NdbEventOperation *createEventOperation(const char* eventName,
					  NdbError &);
  NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt,
                                                  NdbError &);
  void dropEventOperation(NdbEventOperation *);
  static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);

  void add_drop_lock() { NdbMutex_Lock(m_add_drop_mutex); }
  void add_drop_unlock() { NdbMutex_Unlock(m_add_drop_mutex); }
  void lock() { NdbMutex_Lock(m_mutex); }
  bool trylock() { return NdbMutex_Trylock(m_mutex) == 0; }
  void unlock() { NdbMutex_Unlock(m_mutex); }

  void add_op();
  void remove_op();
  void init_gci_containers();

  // accessed from the "receive thread"
  int insertDataL(NdbEventOperationImpl *op,
		  const SubTableData * const sdata, Uint32 len,
                  const LinearSectionPtr ptr[3]);
  void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const, Uint32 len,
                                int complete_cluster_failure= 0);
  void execSUB_START_CONF(const SubStartConf * const, Uint32 len);
  void execSUB_STOP_CONF(const SubStopConf * const, Uint32 len);
  void execSUB_STOP_REF(const SubStopRef * const, Uint32 len);

  void complete_outof_order_gcis();

  void report_node_failure_completed(Uint32 node_id);

  // used by user thread
  Uint64 getLatestGCI();
  Uint32 getEventId(int bufferId);
  Uint64 getHighestQueuedEpoch();
  void setEventBufferQueueEmptyEpoch(bool queue_empty_epoch);

  int pollEvents(Uint64 *HighestQueuedEpoch= NULL);
  int flushIncompleteEvents(Uint64 gci);

  void remove_consumed_memory(MonotonicEpoch consumedGci);
  void remove_consumed_epoch_data(MonotonicEpoch consumedGci);

  /* Remove all resources related to the specified epoch
   * after it has been completely consumed.
   */
  void remove_consumed(MonotonicEpoch consumedGci);

  // Count the buffered epochs (in event queue and completed list).
  Uint32 count_buffered_epochs() const;

  /* Consume and discard all completed events.
   * Memory related to discarded events is released.
   */
  void consume_all();

  // Check if event data belongs to an exceptional epoch, such as
  // an inconsistent, out-of-memory or empty epoch.
  bool is_exceptional_epoch(EventBufData *data);

  // Consume current EventData and dequeue next for consumption
  EventBufData *nextEventData();

  // Dequeue event data from event queue and give it for consumption.
  NdbEventOperation *nextEvent2();
  bool isConsistent(Uint64& gci);
  bool isConsistentGCI(Uint64 gci);

  NdbEventOperationImpl* getEpochEventOperations(Uint32* iter,
                                                 Uint32* event_types,
                                                 Uint32* cumulative_any_value);
  void deleteUsedEventOperations(MonotonicEpoch last_consumed_gci);

  EventBufData *move_data();

  // routines to copy/merge events
  EventBufData* alloc_data();
  int alloc_mem(EventBufData* data,
                const LinearSectionPtr ptr[3]);
  int copy_data(const SubTableData * const sdata, Uint32 len,
                const LinearSectionPtr ptr[3],
                EventBufData* data);
  int merge_data(const SubTableData * const sdata, Uint32 len,
                 const LinearSectionPtr ptr[3],
                 EventBufData* data);
  int get_main_data(Gci_container* bucket,
                    EventBufData_hash::Pos& hpos,
                    EventBufData* blob_data);
  void add_blob_data(Gci_container* bucket,
                     EventBufData* main_data,
                     EventBufData* blob_data);

  void *alloc(Uint32 sz);
  Uint32 get_free_data_sz() const;
  Uint32 get_used_data_sz() const;

  //Must report status if buffer manager state is changed
  void reportStatus(ReportReason reason = NO_REPORT);

  //Get event buffer memory usage statistics
  void get_event_buffer_memory_usage(Ndb::EventBufferMemoryUsage& usage);

  // Global Mutex used for some things
  static NdbMutex *p_add_drop_mutex;

#ifdef VM_TRACE
  const char *m_latest_command;
  Uint64 m_flush_gci;
#endif

  Ndb *m_ndb;

  // GCIs are monotonically increasing while the cluster is not restarted.
  // A restart will start a new generation of epochs, which also increments:
  Uint32 m_epoch_generation;

  // "latest gci" variables updated in receiver thread
  Uint64 m_latestGCI;           // latest GCI completed in order
  Uint64 m_latest_complete_GCI; // latest complete GCI (in case of out of order)
  Uint64 m_highest_sub_gcp_complete_GCI; // highest gci seen in api
  // "latest gci" variables updated in user thread
  MonotonicEpoch m_latest_poll_GCI; // latest gci handed over to user thread
  Uint64 m_latest_consumed_epoch; // latest epoch consumed by user thread

  /**
   * m_buffered_epochs = #completed epochs - #completely consumed epochs.
   * Updated in the receiver thread when an epoch completes.
   * The user thread updates it when an epoch is completely consumed.
   * Owned by the receiver thread; user thread updates need the mutex.
   */
  Uint32 m_buffered_epochs;

  bool m_failure_detected; // marker that event operations have failure events

  bool m_startup_hack;
  bool m_prevent_nodegroup_change;

  NdbMutex *m_mutex;

  // receive thread
  EpochDataList m_complete_data;

  // user thread
  EpochDataList m_event_queue;
  const EventBufData *m_current_data;

  unsigned m_total_alloc; // total allocated memory

  // ceiling for total allocated memory, 0 means unlimited
  unsigned m_max_alloc;

  // Crash when OS memory allocation for event buffer fails
  void crashMemAllocError(const char *error_text);

  EventBufferManager m_event_buffer_manager; // managing buffer memory usage

  unsigned get_eventbuffer_free_percent();
  void set_eventbuffer_free_percent(unsigned free);

  // thresholds to report status
  unsigned m_free_thresh, m_min_free_thresh, m_max_free_thresh;
  unsigned m_gci_slip_thresh;
  NDB_TICKS m_last_log_time; // Limit frequency of event buffer status reports

  NdbError m_error;

private:
  void insert_event(NdbEventOperationImpl* impl,
                    SubTableData &data,
                    const LinearSectionPtr *ptr,
                    Uint32 &oid_ref);

  EventMemoryBlock* expand_memory_blocks();
  void complete_memory_block(MonotonicEpoch highest_epoch);

  /*
    List of memory blocks in use, in increasing 'epoch-expiry' order.
    Thus, allocation is always from 'tail' and we release
    expired blocks from 'head'.
  */
  EventMemoryBlock *m_mem_block_head;
  EventMemoryBlock *m_mem_block_tail;

  /*
    List of free memory blocks available for recycling, and its total size
    (included in ::get_free_data_sz())
  */
  EventMemoryBlock *m_mem_block_free;
  Uint32 m_mem_block_free_sz; //Total size of above

  bool m_queue_empty_epoch;

  /*
    Dropped event operations (dropEventOperation) that have not yet
    been deleted because of an outstanding m_ref_count.

    The check for delete is done on occasions when the ref_count may have
    changed, by calling deleteUsedEventOperations:
    - nextEvent - each time the user has completed processing a gci
  */
  NdbEventOperationImpl *m_dropped_ev_op;

  Uint32 m_active_op_count;
  NdbMutex *m_add_drop_mutex;

  inline Gci_container* find_bucket(Uint64 gci){
    Uint32 pos = (Uint32)(gci & ACTIVE_GCI_MASK);
    Gci_container *bucket= ((Gci_container*)(m_active_gci.getBase())) + pos;
    if(likely(gci == bucket->m_gci))
      return bucket;

    return find_bucket_chained(gci);
  }

#ifdef VM_TRACE
  void verify_known_gci(bool allowempty);
#endif
  Gci_container* find_bucket_chained(Uint64 gci);
  void complete_bucket(Gci_container*);
  bool find_max_known_gci(Uint64 * res) const;
  void resize_known_gci();

  Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_alive_node_bit_mask;
  Uint16 m_sub_data_streams[MAX_SUB_DATA_STREAMS];

  void handle_change_nodegroup(const SubGcpCompleteRep*);

  Uint16 find_sub_data_stream_number(Uint16 sub_data_stream);
  void crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
                                      const SubGcpCompleteRep * const rep,
                                      Uint32 replen,
                                      Uint32 remcnt,
                                      Uint32 repcnt) const;
public:
  // Create an epoch with only an exceptional event and an empty gci_op list.
  EpochData* create_empty_exceptional_epoch(Uint64 gci, Uint32 type);

  void set_total_buckets(Uint32);
};

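/*
 * Illustrative sketch (not part of the API): the user-thread consumption
 * cycle over the structures above. pollEvents() hands completed epochs
 * from m_complete_data to m_event_queue; nextEvent2() then dequeues one
 * event datum at a time for the matching event operation. The function
 * name is hypothetical.
 */
#if 0
static void event_buffer_poll_example(NdbEventBuffer &buffer)
{
  Uint64 highest_queued = 0;
  if (buffer.pollEvents(&highest_queued) > 0)   // epochs are queued
  {
    NdbEventOperation *op;
    while ((op = buffer.nextEvent2()) != NULL)  // one event at a time
    {
      // consume the current event on 'op'; consumed memory and epoch
      // data are reclaimed via remove_consumed()
    }
  }
}
#endif
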
inline
NdbEventOperationImpl*
NdbEventBuffer::getEventOperationImpl(NdbEventOperation* tOp)
{
  return &tOp->m_impl;
}

inline void
NdbEventOperationImpl::receive_data(NdbRecAttr *r,
				    const Uint32 *data,
				    Uint32 sz)
{
  r->receive_data(data,sz);
#if 0
  if (sz)
  {
    assert((r->attrSize() * r->arraySize() + 3) >> 2 == sz);
    r->theNULLind= 0;
    memcpy(r->aRef(), data, 4 * sz);
    return;
  }
  r->theNULLind= 1;
#endif
}

inline bool
EventBufferManager::isCompletelyBuffering()
{
  if (m_event_buffer_manager_state == EBM_COMPLETELY_BUFFERING)
  {
    assert(m_pre_gap_epoch == 0 && m_begin_gap_epoch == 0 &&
           m_end_gap_epoch == 0);
    return true;
  }
  return false;
}

inline bool
EventBufferManager::isPartiallyDiscarding()
{
  if (m_event_buffer_manager_state == EBM_PARTIALLY_DISCARDING)
  {
    assert(m_pre_gap_epoch > 0 && m_begin_gap_epoch == 0 &&
           m_end_gap_epoch == 0);
    return true;
  }
  return false;
}

inline bool
EventBufferManager::isCompletelyDiscarding()
{
  if (m_event_buffer_manager_state == EBM_COMPLETELY_DISCARDING)
  {
    assert(m_pre_gap_epoch > 0 && m_begin_gap_epoch > 0 &&
           m_end_gap_epoch == 0);
    return true;
  }
  return false;
}

inline bool
EventBufferManager::isPartiallyBuffering()
{
  if (m_event_buffer_manager_state == EBM_PARTIALLY_BUFFERING)
  {
    assert(m_pre_gap_epoch > 0 && m_begin_gap_epoch > 0 &&
           m_end_gap_epoch > 0);
    return true;
  }
  return false;
}

inline bool
EventBufferManager::isInDiscardingState()
{
  return (m_event_buffer_manager_state != EBM_COMPLETELY_BUFFERING);
}

#endif