1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #include <ndb_global.h>
27 #include <kernel_types.h>
28 
29 #include "API.hpp"
30 #include <NdbOut.hpp>
31 
32 #include <signaldata/CreateEvnt.hpp>
33 #include <signaldata/SumaImpl.hpp>
34 #include <SimpleProperties.hpp>
35 #include <Bitmask.hpp>
36 #include <AttributeHeader.hpp>
37 #include <AttributeList.hpp>
38 #include <NdbError.hpp>
39 #include <BaseString.hpp>
40 #include <UtilBuffer.hpp>
41 #include <portlib/NdbMem.h>
42 #include <signaldata/AlterTable.hpp>
43 #include "ndb_internal.hpp"
44 
45 #include <EventLogger.hpp>
46 extern EventLogger * g_eventLogger;
47 
48 #define TOTAL_BUCKETS_INIT (1U << 15)
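// Zeroed in the NdbEventBuffer constructor and used as a fill template
// when (re)initializing the active Gci_container array.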
49 static Gci_container_pod g_empty_gci_container;
50 
51 #if defined(VM_TRACE) && defined(NOT_USED)
52 static void
print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
54 {
  printf("addr=%p gci{hi/lo}=%u/%u op=%d\n", (void*)sdata,
56          sdata->gci_hi, sdata->gci_lo,
57 	 SubTableData::getOperation(sdata->requestInfo));
58   for (int i = 0; i <= 2; i++) {
59     printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
60     for (int j = 0; (uint) j < ptr[i].sz; j++)
61       printf("%08x ", ptr[i].p[j]);
62     printf("\n");
63   }
64 }
65 #endif
66 
67 // EventBufData
68 
69 void
EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz) const
71 {
72   Uint32 tmp_count = 0;
73   Uint32 tmp_sz = 0;
74   const EventBufData* data2 = m_next_blob;
75   while (data2 != 0) {
76     tmp_count++;
77     tmp_sz += data2->sz;
78     const EventBufData* data3 = data2->m_next;
79     while (data3 != 0) {
80       tmp_count++;
81       tmp_sz += data3->sz;
82       data3 = data3->m_next;
83     }
84     data2 = data2->m_next_blob;
85   }
86   full_count += tmp_count;
87   full_sz += tmp_sz;
88 }
89 
90 /*
91  * Class NdbEventOperationImpl
92  *
93  *
94  */
95 
96 // todo handle several ndb objects
97 // todo free allocated data when closing NdbEventBuffer
98 
NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &f,
100 					     Ndb *theNdb,
101 					     const char* eventName) :
102   NdbEventOperation(*this),
103   m_facade(&f),
104   m_ndb(theNdb),
105   m_state(EO_ERROR),
106   m_oid(~(Uint32)0),
107   m_allow_empty_update(false)
108 {
109   DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl");
110 
111   assert(m_ndb != NULL);
112   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
113   assert(myDict != NULL);
114 
115   const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
116   if (!myEvnt)
117   {
118     m_error.code= myDict->getNdbError().code;
119     DBUG_VOID_RETURN;
120   }
121 
122   init(myEvnt->m_impl);
123   DBUG_VOID_RETURN;
124 }
125 
NdbEventOperationImpl::NdbEventOperationImpl(Ndb *theNdb,
127                                              NdbEventImpl& evnt) :
128   NdbEventOperation(*this),
129   m_facade(this),
130   m_ndb(theNdb),
131   m_state(EO_ERROR),
132   m_oid(~(Uint32)0),
133   m_allow_empty_update(false)
134 {
135   DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl [evnt]");
136   init(evnt);
137   DBUG_VOID_RETURN;
138 }
139 
140 void
NdbEventOperationImpl::init(NdbEventImpl& evnt)
142 {
143   DBUG_ENTER("NdbEventOperationImpl::init");
144 
145   m_magic_number = 0;
146   mi_type = 0;
147   m_change_mask = 0;
148 #ifdef VM_TRACE
149   m_data_done_count = 0;
150   m_data_count = 0;
151 #endif
152   m_next = 0;
153   m_prev = 0;
154 
155   m_eventId = 0;
156   theFirstPkAttrs[0] = NULL;
157   theCurrentPkAttrs[0] = NULL;
158   theFirstPkAttrs[1] = NULL;
159   theCurrentPkAttrs[1] = NULL;
160   theFirstDataAttrs[0] = NULL;
161   theCurrentDataAttrs[0] = NULL;
162   theFirstDataAttrs[1] = NULL;
163   theCurrentDataAttrs[1] = NULL;
164 
165   theBlobList = NULL;
166   theBlobOpList = NULL;
167   theMainOp = NULL;
168   theBlobVersion = 0;
169 
170   m_data_item= NULL;
171   m_eventImpl = NULL;
172 
173   m_custom_data= 0;
174   m_has_error= 1;
175 
176   // we should lookup id in Dictionary, TODO
177   // also make sure we only have one listener on each event
178 
179   m_eventImpl = &evnt;
180 
181   m_eventId = m_eventImpl->m_eventId;
182 
183   m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this);
184 
185   m_state= EO_CREATED;
186 
187   m_stop_gci = 0;
188 #ifdef ndb_event_stores_merge_events_flag
189   m_mergeEvents = m_eventImpl->m_mergeEvents;
190 #else
191   m_mergeEvents = false;
192 #endif
193   m_ref_count = 0;
194   DBUG_PRINT("info", ("m_ref_count = 0 for op: 0x%lx", (long) this));
195 
196   m_has_error= 0;
197 
198   DBUG_PRINT("exit",("this: 0x%lx  oid: %u", (long) this, m_oid));
199   DBUG_VOID_RETURN;
200 }
201 
NdbEventOperationImpl::~NdbEventOperationImpl()
203 {
204   DBUG_ENTER("NdbEventOperationImpl::~NdbEventOperationImpl");
205   m_magic_number= 0;
206 
207   if (m_oid == ~(Uint32)0)
208     DBUG_VOID_RETURN;
209 
210   stop();
211 
212   if (theMainOp == NULL)
213   {
214     NdbEventOperationImpl* tBlobOp = theBlobOpList;
215     while (tBlobOp != NULL)
216     {
217       NdbEventOperationImpl *op = tBlobOp;
218       tBlobOp = tBlobOp->m_next;
219       delete op;
220     }
221   }
222 
223   m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
224   DBUG_PRINT("exit",("this: %p/%p oid: %u main: %p",
225              this, m_facade, m_oid, theMainOp));
226 
227   if (m_eventImpl)
228   {
229     delete m_eventImpl->m_facade;
230     m_eventImpl= 0;
231   }
232 
233   DBUG_VOID_RETURN;
234 }
235 
236 NdbEventOperation::State
NdbEventOperationImpl::getState()
238 {
239   return m_state;
240 }
241 
242 NdbRecAttr*
NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
244 {
245   DBUG_ENTER("NdbEventOperationImpl::getValue");
246   if (m_state != EO_CREATED) {
247     ndbout_c("NdbEventOperationImpl::getValue may only be called between "
248 	     "instantiation and execute()");
249     DBUG_RETURN(NULL);
250   }
251 
252   NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
253 
254   if (tAttrInfo == NULL) {
255     ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
256     DBUG_RETURN(NULL);
257   }
258 
259   DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
260 }
261 
262 NdbRecAttr*
NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
264 {
265   DBUG_ENTER("NdbEventOperationImpl::getValue");
266   // Insert Attribute Id into ATTRINFO part.
267 
268   NdbRecAttr **theFirstAttr;
269   NdbRecAttr **theCurrentAttr;
270 
271   if (tAttrInfo->getPrimaryKey())
272   {
273     theFirstAttr = &theFirstPkAttrs[n];
274     theCurrentAttr = &theCurrentPkAttrs[n];
275   }
276   else
277   {
278     theFirstAttr = &theFirstDataAttrs[n];
279     theCurrentAttr = &theCurrentDataAttrs[n];
280   }
281 
282   /************************************************************************
283    *	Get a Receive Attribute object and link it into the operation object.
284    ************************************************************************/
285   NdbRecAttr *tAttr = m_ndb->getRecAttr();
286   if (tAttr == NULL) {
287     exit(-1);
288     //setErrorCodeAbort(4000);
289     DBUG_RETURN(NULL);
290   }
291 
292   /**********************************************************************
293    * Now set the attribute identity and the pointer to the data in
294    * the RecAttr object
295    * Also set attribute size, array size and attribute type
296    ********************************************************************/
297   if (tAttr->setup(tAttrInfo, aValue)) {
298     //setErrorCodeAbort(4000);
299     m_ndb->releaseRecAttr(tAttr);
300     exit(-1);
301     DBUG_RETURN(NULL);
302   }
303   //theErrorLine++;
304 
305   tAttr->setUNDEFINED();
306 
307   // We want to keep the list sorted to make data insertion easier later
308 
309   if (*theFirstAttr == NULL) {
310     *theFirstAttr = tAttr;
311     *theCurrentAttr = tAttr;
312     tAttr->next(NULL);
313   } else {
314     Uint32 tAttrId = tAttrInfo->m_attrId;
315     if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
316       (*theCurrentAttr)->next(tAttr);
317       tAttr->next(NULL);
318       *theCurrentAttr = tAttr;
319     } else if ((*theFirstAttr)->next() == NULL ||    // only one in list
320 	       (*theFirstAttr)->attrId() > tAttrId) {// or first
321       tAttr->next(*theFirstAttr);
322       *theFirstAttr = tAttr;
323     } else { // at least 2 in list and not first and not last
324       NdbRecAttr *p = *theFirstAttr;
325       NdbRecAttr *p_next = p->next();
326       while (tAttrId > p_next->attrId()) {
327 	p = p_next;
328 	p_next = p->next();
329       }
330       if (tAttrId == p_next->attrId()) { // Using same attribute twice
331 	tAttr->release(); // do I need to do this?
332 	m_ndb->releaseRecAttr(tAttr);
333 	exit(-1);
334 	DBUG_RETURN(NULL);
335       }
336       // this is it, between p and p_next
337       p->next(tAttr);
338       tAttr->next(p_next);
339     }
340   }
341   DBUG_RETURN(tAttr);
342 }
343 
344 NdbBlob*
NdbEventOperationImpl::getBlobHandle(const char *colName, int n)
346 {
347   DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)");
348 
349   assert(m_mergeEvents);
350 
351   if (m_state != EO_CREATED) {
352     ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between "
353 	     "instantiation and execute()");
354     DBUG_RETURN(NULL);
355   }
356 
357   NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
358 
359   if (tAttrInfo == NULL) {
360     ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",colName);
361     DBUG_RETURN(NULL);
362   }
363 
364   NdbBlob* bh = getBlobHandle(tAttrInfo, n);
365   DBUG_RETURN(bh);
366 }
367 
368 NdbBlob*
NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
370 {
371   DBUG_ENTER("NdbEventOperationImpl::getBlobHandle");
372   DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n));
373 
374   // as in NdbOperation, create only one instance
375   NdbBlob* tBlob = theBlobList;
376   NdbBlob* tLastBlob = NULL;
377   while (tBlob != NULL) {
378     if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
379       DBUG_RETURN(tBlob);
380     tLastBlob = tBlob;
381     tBlob = tBlob->theNext;
382   }
383 
384   NdbEventOperationImpl* tBlobOp = NULL;
385 
386   const bool is_tinyblob = (tAttrInfo->getPartSize() == 0);
387   assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));
388 
389   if (! is_tinyblob) {
390     // blob event name
391     char bename[MAX_TAB_NAME_SIZE];
392     NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);
393 
394     // find blob event op if any (it serves both post and pre handles)
395     tBlobOp = theBlobOpList;
396     NdbEventOperationImpl* tLastBlopOp = NULL;
397     while (tBlobOp != NULL) {
398       if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
399         break;
400       }
401       tLastBlopOp = tBlobOp;
402       tBlobOp = tBlobOp->m_next;
403     }
404 
405     DBUG_PRINT("info", ("%s blob event op for %s",
406                         tBlobOp ? " reuse" : " create", bename));
407 
408     // create blob event op if not found
409     if (tBlobOp == NULL) {
410       // get blob event
411       NdbDictionaryImpl& dict =
412         NdbDictionaryImpl::getImpl(*m_ndb->getDictionary());
413       NdbEventImpl* blobEvnt =
414         dict.getBlobEvent(*this->m_eventImpl, tAttrInfo->m_column_no);
415       if (blobEvnt == NULL) {
416         m_error.code = dict.m_error.code;
417         DBUG_RETURN(NULL);
418       }
419 
420       // create blob event operation
421       tBlobOp =
422         m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
423       if (tBlobOp == NULL)
424         DBUG_RETURN(NULL);
425 
426       // pointer to main table op
427       tBlobOp->theMainOp = this;
428       tBlobOp->m_mergeEvents = m_mergeEvents;
429       tBlobOp->theBlobVersion = tAttrInfo->m_blobVersion;
430 
431       // to hide blob op it is linked under main op, not under m_ndb
432       if (tLastBlopOp == NULL)
433         theBlobOpList = tBlobOp;
434       else
435         tLastBlopOp->m_next = tBlobOp;
436       tBlobOp->m_next = NULL;
437     }
438   }
439 
440   tBlob = m_ndb->getNdbBlob();
441   if (tBlob == NULL) {
442     m_error.code = m_ndb->getNdbError().code;
443     DBUG_RETURN(NULL);
444   }
445 
446   // calls getValue on inline and blob part
447   if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) {
448     m_error.code = tBlob->getNdbError().code;
449     m_ndb->releaseNdbBlob(tBlob);
450     DBUG_RETURN(NULL);
451   }
452 
453   // add to list end
454   if (tLastBlob == NULL)
455     theBlobList = tBlob;
456   else
457     tLastBlob->theNext = tBlob;
458   tBlob->theNext = NULL;
459   DBUG_RETURN(tBlob);
460 }
461 
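// Read the blob part number directly out of the received data: skip the key
// attribute header(s) (and the distribution id, when present) in section 0
// to find the position of the part number word in section 1.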
462 Uint32
NdbEventOperationImpl::get_blob_part_no(bool hasDist)
464 {
465   assert(theBlobVersion == 1 || theBlobVersion == 2);
466   assert(theMainOp != NULL);
467   const NdbTableImpl* mainTable = theMainOp->m_eventImpl->m_tableImpl;
468   assert(m_data_item != NULL);
469   LinearSectionPtr (&ptr)[3] = m_data_item->ptr;
470 
471   uint pos = 0; // PK and possibly DIST to skip
472 
473   if (unlikely(theBlobVersion == 1)) {
474     pos += AttributeHeader(ptr[0].p[0]).getDataSize();
475     assert(hasDist);
476     pos += AttributeHeader(ptr[0].p[1]).getDataSize();
477   } else {
478     uint n = mainTable->m_noOfKeys;
479     uint i;
480     for (i = 0; i < n; i++) {
481       pos += AttributeHeader(ptr[0].p[i]).getDataSize();
482     }
483     if (hasDist)
484       pos += AttributeHeader(ptr[0].p[n]).getDataSize();
485   }
486 
487   assert(pos < ptr[1].sz);
488   Uint32 no = ptr[1].p[pos];
489   return no;
490 }
491 
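// Copy blob parts [part, part+count) into buf by walking the blob part list
// attached to the current main-table event data.  When lenLoc is non-NULL
// (single-part read) it receives the actual part length; otherwise every part
// must have the fixed part size.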
492 int
NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob,
494                                      Uint32 part, Uint32 count, Uint16* lenLoc)
495 {
496   DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts");
497   DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d",
498                       part, count, blob->theEventBlobVersion));
499 
500   NdbEventOperationImpl* blob_op = blob->theBlobEventOp;
501   const bool hasDist = (blob->theStripeSize != 0);
502 
503   DBUG_PRINT_EVENT("info", ("m_data_item=%p", m_data_item));
504   assert(m_data_item != NULL);
505 
506   // search for blob parts list head
507   EventBufData* head;
508   assert(m_data_item != NULL);
509   head = m_data_item->m_next_blob;
510   while (head != NULL)
511   {
512     if (head->m_event_op == blob_op)
513     {
514       DBUG_PRINT_EVENT("info", ("found blob parts head %p", head));
515       break;
516     }
517     head = head->m_next_blob;
518   }
519 
520   Uint32 nparts = 0;
521   Uint32 noutside = 0;
522   EventBufData* data = head;
523   // XXX optimize using part no ordering
524   while (data != NULL)
525   {
526     /*
527      * Hack part no directly out of buffer since it is not returned
528      * in pre data (PK buglet).  For part data use receive_event().
529      * This means extra copy. XXX fix
530      */
531     blob_op->m_data_item = data;
532     int r = blob_op->receive_event();
533     require(r > 0);
534     // XXX should be: no = blob->theBlobEventPartValue
535     Uint32 no = blob_op->get_blob_part_no(hasDist);
536 
537     DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part", data, no));
538 
539     if (part <= no && no < part + count)
540     {
541       DBUG_PRINT_EVENT("info", ("part within read range"));
542 
543       const char* src = blob->theBlobEventDataBuf.data;
544       Uint32 sz = 0;
545       if (blob->theFixedDataFlag) {
546         sz = blob->thePartSize;
547       } else {
548         const uchar* p = (const uchar*)blob->theBlobEventDataBuf.data;
549         sz = p[0] + (p[1] << 8);
550         src += 2;
551       }
552       memcpy(buf + (no - part) * sz, src, sz);
553       nparts++;
554       if (lenLoc != NULL) {
555         assert(count == 1);
556         *lenLoc = sz;
557       } else {
558         assert(sz == blob->thePartSize);
559       }
560     }
561     else
562     {
563       DBUG_PRINT_EVENT("info", ("part outside read range"));
564       noutside++;
565     }
566     data = data->m_next;
567   }
568   if (unlikely(nparts != count))
569   {
570     ndbout_c("nparts: %u count: %u noutside: %u", nparts, count, noutside);
571   }
572   assert(nparts == count);
573 
574   DBUG_RETURN_EVENT(0);
575 }
576 
577 int
NdbEventOperationImpl::execute()
579 {
580   DBUG_ENTER("NdbEventOperationImpl::execute");
581   m_ndb->theEventBuffer->add_drop_lock();
582   int r = execute_nolock();
583   m_ndb->theEventBuffer->add_drop_unlock();
584   DBUG_RETURN(r);
585 }
586 
587 int
NdbEventOperationImpl::execute_nolock()
589 {
590   DBUG_ENTER("NdbEventOperationImpl::execute_nolock");
591   DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob"));
592 
593   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
594   if (!myDict) {
595     m_error.code= m_ndb->getNdbError().code;
596     DBUG_RETURN(-1);
597   }
598 
599   bool schemaTrans = false;
600   if (m_ndb->theEventBuffer->m_prevent_nodegroup_change)
601   {
    /*
     * Since the total count of sub data streams (Suma buckets)
     * is initially set when the first subscription is set up,
     * a dummy schema transaction is used to prevent node add or
     * drop while the first subscription is created.  Otherwise the
     * count may change before we are in a state to detect that
     * correctly.  This should not be needed, since the handling of
     * SUB_GCP_COMPLETE_REP in the receiver thread(s) should cover
     * it, but this behaviour is kept until that is certain.
     */
612     int res = NdbDictionaryImpl::getImpl(* myDict).beginSchemaTrans(false);
613     if (res != 0)
614     {
615       switch(myDict->getNdbError().code){
616       case 711:
617       case 763:
618         // ignore;
619         break;
620       default:
621         m_error.code= myDict->getNdbError().code;
622         DBUG_RETURN(-1);
623       }
624     }
625     else
626     {
627       schemaTrans = true;
628     }
629   }
630 
631   if (theFirstPkAttrs[0] == NULL &&
632       theFirstDataAttrs[0] == NULL) { // defaults to get all
633   }
634 
635   m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
636   m_state= EO_EXECUTING;
637   mi_type= m_eventImpl->mi_type;
638   // add kernel reference
639   // removed on TE_STOP, TE_CLUSTER_FAILURE, or error below
640   m_ref_count++;
641   m_stop_gci= ~(Uint64)0;
642   DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
643   int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
644   if (r == 0)
645   {
646     m_ndb->theEventBuffer->m_prevent_nodegroup_change = false;
647     if (schemaTrans)
648     {
649       schemaTrans = false;
650       myDict->endSchemaTrans(1);
651     }
652 
653     if (theMainOp == NULL) {
654       DBUG_PRINT("info", ("execute blob ops"));
655       NdbEventOperationImpl* blob_op = theBlobOpList;
656       while (blob_op != NULL) {
657         r = blob_op->execute_nolock();
658         if (r != 0) {
659           // since main op is running and possibly some blob ops as well
660           // we can't just reset the main op.  Instead return with error,
661           // main op (and blob ops) will be cleaned up when user calls
662           // dropEventOperation
663           m_error.code= myDict->getNdbError().code;
664           DBUG_RETURN(r);
665         }
666         blob_op = blob_op->m_next;
667       }
668     }
669     if (r == 0)
670     {
671       DBUG_RETURN(0);
672     }
673   }
674   // Error
675   // remove kernel reference
676   // added above
677   m_ref_count--;
678   m_stop_gci = 0;
679   DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
680   m_state= EO_ERROR;
681   mi_type= 0;
682   m_magic_number= 0;
683   m_error.code= myDict->getNdbError().code;
684 
685   if (schemaTrans)
686   {
687     schemaTrans = false;
688     myDict->endSchemaTrans(1);
689   }
690 
691   DBUG_RETURN(r);
692 }
693 
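// Release all RecAttr objects held by this operation and, if it was
// executing, stop the subscription in the kernel.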
694 int
NdbEventOperationImpl::stop()
696 {
697   DBUG_ENTER("NdbEventOperationImpl::stop");
698   int i;
699 
700   for (i=0 ; i<2; i++) {
701     NdbRecAttr *p = theFirstPkAttrs[i];
702     while (p) {
703       NdbRecAttr *p_next = p->next();
704       m_ndb->releaseRecAttr(p);
705       p = p_next;
706     }
707     theFirstPkAttrs[i]= 0;
708   }
709   for (i=0 ; i<2; i++) {
710     NdbRecAttr *p = theFirstDataAttrs[i];
711     while (p) {
712       NdbRecAttr *p_next = p->next();
713       m_ndb->releaseRecAttr(p);
714       p = p_next;
715     }
716     theFirstDataAttrs[i]= 0;
717   }
718 
719   if (m_state != EO_EXECUTING)
720   {
721     DBUG_RETURN(-1);
722   }
723 
724   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
725   if (!myDict) {
726     m_error.code= m_ndb->getNdbError().code;
727     DBUG_RETURN(-1);
728   }
729 
730   m_ndb->theEventBuffer->add_drop_lock();
731   int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*this);
732   m_state= EO_DROPPED;
733   mi_type= 0;
734   if (r == 0) {
735     if (m_stop_gci == 0)
736     {
737       // response from old kernel
738       Uint64 gci= m_ndb->theEventBuffer->m_highest_sub_gcp_complete_GCI;
739       if (gci)
740       {
741         // calculate a "safe" gci in the future to remove event op.
742         gci += Uint64(3) << 32;
743       }
744       else
745       {
746         // set highest value to ensure that operation does not get dropped
747         // too early. Note '-1' as ~Uint64(0) indicates active event
748         gci = ~Uint64(0)-1;
749       }
750       m_stop_gci = gci;
751     }
752     m_ndb->theEventBuffer->add_drop_unlock();
753     DBUG_RETURN(0);
754   }
755   //Error
756   m_error.code= NdbDictionaryImpl::getImpl(*myDict).m_error.code;
757   m_state= EO_ERROR;
758   m_ndb->theEventBuffer->add_drop_unlock();
759   DBUG_RETURN(r);
760 }
761 
bool NdbEventOperationImpl::tableNameChanged() const
763 {
764   return (bool)AlterTableReq::getNameFlag(m_change_mask);
765 }
766 
bool NdbEventOperationImpl::tableFrmChanged() const
768 {
769   return (bool)AlterTableReq::getFrmFlag(m_change_mask);
770 }
771 
bool NdbEventOperationImpl::tableFragmentationChanged() const
773 {
774   return (bool)AlterTableReq::getFragDataFlag(m_change_mask);
775 }
776 
bool NdbEventOperationImpl::tableRangeListChanged() const
778 {
779   return (bool)AlterTableReq::getRangeListFlag(m_change_mask);
780 }
781 
782 Uint64
NdbEventOperationImpl::getGCI()
784 {
785   Uint32 gci_hi = m_data_item->sdata->gci_hi;
786   Uint32 gci_lo = m_data_item->sdata->gci_lo;
787   return gci_lo | (Uint64(gci_hi) << 32);
788 }
789 
790 bool
NdbEventOperationImpl::isErrorEpoch(NdbDictionary::Event::TableEvent *error_type)
792 {
793   const NdbDictionary::Event::TableEvent type = getEventType2();
794   // Error types are defined from TE_INCONSISTENT
795   if (type >= NdbDictionary::Event::TE_INCONSISTENT)
796   {
797     if (error_type)
798       *error_type = type;
799     return true;
800   }
801   return false;
802 }
803 
804 bool
NdbEventOperationImpl::isEmptyEpoch()
806 {
807   const Uint32 type = getEventType2();
808   if (type == NdbDictionary::Event::TE_EMPTY)
809     return true;
810   return false;
811 }
812 
813 Uint32
NdbEventOperationImpl::getAnyValue() const
815 {
816   return m_data_item->sdata->anyValue;
817 }
818 
819 Uint64
NdbEventOperationImpl::getLatestGCI()
821 {
822   return m_ndb->theEventBuffer->getLatestGCI();
823 }
824 
825 Uint64
NdbEventOperationImpl::getTransId() const
827 {
828   /* Return 64 bit composite */
829   Uint32 transId1 = m_data_item->sdata->transId1;
830   Uint32 transId2 = m_data_item->sdata->transId2;
831   return Uint64(transId1) << 32 | transId2;
832 }
833 
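// Collect (possibly fragmented) SUB_TABLE_DATA carrying a DICT_TAB_INFO
// section into m_buffer.  Returns true once the last fragment has arrived.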
834 bool
NdbEventOperationImpl::execSUB_TABLE_DATA(const NdbApiSignal * signal,
836                                           const LinearSectionPtr ptr[3])
837 {
838   DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA");
839   const SubTableData * const sdata=
840     CAST_CONSTPTR(SubTableData, signal->getDataPtr());
841 
842   if(signal->isFirstFragment()){
843     m_fragmentId = signal->getFragmentId();
844     m_buffer.grow(4 * sdata->totalLen);
845   } else {
846     if(m_fragmentId != signal->getFragmentId()){
847       abort();
848     }
849   }
850 
851   const Uint32 i = SubTableData::DICT_TAB_INFO;
852   DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u",
853                       4 * ptr[i].sz, m_fragmentId));
854   m_buffer.append(ptr[i].p, 4 * ptr[i].sz);
855 
856   if(!signal->isLastFragment()){
857     DBUG_RETURN(FALSE);
858   }
859 
860   DBUG_RETURN(TRUE);
861 }
862 
863 
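// Unpack the current event data item into this operation's RecAttr lists.
// A _TE_ALTER event instead carries a new table definition: it is parsed and
// all RecAttrs and blob handles are re-pointed to the new NdbTableImpl.
// Returns 1 if the event should be delivered to the application, 0 for an
// update that carried no subscribed data.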
864 int
NdbEventOperationImpl::receive_event()
866 {
867   Uint32 operation=
868     SubTableData::getOperation(m_data_item->sdata->requestInfo);
869   if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
870   {
871     DBUG_ENTER("NdbEventOperationImpl::receive_event");
872     DBUG_PRINT("info",("sdata->operation %u  this: %p", operation, this));
873     m_ndb->theImpl->incClientStat(Ndb::NonDataEventsRecvdCount, 1);
874     if (operation == NdbDictionary::Event::_TE_ALTER)
875     {
876       // Parse the new table definition and
877       // create a table object
878       NdbDictInterface::Tx tx_unused;
879       NdbError error;
880       int warn;
881       NdbDictInterface dif(tx_unused, error, warn);
882       NdbTableImpl *at;
883       m_change_mask = m_data_item->sdata->changeMask;
884       error.code = dif.parseTableInfo(&at,
885                                       (Uint32*)m_buffer.get_data(),
886                                       m_buffer.length() / 4,
887                                       true);
888       m_buffer.clear();
889       if (unlikely(error.code))
890       {
891         DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u",
892                                   error.code));
893         ndbout_c("Failed to parse DictTabInfo error %u", error.code);
894         DBUG_RETURN(1);
895       }
896       at->buildColumnHash();
897 
898       NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
899       m_eventImpl->m_tableImpl = at;
900 
901       DBUG_PRINT("info", ("switching table impl 0x%lx -> 0x%lx",
902                           (long) tmp_table_impl, (long) at));
903 
904       // change the rec attrs to refer to the new table object
905       int i;
906       for (i = 0; i < 2; i++)
907       {
908         NdbRecAttr *p = theFirstPkAttrs[i];
909         while (p)
910         {
911           int no = p->getColumn()->getColumnNo();
912           NdbColumnImpl *tAttrInfo = at->getColumn(no);
913           DBUG_PRINT("info", ("rec_attr: 0x%lx  "
914                               "switching column impl 0x%lx -> 0x%lx",
915                               (long) p, (long) p->m_column, (long) tAttrInfo));
916           p->m_column = tAttrInfo;
917           p = p->next();
918         }
919       }
920       for (i = 0; i < 2; i++)
921       {
922         NdbRecAttr *p = theFirstDataAttrs[i];
923         while (p)
924         {
925           int no = p->getColumn()->getColumnNo();
926           NdbColumnImpl *tAttrInfo = at->getColumn(no);
927           DBUG_PRINT("info", ("rec_attr: 0x%lx  "
928                               "switching column impl 0x%lx -> 0x%lx",
929                               (long) p, (long) p->m_column, (long) tAttrInfo));
930           p->m_column = tAttrInfo;
931           p = p->next();
932         }
933       }
934       // change the blobHandle's to refer to the new table object.
935       NdbBlob *p = theBlobList;
936       while (p)
937       {
938         int no = p->getColumn()->getColumnNo();
939         NdbColumnImpl *tAttrInfo = at->getColumn(no);
940         DBUG_PRINT("info", ("blob_handle: 0x%lx  "
941                             "switching column impl 0x%lx -> 0x%lx",
942                             (long) p, (long) p->theColumn, (long) tAttrInfo));
943         p->theColumn = tAttrInfo;
944         p = p->next();
945       }
946       if (tmp_table_impl)
947         delete tmp_table_impl;
948     }
949     DBUG_RETURN(1);
950   }
951 
952   DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
953   DBUG_PRINT_EVENT("info",("sdata->operation %u  this: %p", operation, this));
954   // now move the data into the RecAttrs
955   m_ndb->theImpl->incClientStat(Ndb::DataEventsRecvdCount, 1);
956 
957   int is_insert= operation == NdbDictionary::Event::_TE_INSERT;
958 
959   Uint32 *aAttrPtr = m_data_item->ptr[0].p;
960   Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
961   Uint32 *aDataPtr = m_data_item->ptr[1].p;
962 
963   DBUG_DUMP_EVENT("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
964   DBUG_DUMP_EVENT("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
965 
966   // copy data into the RecAttr's
967   // we assume that the respective attribute lists are sorted
968 
969   // first the pk's
970   {
971     NdbRecAttr *tAttr= theFirstPkAttrs[0];
972     NdbRecAttr *tAttr1= theFirstPkAttrs[1];
973     while(tAttr)
974     {
975       assert(aAttrPtr < aAttrEndPtr);
976       unsigned tDataSz= AttributeHeader(*aAttrPtr).getByteSize();
977       assert(tAttr->attrId() ==
978 	     AttributeHeader(*aAttrPtr).getAttributeId());
979       receive_data(tAttr, aDataPtr, tDataSz);
980       if (!is_insert)
981 	receive_data(tAttr1, aDataPtr, tDataSz);
982       else
983         tAttr1->setUNDEFINED(); // do not leave unspecified
984       tAttr1= tAttr1->next();
985       // next
986       aAttrPtr++;
987       aDataPtr+= (tDataSz + 3) >> 2;
988       tAttr= tAttr->next();
989     }
990   }
991 
992   NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
993   Uint32 tRecAttrId;
994   Uint32 tAttrId;
995   Uint32 tDataSz;
996   int hasSomeData= (operation != NdbDictionary::Event::_TE_UPDATE) ||
997     m_allow_empty_update;
998   while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
999     tRecAttrId = tWorkingRecAttr->attrId();
1000     tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
1001     tDataSz = AttributeHeader(*aAttrPtr).getByteSize();
1002 
1003     while (tAttrId > tRecAttrId) {
1004       DBUG_PRINT_EVENT("info",("undef [%u] %u 0x%x [%u] 0x%x",
1005                                tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
1006       tWorkingRecAttr->setUNDEFINED();
1007       tWorkingRecAttr = tWorkingRecAttr->next();
1008       if (tWorkingRecAttr == NULL)
1009 	break;
1010       tRecAttrId = tWorkingRecAttr->attrId();
1011     }
1012     if (tWorkingRecAttr == NULL)
1013       break;
1014 
1015     if (tAttrId == tRecAttrId) {
1016       hasSomeData=1;
1017 
1018       DBUG_PRINT_EVENT("info",("set [%u] %u 0x%x [%u] 0x%x",
1019                                tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
1020 
1021       receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
1022       tWorkingRecAttr = tWorkingRecAttr->next();
1023     }
1024     aAttrPtr++;
1025     aDataPtr += (tDataSz + 3) >> 2;
1026   }
1027 
1028   while (tWorkingRecAttr != NULL) {
1029     tRecAttrId = tWorkingRecAttr->attrId();
1030     //printf("set undefined [%u] %u %u [%u]\n",
1031     //       tAttrId, tDataSz, *aDataPtr, tRecAttrId);
1032     tWorkingRecAttr->setUNDEFINED();
1033     tWorkingRecAttr = tWorkingRecAttr->next();
1034   }
1035 
1036   tWorkingRecAttr = theFirstDataAttrs[1];
1037   aDataPtr = m_data_item->ptr[2].p;
1038   Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
1039   while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
1040     tRecAttrId = tWorkingRecAttr->attrId();
1041     tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
1042     tDataSz = AttributeHeader(*aDataPtr).getByteSize();
1043     aDataPtr++;
1044     while (tAttrId > tRecAttrId) {
1045       tWorkingRecAttr->setUNDEFINED();
1046       tWorkingRecAttr = tWorkingRecAttr->next();
1047       if (tWorkingRecAttr == NULL)
1048 	break;
1049       tRecAttrId = tWorkingRecAttr->attrId();
1050     }
1051     if (tWorkingRecAttr == NULL)
1052       break;
1053     if (tAttrId == tRecAttrId) {
1054       assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
1055       hasSomeData=1;
1056 
1057       receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
1058       tWorkingRecAttr = tWorkingRecAttr->next();
1059     }
1060     aDataPtr += (tDataSz + 3) >> 2;
1061   }
1062   while (tWorkingRecAttr != NULL) {
1063     tWorkingRecAttr->setUNDEFINED();
1064     tWorkingRecAttr = tWorkingRecAttr->next();
1065   }
1066 
1067   if (hasSomeData)
1068   {
1069     DBUG_RETURN_EVENT(1);
1070   }
1071 
1072   DBUG_RETURN_EVENT(0);
1073 }
1074 
1075 NdbDictionary::Event::TableEvent
NdbEventOperationImpl::getEventType2()
1077 {
1078   return (NdbDictionary::Event::TableEvent)
1079     (1U << SubTableData::getOperation(m_data_item->sdata->requestInfo));
1080 }
1081 
1082 
1083 
1084 void
NdbEventOperationImpl::print()
1086 {
1087   int i;
1088   ndbout << "EventId " << m_eventId << "\n";
1089 
1090   for (i = 0; i < 2; i++) {
1091     NdbRecAttr *p = theFirstPkAttrs[i];
1092     ndbout << " %u " << i;
1093     while (p) {
1094       ndbout << " : " << p->attrId() << " = " << *p;
1095       p = p->next();
1096     }
1097     ndbout << "\n";
1098   }
1099   for (i = 0; i < 2; i++) {
1100     NdbRecAttr *p = theFirstDataAttrs[i];
1101     ndbout << " %u " << i;
1102     while (p) {
1103       ndbout << " : " << p->attrId() << " = " << *p;
1104       p = p->next();
1105     }
1106     ndbout << "\n";
1107   }
1108 }
1109 
1110 void
NdbEventOperationImpl::printAll()
1112 {
1113   Uint32 *aAttrPtr = m_data_item->ptr[0].p;
1114   Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
1115   Uint32 *aDataPtr = m_data_item->ptr[1].p;
1116 
1117   //tRecAttr->setup(tAttrInfo, aValue)) {
1118 
1119   Uint32 tAttrId;
1120   Uint32 tDataSz;
1121   for (; aAttrPtr < aAttrEndPtr; ) {
1122     tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
1123     tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
1124 
1125     aAttrPtr++;
1126     aDataPtr += tDataSz;
1127   }
1128 }
1129 
EventBufferManager::EventBufferManager(const Ndb* const ndb) :
1131   m_ndb(ndb),
1132   m_pre_gap_epoch(0), // equivalent to setting state COMPLETELY_BUFFERING
1133   m_begin_gap_epoch(0),
1134   m_end_gap_epoch(0),
1135   m_max_buffered_epoch(0),
1136   m_max_received_epoch(0),
1137   m_free_percent(20),
1138   m_event_buffer_manager_state(EBM_COMPLETELY_BUFFERING)
1139 {}
1140 
1141 unsigned
EventBufferManager::get_eventbuffer_free_percent()
1143 {
1144   return m_free_percent;
1145 }
1146 
1147 void
EventBufferManager::set_eventbuffer_free_percent(unsigned free)
1149 {
1150   m_free_percent = free;
1151 }
1152 
1153 void
EventBufferManager::onBufferingEpoch(Uint64 received_epoch)
1155 {
1156   if (m_max_buffered_epoch < received_epoch)
1157     m_max_buffered_epoch = received_epoch;
1158 }
1159 
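// Called for each received SUB_TABLE_DATA.  Drives the buffer-manager state
// machine on memory pressure: start discarding when usage reaches 100%, and
// resume buffering once usage drops below (100 - m_free_percent)%.
// Returns true when an event buffer status report should be emitted.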
1160 bool
EventBufferManager::onEventDataReceived(Uint32 memory_usage_percent,
1162                                         Uint64 received_epoch)
1163 {
1164   bool report_status = false;
1165 
1166   if (isCompletelyBuffering())
1167   {
1168     if (memory_usage_percent >= 100)
1169     {
1170       // Transition COMPLETELY_BUFFERING -> PARTIALLY_DISCARDING.
1171       m_pre_gap_epoch = m_max_buffered_epoch;
1172       m_event_buffer_manager_state = EBM_PARTIALLY_DISCARDING;
1173       report_status = true;
1174     }
1175   }
1176   else if (isCompletelyDiscarding())
1177   {
1178     if (memory_usage_percent < 100 - m_free_percent)
1179     {
1180       // Transition COMPLETELY_DISCARDING -> PARTIALLY_BUFFERING
1181       m_end_gap_epoch = m_max_received_epoch;
1182       m_event_buffer_manager_state = EBM_PARTIALLY_BUFFERING;
1183       report_status = true;
1184     }
1185   }
1186   else if (isPartiallyBuffering())
1187   {
1188     if (memory_usage_percent >= 100)
1189     {
1190       // New gap is starting before the on-going gap ends.
1191       report_status = true;
1192 
1193       g_eventLogger->warning("Ndb 0x%x %s: Event Buffer: Ending gap epoch %u/%u (%llu) lacks event buffer memory. Overbuffering.",
1194               m_ndb->getReference(), m_ndb->getNdbObjectName(),
1195               Uint32(m_begin_gap_epoch >> 32), Uint32(m_begin_gap_epoch),
1196               m_begin_gap_epoch);
1197       g_eventLogger->warning("Check how many epochs the eventbuffer_free_percent memory can accommodate.\n");
1198       g_eventLogger->warning("Increase eventbuffer_free_percent, eventbuffer memory or both accordingly.\n");
1199     }
1200   }
1201   /**
1202    * else: transition from PARTIALLY_DISCARDING to COMPLETELY_DISCARDING
1203    * and PARTIALLY_BUFFERING to COMPLETELY_BUFFERING
1204    * will be handled in execSUB_GCP_COMPLETE()
1205    */
1206 
1207   // Any new epoch received after memory becomes available will be buffered
1208   if (m_max_received_epoch < received_epoch)
1209     m_max_received_epoch = received_epoch;
1210 
1211   return report_status;
1212 }
1213 
1214 bool
EventBufferManager::isEventDataToBeDiscarded(Uint64 received_epoch)
1216 {
1217   DBUG_ENTER_EVENT("EventBufferManager::isEventDataToBeDiscarded");
1218   /* Discard event data received via SUB_TABLE_DATA during gap period,
1219    * m_pre_gap_epoch > 0 : gap will start at the next epoch
1220    * m_end_gap_epoch == 0 : gap has not ended
1221    * received_epoch <= m_end_gap_epoch : gap has ended at m_end_gap_epoch
1222    */
1223   if (m_pre_gap_epoch > 0 && received_epoch > m_pre_gap_epoch &&
1224       (m_end_gap_epoch == 0 || received_epoch <= m_end_gap_epoch ))
1225   {
1226     assert(isInDiscardingState());
1227     DBUG_PRINT_EVENT("info", ("Discarding SUB_TABLE_DATA for epoch %u/%u (%llu) > begin_gap epoch %u/%u (%llu)",
1228                               Uint32(received_epoch >> 32),
1229                               Uint32(received_epoch),
1230                               received_epoch,
1231                               Uint32(m_pre_gap_epoch >> 32),
1232                               Uint32(m_pre_gap_epoch),
1233                               m_pre_gap_epoch));
1234     if (m_end_gap_epoch > 0)
1235     {
      DBUG_PRINT_EVENT("info", (" and <= end_gap epoch %u/%u (%llu)",
1237                                 Uint32(m_end_gap_epoch >> 32),
1238                                 Uint32(m_end_gap_epoch),
1239                                 m_end_gap_epoch));
1240     }
1241     DBUG_RETURN_EVENT(true);
1242   }
1243   DBUG_RETURN_EVENT(false);
1244 }
1245 
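// Called when an epoch completes (SUB_GCP_COMPLETE).  Finishes the state
// transitions started in onEventDataReceived: the first epoch completed after
// m_pre_gap_epoch becomes the gap begin (-> COMPLETELY_DISCARDING), and the
// first fully buffered post-gap epoch ends the gap (-> COMPLETELY_BUFFERING).
// Returns true when an event buffer status report should be emitted.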
1246 bool
EventBufferManager::onEpochCompleted(Uint64 completed_epoch, bool& gap_begins)
1248 {
1249   bool report_status = false;
1250 
1251   if (isPartiallyDiscarding() && completed_epoch > m_pre_gap_epoch)
1252   {
1253     /**
1254      * No on-going gap. This should be the first completed epoch after
1255      * a transition to PARTIALLY_DISCARDING (the first completed epoch
1256      * after m_pre_gap_epoch). Mark this as the beginning of a new gap.
1257      * Transition PARTIALLY_DISCARDING -> COMPLETELY_DISCARDING:
1258      */
1259     m_begin_gap_epoch = completed_epoch;
1260     m_event_buffer_manager_state = EBM_COMPLETELY_DISCARDING;
1261     gap_begins = true;
1262     report_status = true;
1263     g_eventLogger->warning("Ndb 0x%x %s: Event Buffer: New gap begins at epoch : %u/%u (%llu)",
1264                            m_ndb->getReference(), m_ndb->getNdbObjectName(),
1265                            (Uint32)(m_begin_gap_epoch >> 32),
1266                            (Uint32)m_begin_gap_epoch, m_begin_gap_epoch);
1267   }
1268   else if (isPartiallyBuffering() && completed_epoch > m_end_gap_epoch)
1269   {
1270     // The completed_epoch marks the first completely buffered post_gap epoch
    // Transition PARTIALLY_BUFFERING -> COMPLETELY_BUFFERING
1272     g_eventLogger->warning("Ndb 0x%x %s: Event Buffer : Gap began at epoch : %u/%u (%llu) ends at epoch %u/%u (%llu)",
1273                            m_ndb->getReference(),
1274                            m_ndb->getNdbObjectName(),
1275                            (Uint32)(m_begin_gap_epoch >> 32),
1276                            (Uint32)m_begin_gap_epoch, m_begin_gap_epoch,
1277                            (Uint32)(completed_epoch >> 32),
1278                            (Uint32)completed_epoch, completed_epoch);
1279     m_pre_gap_epoch = 0;
1280     m_begin_gap_epoch = 0;
1281     m_end_gap_epoch = 0;
1282     m_event_buffer_manager_state = EBM_COMPLETELY_BUFFERING;
1283     report_status = true;
1284   }
1285   /**
1286    * else: transition from COMPLETELY_BUFFERING to PARTIALLY_DISCARDING
1287    * and COMPLETELY_DISCARDING to PARTIALLY_BUFFERING
1288    * are handled in insertDataL
1289    */
1290   return report_status;
1291 }
1292 
1293 bool
EventBufferManager::isGcpCompleteToBeDiscarded(Uint64 completed_epoch)
1295 {
1296   DBUG_ENTER_EVENT("EventBufferManager::isGcpCompleteToBeDiscarded");
1297   /* Discard SUB_GCP_COMPLETE during gap period,
1298    * m_begin_gap_epoch > 0 : gap has started at m_begin_gap_epoch
1299    * m_end_gap_epoch == 0 : gap has not ended
1300    * received_epoch <= m_end_gap_epoch : gap has ended at m_end_gap_epoch
1301    */
1302 
1303   // for m_begin_gap_epoch < completed_epoch <= m_end_gap_epoch
1304 
1305   if (m_begin_gap_epoch > 0 && completed_epoch > m_begin_gap_epoch &&
1306       (m_end_gap_epoch == 0 || completed_epoch <= m_end_gap_epoch ))
1307   {
1308     assert(isInDiscardingState());
1309     DBUG_PRINT_EVENT("info", ("Discarding SUB_GCP_COMPLETE_REP for epoch %u/%u (%llu) > begin_gap epoch %u/%u (%llu)",
1310                               Uint32(completed_epoch >> 32),
1311                               Uint32(completed_epoch),
1312                               completed_epoch,
1313                               Uint32(m_begin_gap_epoch >> 32),
1314                               Uint32(m_begin_gap_epoch),
1315                               m_begin_gap_epoch));
1316     if (m_end_gap_epoch > 0)
1317     {
      DBUG_PRINT_EVENT("info", (" and <= end_gap epoch %u/%u (%llu)",
1319                                 Uint32(m_end_gap_epoch >> 32),
1320                                 Uint32(m_end_gap_epoch),
1321                                 m_end_gap_epoch));
1322     }
1323     DBUG_RETURN_EVENT(true);
1324   }
1325   DBUG_RETURN_EVENT(false);
1326 }
1327 
1328 /*
1329  * Class NdbEventBuffer
 * Each Ndb object has one NdbEventBuffer object.
1331  */
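/*
 * Rough application-side flow (simplified sketch for orientation only;
 * the authoritative API is declared in Ndb.hpp / NdbEventOperation.hpp):
 *
 *   NdbEventOperation* op = ndb->createEventOperation("event_name");
 *   NdbRecAttr* val = op->getValue("col_name");
 *   op->execute();                        // subscribe, see execute_nolock()
 *   while (ndb->pollEvents(1000) > 0)     // wait for buffered epochs
 *     while (NdbEventOperation* ev = ndb->nextEvent())
 *       ;                                 // consume val / ev->getEventType()
 *   ndb->dropEventOperation(op);
 */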
NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
1333   m_total_buckets(TOTAL_BUCKETS_INIT),
1334   m_min_gci_index(0),
1335   m_max_gci_index(0),
1336   m_ndb(ndb),
1337   m_latestGCI(0), m_latest_complete_GCI(0),
1338   m_highest_sub_gcp_complete_GCI(0),
1339   m_latest_poll_GCI(0),
1340   m_failure_detected(false),
1341   m_prevent_nodegroup_change(true),
1342   m_total_alloc(0),
1343   m_max_alloc(0),
1344   m_event_buffer_manager(ndb),
1345   m_free_thresh(0),
1346   m_min_free_thresh(0),
1347   m_max_free_thresh(0),
1348   m_gci_slip_thresh(0),
1349   m_dropped_ev_op(0),
1350   m_active_op_count(0)
1351 {
1352 #ifdef VM_TRACE
1353   m_latest_command= "NdbEventBuffer::NdbEventBuffer";
1354   m_flush_gci = 0;
1355 #endif
1356 
1357   if ((p_cond = NdbCondition_Create()) ==  NULL) {
1358     ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
1359     exit(-1);
1360   }
1361   m_mutex = 0; // Set in Ndb::init()
1362 
1363   // ToDo set event buffer size
1364   // pre allocate event data array
1365   m_sz= 0;
1366 #ifdef VM_TRACE
1367   m_free_data_count= 0;
1368 #endif
1369   m_free_data= 0;
1370   m_free_data_sz= 0;
1371 
1372   // get reference to mutex managed by current connection
1373   m_add_drop_mutex=
1374     m_ndb->theImpl->m_ndb_cluster_connection.m_event_add_drop_mutex;
1375 
1376   // initialize lists
1377   bzero(&g_empty_gci_container, sizeof(Gci_container));
1378   init_gci_containers();
1379 
1380   m_alive_node_bit_mask.clear();
1381 
1382   bzero(&m_sub_data_streams, sizeof(m_sub_data_streams));
1383 }
1384 
NdbEventBuffer::~NdbEventBuffer()
1386 {
1387   // todo lock?  what if receive thread writes here?
1388   NdbEventOperationImpl* op= m_dropped_ev_op;
1389   while ((op = m_dropped_ev_op))
1390   {
1391     m_dropped_ev_op = m_dropped_ev_op->m_next;
1392     delete op->m_facade;
1393   }
1394 
1395   unsigned j;
1396   Uint32 sz= m_active_gci.size();
1397   Gci_container* array = (Gci_container*)m_active_gci.getBase();
1398   for(j = 0; j < sz; j++)
1399   {
1400     array[j].~Gci_container();
1401   }
1402 
1403   // Return EventBufData lists to free list in a nice way
1404   // before actual deallocation using m_allocated_data.
1405   if(!m_complete_data.m_data.is_empty())
1406     free_list(m_complete_data.m_data);
1407   if(!m_available_data.is_empty())
1408     free_list(m_available_data);
1409   if(!m_used_data.is_empty())
1410     free_list(m_used_data);
1411   // Return event queue leftovers to free list
1412   clear_event_queue();
1413 
1414 
1415   for (j= 0; j < m_allocated_data.size(); j++)
1416   {
1417     unsigned sz= m_allocated_data[j]->sz;
1418     EventBufData *data= m_allocated_data[j]->data;
1419     EventBufData *end_data= data+sz;
1420     for (; data < end_data; data++)
1421     {
1422       if (data->sdata)
1423 	NdbMem_Free(data->sdata);
1424     }
1425     NdbMem_Free((char*)m_allocated_data[j]);
1426   }
1427 
1428   NdbCondition_Destroy(p_cond);
1429 }
1430 
1431 unsigned
NdbEventBuffer::get_eventbuffer_free_percent()
1433 {
1434   return m_event_buffer_manager.get_eventbuffer_free_percent();
1435 }
1436 
1437 void
NdbEventBuffer::set_eventbuffer_free_percent(unsigned free)
1439 {
1440   m_event_buffer_manager.set_eventbuffer_free_percent(free);
1441 }
1442 
1443 void
NdbEventBuffer::add_op()
1445 {
1446   /*
1447    * When m_active_op_count is zero, SUB_GCP_COMPLETE_REP is
1448    * ignored and no event data will reach application.
1449    * Positive values will enable event data to reach application.
1450    */
1451   m_active_op_count++;
1452 }
1453 
1454 void
NdbEventBuffer::remove_op()
1456 {
1457   m_active_op_count--;
1458   if(m_active_op_count == 0)
1459   {
1460     // Return EventBufData to free list and release GCI ops before clearing.
1461     for(unsigned i = 0; i < m_active_gci.size(); i++)
1462     {
1463       Gci_container& bucket = (Gci_container&)m_active_gci[i];
1464       if (!bucket.m_data.is_empty())
1465       {
1466         free_list(bucket.m_data);
1467       }
1468     }
1469     if (!m_complete_data.m_data.is_empty())
1470     {
1471       free_list(m_complete_data.m_data);
1472     }
1473     init_gci_containers();
1474   }
1475 }
1476 
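// Reset epoch bookkeeping: clear completed data, refill the active
// Gci_container array with empty containers and reset the known-GCI ring.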
1477 void
NdbEventBuffer::init_gci_containers()
1479 {
1480   m_startup_hack = true;
1481   bzero(&m_complete_data, sizeof(m_complete_data));
1482   m_latest_complete_GCI = m_latestGCI = 0;
1483   m_active_gci.clear();
1484   m_active_gci.fill(3, g_empty_gci_container);
1485   m_min_gci_index = m_max_gci_index = 1;
1486   Uint64 gci = 0;
1487   m_known_gci.clear();
1488   m_known_gci.fill(7, gci);
1489   // Reset cluster failure marker
1490   m_failure_detected= false;
1491 }
1492 
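// Grow the free list with a newly allocated chunk of sz EventBufData entries.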
int NdbEventBuffer::expand(unsigned sz)
1494 {
1495   unsigned alloc_size=
1496     sizeof(EventBufData_chunk) +(sz-1)*sizeof(EventBufData);
1497   EventBufData_chunk *chunk_data=
1498     (EventBufData_chunk *)NdbMem_Allocate(alloc_size);
1499 
1500   chunk_data->sz= sz;
1501   m_allocated_data.push_back(chunk_data);
1502 
1503   EventBufData *data= chunk_data->data;
1504   EventBufData *end_data= data+sz;
1505   EventBufData *last_data= m_free_data;
1506 
1507   bzero((void*)data, sz*sizeof(EventBufData));
1508   for (; data < end_data; data++)
1509   {
1510     data->m_next= last_data;
1511     last_data= data;
1512   }
1513   m_free_data= last_data;
1514 
1515   m_sz+= sz;
1516 #ifdef VM_TRACE
1517   m_free_data_count+= sz;
1518 #endif
1519   return 0;
1520 }
1521 
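// Wait up to aMillisecondNumber ms for event data and move completed epochs
// to the available queue.  Returns 1 when event data is available, 0 when
// none is, and -1 if aMillisecondNumber is negative.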
1522 int
NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *highestQueuedEpoch)
1524 {
1525   if (aMillisecondNumber < 0)
1526   {
1527     g_eventLogger->error("NdbEventBuffer::pollEvents: negative aMillisecondNumber %d 0x%x %s",
1528                          aMillisecondNumber,
1529                          m_ndb->getReference(),
1530                          m_ndb->getNdbObjectName());
1531     return -1;
1532   }
1533   int ret= 1;
1534 #ifdef VM_TRACE
1535   const char *m_latest_command_save= m_latest_command;
1536   m_latest_command= "NdbEventBuffer::pollEvents";
1537 #endif
1538 
1539   NdbMutex_Lock(m_mutex);
1540   NdbEventOperationImpl *ev_op= move_data();
1541   if (unlikely(ev_op == 0 && aMillisecondNumber))
1542   {
1543     NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
1544     ev_op= move_data();
1545   }
1546   m_latest_poll_GCI= m_latestGCI;
1547 #ifdef VM_TRACE
1548   if (ev_op)
1549   {
1550     // m_mutex is locked
1551     // update event ops data counters
1552     ev_op->m_data_count-= ev_op->m_data_done_count;
1553     ev_op->m_data_done_count= 0;
1554   }
1555   m_latest_command= m_latest_command_save;
1556 #endif
1557   if (unlikely(ev_op == 0))
1558   {
    ret= 0; // no event data available; applies for any aMillisecondNumber >= 0
1560     /*
1561       gci's consumed up until m_latest_poll_GCI, so we can free all
1562       dropped event operations stopped up until that gci
1563     */
1564     deleteUsedEventOperations(m_latest_poll_GCI);
1565   }
1566   NdbMutex_Unlock(m_mutex); // we have moved the data
1567 
1568   if (highestQueuedEpoch)
1569     *highestQueuedEpoch= m_latest_poll_GCI;
1570 
1571   return ret;
1572 }
1573 
1574 int
NdbEventBuffer::flushIncompleteEvents(Uint64 gci)
1576 {
1577   /**
1578    *  Find min complete gci
1579    */
1580   Uint64 * array = m_known_gci.getBase();
1581   Uint32 mask = m_known_gci.size() - 1;
1582   Uint32 minpos = m_min_gci_index;
1583   Uint32 maxpos = m_max_gci_index;
1584 
1585   g_eventLogger->info("Flushing incomplete GCI:s < %u/%u",
1586                       Uint32(gci >> 32), Uint32(gci));
1587   while (minpos != maxpos && array[minpos] < gci)
1588   {
1589     Gci_container* tmp = find_bucket(array[minpos]);
1590     assert(tmp);
1591     assert(maxpos == m_max_gci_index);
1592 
1593     if(!tmp->m_data.is_empty())
1594     {
1595       free_list(tmp->m_data);
1596     }
1597     tmp->~Gci_container();
1598     bzero(tmp, sizeof(Gci_container));
1599     minpos = (minpos + 1) & mask;
1600   }
1601 
1602   m_min_gci_index = minpos;
1603 
1604 #ifdef VM_TRACE
1605   m_flush_gci = gci;
1606 #endif
1607 
1608   return 0;
1609 }
1610 
1611 void
NdbEventBuffer::free_consumed_event_data()
1613 {
1614   if (m_used_data.m_count > 1024)
1615   {
1616 #ifdef VM_TRACE
1617     m_latest_command= "NdbEventBuffer::free_consumed_event_data (lock)";
1618 #endif
1619     NdbMutex_Lock(m_mutex);
1620     // return m_used_data to m_free_data
1621     free_list(m_used_data);
1622 
1623     NdbMutex_Unlock(m_mutex);
1624   }
1625 }
1626 
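// Detach the first item from the available queue and append it to the used
// queue; its memory is reclaimed later by free_consumed_event_data().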
1627 void
NdbEventBuffer::move_head_event_data_item_to_used_data_queue(EventBufData *data)
1629 {
1630   // Move first available item to used queue prior to processing
1631   assert(data == m_available_data.m_head);
1632   Uint32 full_count, full_sz;
1633   m_available_data.remove_first(full_count, full_sz);
1634 
1635   m_used_data.append_used_data(data, full_count, full_sz);
1636 
1637   m_ndb->theImpl->incClientStat(Ndb::EventBytesRecvdCount, full_sz);
1638 }
1639 
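// Drop Gci_ops entries with epoch lower than firstKeepGci from the
// available-data list and return the first remaining entry (or NULL).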
1640 EventBufData_list::Gci_ops*
NdbEventBuffer::remove_consumed_gci_ops(Uint64 firstKeepGci)
1642 {
1643   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1644   while (gci_ops && gci_ops->m_gci < firstKeepGci)
1645   {
1646     gci_ops = m_available_data.delete_next_gci_ops();
1647   }
1648   return gci_ops;
1649 }
1650 
1651 bool
NdbEventBuffer::is_exceptional_epoch(EventBufData *data)
{
  DBUG_ENTER_EVENT("NdbEventBuffer::is_exceptional_epoch");
  Uint32 type = SubTableData::getOperation(data->sdata->requestInfo);

  if (type == NdbDictionary::Event::_TE_EMPTY ||
      type >= NdbDictionary::Event::_TE_INCONSISTENT)
  {
    if (type != NdbDictionary::Event::_TE_EMPTY)
    {
      const Uint64 gci =
        data->sdata->gci_lo | (Uint64(data->sdata->gci_hi) << 32);
      DBUG_PRINT_EVENT("info",
                       ("detected exceptional gci %u/%u (%llu) type 0x%x 0x%x %s",
                        Uint32(gci >> 32), Uint32(gci), gci, type,
                        m_ndb->getReference(), m_ndb->getNdbObjectName()));
    }
    DBUG_RETURN_EVENT(true);
  }
  DBUG_RETURN_EVENT(false);
}
1672 
1673 NdbEventOperation *
NdbEventBuffer::nextEvent2()
1675 {
1676   DBUG_ENTER_EVENT("NdbEventBuffer::nextEvent2");
1677 #ifdef VM_TRACE
1678   const char *m_latest_command_save= m_latest_command;
1679 #endif
1680 
1681   free_consumed_event_data();
1682 
1683 #ifdef VM_TRACE
1684   m_latest_command= "NdbEventBuffer::nextEvent2";
1685 #endif
1686 
1687   EventBufData *data;
1688   while ((data= m_available_data.m_head))
1689   {
1690     move_head_event_data_item_to_used_data_queue(data);
1691 
1692     NdbEventOperationImpl *op= data->m_event_op;
1693 
1694     // all including exceptional event data must have an associated impl
1695     assert(op);
1696 
1697     // set NdbEventOperation data
1698     op->m_data_item= data;
1699     Uint64 gci = op->getGCI();
1700 
1701     if (is_exceptional_epoch(data))
1702     {
1703       DBUG_PRINT_EVENT("info", ("detected exceptional gci %u/%u 0x%x %s",
1704                                 Uint32(gci >> 32), Uint32(gci),
1705                                 m_ndb->getReference(), m_ndb->getNdbObjectName()));
1706 
1707       // Remove gci_ops belonging to the epochs consumed already
1708       // to conform with non-exceptional event data handling
1709       remove_consumed_gci_ops(gci);
1710       DBUG_RETURN_EVENT(op->m_facade);
1711     }
1712 
1713     DBUG_PRINT_EVENT("info", ("available data=%p op=%p 0x%x %s",
1714                               data, op, m_ndb->getReference(),
1715                               m_ndb->getNdbObjectName()));
1716 
1717     /*
1718      * If merge is on, blob part sub-events must not be seen on this level.
1719      * If merge is not on, there are no blob part sub-events.
1720      */
1721     assert(op->theMainOp == NULL);
1722 
1723 #ifdef VM_TRACE
1724     op->m_data_done_count++;
1725 #endif
1726 
1727     if (op->m_state == NdbEventOperation::EO_EXECUTING)
1728     {
1729       int r= op->receive_event();
1730       if (r > 0)
1731       {
1732 #ifdef VM_TRACE
1733 	 m_latest_command= m_latest_command_save;
1734 #endif
1735          NdbBlob* tBlob = op->theBlobList;
1736          while (tBlob != NULL)
1737          {
1738            (void)tBlob->atNextEvent();
1739            tBlob = tBlob->theNext;
1740          }
1741 
1742          EventBufData_list::Gci_ops *gci_ops =
1743            remove_consumed_gci_ops(gci);
1744 
1745         if (gci_ops && (gci != gci_ops->m_gci))
1746 	{
1747            ndbout << "nextEvent2: gci_ops->m_gci " << gci_ops->m_gci
1748                   << " (" << Uint32(gci_ops->m_gci >> 32)
1749                   << "/" << Uint32(gci_ops->m_gci) << ") "
1750                   << " gci " << gci
1751                   << " (" << Uint32(gci >> 32)
1752                   << "/" << Uint32(gci) << ") type "
1753                   << hex << op->getEventType2()
1754                   << " data's operation " << hex
1755                   <<  SubTableData::getOperation(data->sdata->requestInfo)
1756                   << " " << m_ndb->getNdbObjectName() << endl;
1757 	}
1758 
1759          assert(gci_ops && (gci == gci_ops->m_gci));
1760          (void) gci_ops; // To avoid compiler warning 'unused variable'
1761 
1762          // to return TE_NUL it would have to be made into a data event; skip it here
1763          if (SubTableData::getOperation(data->sdata->requestInfo) ==
1764 	   NdbDictionary::Event::_TE_NUL)
1765          {
1766            DBUG_PRINT_EVENT("info", ("skip _TE_NUL 0x%x %s",
1767                                      m_ndb->getReference(),
1768                                      m_ndb->getNdbObjectName()));
1769            continue;
1770          }
1771 	 DBUG_RETURN_EVENT(op->m_facade);
1772        }
1773        // the next event belonged to an event op that is no
1774        // longer valid, skip to next
1775       continue;
1776     }
1777 #ifdef VM_TRACE
1778     m_latest_command= m_latest_command_save;
1779 #endif
1780   }
1781   m_error.code= 0;
1782 #ifdef VM_TRACE
1783   m_latest_command= m_latest_command_save;
1784 #endif
1785 
1786   /*
1787    * All gci's up to m_latest_poll_GCI have been consumed, so
1788    *  - remove remaining gci_ops from the gci_ops list,
1789    *  - free all dropped event operations stopped up until that gci
1790    */
1791   remove_consumed_gci_ops(UINT_MAX64);
1792   if (m_dropped_ev_op)
1793   {
1794     NdbMutex_Lock(m_mutex);
1795     deleteUsedEventOperations(m_latest_poll_GCI);
1796     NdbMutex_Unlock(m_mutex);
1797   }
1798   DBUG_RETURN_EVENT(0);
1799 }
1800 
1801 bool
1802 NdbEventBuffer::isConsistent(Uint64& gci)
1803 {
1804   DBUG_ENTER("NdbEventBuffer::isConsistent");
1805   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1806   while (gci_ops)
1807   {
1808     if (gci_ops->m_error == NdbDictionary::Event::_TE_INCONSISTENT)
1809     {
1810       gci = gci_ops->m_gci;
1811       DBUG_RETURN(false);
1812     }
1813     gci_ops = gci_ops->m_next;
1814   }
1815 
1816   DBUG_RETURN(true);
1817 }
1818 
1819 bool
1820 NdbEventBuffer::isConsistentGCI(Uint64 gci)
1821 {
1822   DBUG_ENTER("NdbEventBuffer::isConsistentGCI");
1823   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1824   while (gci_ops)
1825   {
1826     if (gci_ops->m_gci == gci &&
1827         gci_ops->m_error == NdbDictionary::Event::_TE_INCONSISTENT)
1828       DBUG_RETURN(false);
1829     gci_ops = gci_ops->m_next;
1830   }
1831 
1832   DBUG_RETURN(true);
1833 }
1834 
1835 
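/*
 * Iterate over the event operations that received data in the first queued
 * epoch.  *iter is the caller's cursor into the Gci_op list; event_types,
 * when non-NULL, receives the operation type mask.  Returns NULL when the
 * list is exhausted.
 */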
1836 NdbEventOperationImpl*
1837 NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
1838 {
1839   DBUG_ENTER("NdbEventBuffer::getGCIEventOperations");
1840   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1841   if (*iter < gci_ops->m_gci_op_count)
1842   {
1843     EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
1844     if (event_types != NULL)
1845       *event_types = g.event_types;
1846     DBUG_PRINT("info", ("gci: %u  g.op: 0x%lx  g.event_types: 0x%lx 0x%x %s",
1847                         (unsigned)gci_ops->m_gci, (long) g.op,
1848                         (long) g.event_types, m_ndb->getReference(),
1849                         m_ndb->getNdbObjectName()));
1850     DBUG_RETURN(g.op);
1851   }
1852   DBUG_RETURN(NULL);
1853 }
1854 
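/*
 * Dropped event operations are kept on m_dropped_ev_op until the consumer
 * has passed their stop GCI.  Once last_consumed_gci is beyond that point,
 * reference counts are dropped and unreferenced facades are deleted.
 */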
1855 void
1856 NdbEventBuffer::deleteUsedEventOperations(Uint64 last_consumed_gci)
1857 {
1858   NdbEventOperationImpl *op= m_dropped_ev_op;
1859   while (op && op->m_stop_gci)
1860   {
1861     if (last_consumed_gci > op->m_stop_gci)
1862     {
1863       while (op)
1864       {
1865         NdbEventOperationImpl *next_op= op->m_next;
1866         op->m_stop_gci= 0;
1867         op->m_ref_count--;
1868         if (op->m_ref_count == 0)
1869         {
1870           if (op->m_next)
1871             op->m_next->m_prev = op->m_prev;
1872           if (op->m_prev)
1873             op->m_prev->m_next = op->m_next;
1874           else
1875             m_dropped_ev_op = op->m_next;
1876           delete op->m_facade;
1877         }
1878         op = next_op;
1879       }
1880       break;
1881     }
1882     op = op->m_next;
1883   }
1884 }
1885 
1886 #ifdef VM_TRACE
1887 static
1888 NdbOut&
1889 operator<<(NdbOut& out, const Gci_container& gci)
1890 {
1891   out << "[ GCI: " << (gci.m_gci >> 32) << "/" << (gci.m_gci & 0xFFFFFFFF)
1892       << "  state: " << hex << gci.m_state
1893       << "  head: " << hex << gci.m_data.m_head
1894       << "  tail: " << hex << gci.m_data.m_tail
1895 #ifdef VM_TRACE
1896       << "  cnt: " << dec << gci.m_data.m_count
1897 #endif
1898       << " gcp: " << dec << gci.m_gcp_complete_rep_count
1899       << "]";
1900   return out;
1901 }
1902 
1903 static
1904 NdbOut&
1905 operator<<(NdbOut& out, const Gci_container_pod& gci)
1906 {
1907   Gci_container* ptr = (Gci_container*)&gci;
1908   out << *ptr;
1909   return out;
1910 }
1911 #endif
1912 
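/*
 * Double the m_known_gci circular buffer.  The live entries are swapped into
 * the newly added upper half, so the ring becomes one contiguous ascending
 * run starting at index mask + 1.
 */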
1913 void
1914 NdbEventBuffer::resize_known_gci()
1915 {
1916   Uint32 minpos = m_min_gci_index;
1917   Uint32 maxpos = m_max_gci_index;
1918   Uint32 mask = m_known_gci.size() - 1;
1919 
1920   Uint64 fill = 0;
1921   Uint32 newsize = 2 * (mask + 1);
1922   m_known_gci.fill(newsize - 1, fill);
1923   Uint64 * array = m_known_gci.getBase();
1924 
1925   if (0)
1926   {
1927     printf("before (%u): ", minpos);
1928     for (Uint32 i = minpos; i != maxpos; i = (i + 1) & mask)
1929       printf("%u/%u ",
1930              Uint32(array[i] >> 32),
1931              Uint32(array[i]));
1932     printf("\n");
1933   }
1934 
1935   Uint32 idx = mask + 1; // Store everything in "new" part of buffer
1936   if (0) printf("swapping ");
1937   while (minpos != maxpos)
1938   {
1939     if (0) printf("%u-%u ", minpos, idx);
1940     Uint64 tmp = array[idx];
1941     array[idx] = array[minpos];
1942     array[minpos] = tmp;
1943 
1944     idx++;
1945     minpos = (minpos + 1) & mask; // NOTE old mask
1946   }
1947   if (0) printf("\n");
1948 
1949   minpos = m_min_gci_index = mask + 1;
1950   maxpos = m_max_gci_index = idx;
1951   assert(minpos < maxpos);
1952 
1953   if (0)
1954   {
1955     ndbout_c("resize_known_gci from %u to %u", (mask + 1), newsize);
1956     printf("after: ");
1957     for (Uint32 i = minpos; i < maxpos; i++)
1958     {
1959       printf("%u/%u ",
1960              Uint32(array[i] >> 32),
1961              Uint32(array[i]));
1962     }
1963     printf("\n");
1964   }
1965 
1966 #ifdef VM_TRACE
1967   Uint64 gci = array[minpos];
1968   for (Uint32 i = minpos + 1; i<maxpos; i++)
1969   {
1970     assert(array[i] > gci);
1971     gci = array[i];
1972   }
1973 #endif
1974 }
1975 
1976 #ifdef VM_TRACE
1977 void
1978 NdbEventBuffer::verify_known_gci(bool allowempty)
1979 {
1980   Uint32 minpos = m_min_gci_index;
1981   Uint32 maxpos = m_max_gci_index;
1982   Uint32 mask = m_known_gci.size() - 1;
1983 
1984   Uint32 line;
1985 #define MMASSERT(x) { if (!(x)) { line = __LINE__; goto fail; }}
1986   if (m_min_gci_index == m_max_gci_index)
1987   {
1988     MMASSERT(allowempty);
1989     for (Uint32 i = 0; i<m_active_gci.size(); i++)
1990       MMASSERT(((Gci_container*)(m_active_gci.getBase()+i))->m_gci == 0);
1991     return;
1992   }
1993 
1994   {
1995     Uint64 last = m_known_gci[minpos];
1996     MMASSERT(last > m_latestGCI);
1997     MMASSERT(find_bucket(last) != 0);
1998     MMASSERT(maxpos == m_max_gci_index);
1999 
2000     minpos = (minpos + 1) & mask;
2001     while (minpos != maxpos)
2002     {
2003       MMASSERT(m_known_gci[minpos] > last);
2004       last = m_known_gci[minpos];
2005       MMASSERT(find_bucket(last) != 0);
2006       MMASSERT(maxpos == m_max_gci_index);
2007       minpos = (minpos + 1) & mask;
2008     }
2009   }
2010 
2011   {
2012     Gci_container* buckets = (Gci_container*)(m_active_gci.getBase());
2013     for (Uint32 i = 0; i<m_active_gci.size(); i++)
2014     {
2015       if (buckets[i].m_gci)
2016       {
2017         bool found = false;
2018         for (Uint32 j = m_min_gci_index; j != m_max_gci_index;
2019              j = (j + 1) & mask)
2020         {
2021           if (m_known_gci[j] == buckets[i].m_gci)
2022           {
2023             found = true;
2024             break;
2025           }
2026         }
2027         if (!found)
2028           ndbout_c("%u/%u not found",
2029                    Uint32(buckets[i].m_gci >> 32),
2030                    Uint32(buckets[i].m_gci));
2031         MMASSERT(found == true);
2032       }
2033     }
2034   }
2035 
2036   return;
2037 fail:
2038   ndbout_c("assertion at %d", line);
2039   printf("known gci: ");
2040   for (Uint32 i = m_min_gci_index; i != m_max_gci_index; i = (i + 1) & mask)
2041   {
2042     printf("%u/%u ", Uint32(m_known_gci[i] >> 32), Uint32(m_known_gci[i]));
2043   }
2044 
2045   printf("\nContainers");
2046   for (Uint32 i = 0; i<m_active_gci.size(); i++)
2047     ndbout << m_active_gci[i] << endl;
2048   abort();
2049 }
2050 #endif
2051 
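/*
 * Locate, or create, the Gci_container of an epoch that is not yet complete.
 * m_active_gci acts as a small open-addressed directory indexed by
 * (gci & ACTIVE_GCI_MASK) and probed in steps of ACTIVE_GCI_DIRECTORY_SIZE,
 * while m_known_gci is a sorted circular buffer [m_min_gci_index,
 * m_max_gci_index) of all epochs seen but not yet completed.  A new bucket
 * starts with m_gcp_complete_rep_count = m_total_buckets and its gci is
 * inserted into m_known_gci in sorted order.
 */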
2052 Gci_container*
2053 NdbEventBuffer::find_bucket_chained(Uint64 gci)
2054 {
2055   if (0)
2056     printf("find_bucket_chained(%u/%u) ", Uint32(gci >> 32), Uint32(gci));
2057   if (unlikely(gci <= m_latestGCI))
2058   {
2059     /**
2060      * an already complete GCI
2061      */
2062     if (0)
2063       ndbout_c("already complete (%u/%u)",
2064                Uint32(m_latestGCI >> 32),
2065                Uint32(m_latestGCI));
2066     return 0;
2067   }
2068 
2069   if (m_event_buffer_manager.isGcpCompleteToBeDiscarded(gci))
2070   {
2071     return 0; // gci belongs to a gap
2072   }
2073 
2074   if (unlikely(m_total_buckets == 0))
2075   {
2076     return 0;
2077   }
2078 
2079   Uint32 pos = Uint32(gci & ACTIVE_GCI_MASK);
2080   Uint32 size = m_active_gci.size();
2081   Gci_container *buckets = (Gci_container*)(m_active_gci.getBase());
2082   while (pos < size)
2083   {
2084     Uint64 cmp = (buckets + pos)->m_gci;
2085     if (cmp == gci)
2086     {
2087       if (0)
2088         ndbout_c("found pos: %u", pos);
2089       return buckets + pos;
2090     }
2091 
2092     if (cmp == 0)
2093     {
2094       if (0)
2095         ndbout_c("empty(%u) ", pos);
2096       Uint32 search = pos + ACTIVE_GCI_DIRECTORY_SIZE;
2097       while (search < size)
2098       {
2099         if ((buckets + search)->m_gci == gci)
2100         {
2101           memcpy(buckets + pos, buckets + search, sizeof(Gci_container));
2102           bzero(buckets + search, sizeof(Gci_container));
2103           if (0)
2104             printf("moved from %u to %u", search, pos);
2105           if (search == size - 1)
2106           {
2107             m_active_gci.erase(search);
2108             if (0)
2109               ndbout_c(" shrink");
2110           }
2111           else
2112           {
2113             if (0)
2114               printf("\n");
2115           }
2116           return buckets + pos;
2117         }
2118         search += ACTIVE_GCI_DIRECTORY_SIZE;
2119       }
2120       goto newbucket;
2121     }
2122     pos += ACTIVE_GCI_DIRECTORY_SIZE;
2123   }
2124 
2125   /**
2126    * This is a new bucket...likely close to start
2127    */
2128   if (0)
2129     ndbout_c("new (with expand) ");
2130   m_active_gci.fill(pos, g_empty_gci_container);
2131   buckets = (Gci_container*)(m_active_gci.getBase());
2132 newbucket:
2133   Gci_container* bucket = buckets + pos;
2134   bucket->m_gci = gci;
2135   bucket->m_gcp_complete_rep_count = m_total_buckets;
2136 
2137   Uint32 mask = m_known_gci.size() - 1;
2138   Uint64 * array = m_known_gci.getBase();
2139 
2140   Uint32 minpos = m_min_gci_index;
2141   Uint32 maxpos = m_max_gci_index;
2142   bool full = ((maxpos + 1) & mask) == minpos;
2143   if (unlikely(full))
2144   {
2145     resize_known_gci();
2146     minpos = m_min_gci_index;
2147     maxpos = m_max_gci_index;
2148     mask = m_known_gci.size() - 1;
2149     array = m_known_gci.getBase();
2150   }
2151 
2152   Uint32 maxindex = (maxpos - 1) & mask;
2153   Uint32 newmaxpos = (maxpos + 1) & mask;
2154   m_max_gci_index = newmaxpos;
2155   if (likely(minpos == maxpos || gci > array[maxindex]))
2156   {
2157     array[maxpos] = gci;
2158 #ifdef VM_TRACE
2159     verify_known_gci(false);
2160 #endif
2161     return bucket;
2162   }
2163 
2164   for (pos = minpos; pos != maxpos; pos = (pos + 1) & mask)
2165   {
2166     if (array[pos] > gci)
2167       break;
2168   }
2169 
2170   if (0)
2171     ndbout_c("insert %u/%u (max %u/%u) at pos %u (min: %u max: %u)",
2172              Uint32(gci >> 32),
2173              Uint32(gci),
2174              Uint32(array[maxindex] >> 32),
2175              Uint32(array[maxindex]),
2176              pos,
2177              m_min_gci_index, m_max_gci_index);
2178 
2179   assert(pos != maxpos);
2180   Uint64 oldgci;
2181   do {
2182     oldgci = array[pos];
2183     array[pos] = gci;
2184     gci = oldgci;
2185     pos = (pos + 1) & mask;
2186   } while (pos != maxpos);
2187   array[pos] = gci;
2188 
2189 #ifdef VM_TRACE
2190   verify_known_gci(false);
2191 #endif
2192   return bucket;
2193 }
2194 
2195 void
2196 NdbEventBuffer::crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
2197 				      const SubGcpCompleteRep * const rep,
2198                                       Uint32 replen,
2199                                       Uint32 remcnt,
2200                                       Uint32 repcnt) const
2201 {
2202   ndbout_c("INVALID SUB_GCP_COMPLETE_REP");
2203   // SubGcpCompleteRep
2204   ndbout_c("signal length: %u", replen);
2205   ndbout_c("gci: %u/%u", rep->gci_hi, rep->gci_lo);
2206   ndbout_c("senderRef: x%x", rep->senderRef);
2207   ndbout_c("count: %u", rep->gcp_complete_rep_count);
2208   ndbout_c("flags: x%x", rep->flags);
2209   if (rep->flags & rep->ON_DISK) ndbout_c("\tON_DISK");
2210   if (rep->flags & rep->IN_MEMORY) ndbout_c("\tIN_MEMORY");
2211   if (rep->flags & rep->MISSING_DATA) ndbout_c("\tMISSING_DATA");
2212   if (rep->flags & rep->ADD_CNT) ndbout_c("\tADD_CNT %u", rep->flags>>16);
2213   if (rep->flags & rep->SUB_CNT) ndbout_c("\tSUB_CNT %u", rep->flags>>16);
2214   if (rep->flags & rep->SUB_DATA_STREAMS_IN_SIGNAL)
2215   {
2216     ndbout_c("\tSUB_DATA_STREAMS_IN_SIGNAL");
2217     // Expected signal size with two stream ids per word
2218     const Uint32 explen = rep->SignalLength + (rep->gcp_complete_rep_count + 1)/2;
2219     if (replen != explen)
2220     {
2221       ndbout_c("ERROR: Signal length %d words does not match expected %d! Corrupt signal?", replen, explen);
2222     }
2223     // Protect against corrupt signal length, max signal size is 25 words
2224     if (replen > 25) replen = 25;
2225     if (replen > rep->SignalLength)
2226     {
2227       const int words = replen - rep->SignalLength;
2228       for (int i=0; i < words; i++)
2229       {
2230         ndbout_c("\t\t%04x\t%04x", Uint32(rep->sub_data_streams[i]), Uint32(rep->sub_data_streams[i]>>16));
2231       }
2232     }
2233   }
2234   ndbout_c("remaining count: %u", remcnt);
2235   ndbout_c("report count (without duplicates): %u", repcnt);
2236   // Gci_container
2237   ndbout_c("bucket gci: %u/%u", Uint32(bucket->m_gci>>32), Uint32(bucket->m_gci));
2238   ndbout_c("bucket state: x%x", bucket->m_state);
2239   if (bucket->m_state & bucket->GC_COMPLETE) ndbout_c("\tGC_COMPLETE");
2240   if (bucket->m_state & bucket->GC_INCONSISTENT) ndbout_c("\tGC_INCONSISTENT");
2241   if (bucket->m_state & bucket->GC_CHANGE_CNT) ndbout_c("\tGC_CHANGE_CNT");
2242   if (bucket->m_state & bucket->GC_OUT_OF_MEMORY) ndbout_c("\tGC_OUT_OF_MEMORY");
2243   ndbout_c("bucket remain count: %u", bucket->m_gcp_complete_rep_count);
2244   ndbout_c("total buckets: %u", m_total_buckets);
2245   ndbout_c("startup hack: %u", m_startup_hack);
2246   for (int i=0; i < MAX_SUB_DATA_STREAMS; i++)
2247   {
2248     Uint16 id = m_sub_data_streams[i];
2249     if (id == 0) continue;
2250     ndbout_c("stream: idx %u, id %04x, counted %d", i, id, bucket->m_gcp_complete_rep_sub_data_streams.get(i));
2251   }
2252   abort();
2253 }
2254 
2255 void
2256 NdbEventBuffer::complete_empty_bucket_using_exceptional_event(Uint64 gci,
2257                                                               Uint32 type)
2258 {
2259   EventBufData *dummy_data= alloc_data();
2260   dummy_data->m_event_op = 0;
2261 
2262   /** Add gci and event type to the exceptional epoch event data,
2263    * so that nextEvent handles it correctly and makes it visible
2264    * to the consumer, which can then handle it.
2265    */
2266   LinearSectionPtr ptr[3];
2267   for (int i = 0; i < 3; i++)
2268   {
2269     ptr[i].p = NULL;
2270     ptr[i].sz = 0;
2271   }
2272   alloc_mem(dummy_data, ptr, 0);
2273 //  dummy_data = ((Uint32*)NdbMem_Allocate(sizeof(SubTableData) + 3) >> 2);
2274 
2275   SubTableData *sdata = (SubTableData*) dummy_data->memory;
2276   assert(dummy_data->memory);
2277   sdata->tableId = ~0;
2278   sdata->requestInfo = 0;
2279   sdata->gci_hi = Uint32(gci >> 32);
2280   sdata->gci_lo = Uint32(gci);
2281   SubTableData::setOperation(sdata->requestInfo, type);
2282 
2283   // Add the first EventOperationImpl such that
2284   // the exceptional data can be interpreted by consumers
2285   NdbEventOperation* op = m_ndb->getEventOperation(0);
2286   assert(op != NULL);
2287   dummy_data->m_event_op = &op->m_impl;
2288 
2289   // Add gci_ops for error epoch events so that the search in
2290   // isConsistent(Uint64& gci) remains effective (backward compatibility)
2291   {
2292     EventBufData_list dummy_event_list;
2293     dummy_event_list.append_used_data(dummy_data);
2294     dummy_event_list.m_is_not_multi_list = true;
2295     m_complete_data.m_data.append_list(&dummy_event_list, gci);
2296     assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
2297     if (type >= NdbDictionary::Event::_TE_INCONSISTENT)
2298     {
2299       m_complete_data.m_data.m_gci_ops_list_tail->m_error = type;
2300     }
2301   }
2302 }
2303 
2304 void
2305 NdbEventBuffer::discard_events_from_bucket(Gci_container* bucket)
2306 {
2307   // Empty the gci_op(s) list of the epoch from the bucket
2308   DBUG_PRINT_EVENT("info", ("discard_events_from_bucket: deleting m_gci_op_list: %p",
2309                             bucket->m_data.m_gci_op_list));
2310   assert (bucket->m_data.m_is_not_multi_list); // bucket contains only one epoch
2311   delete [] bucket->m_data.m_gci_op_list;
2312 
2313   bucket->m_data.m_gci_op_count = 0;
2314   bucket->m_data.m_gci_op_list = 0;
2315   bucket->m_data.m_gci_op_alloc = 0;
2316 
2317   // Empty the event data from the bucket and return it to m_free_data
2318   free_list(bucket->m_data);
2319 }
2320 
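/*
 * Finish the oldest known epoch: move its event data to m_complete_data, or,
 * if the bucket is empty or marked with an error, synthesize an exceptional
 * event (_TE_EMPTY, _TE_INCONSISTENT or _TE_OUT_OF_MEMORY) so the consumer
 * still sees the epoch.  The bucket slot is then cleared and m_min_gci_index
 * advanced.
 */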
2321 void
2322 NdbEventBuffer::complete_bucket(Gci_container* bucket)
2323 {
2324   const Uint64 gci = bucket->m_gci;
2325   Gci_container* buckets = (Gci_container*)m_active_gci.getBase();
2326 
2327   if (0)
2328     ndbout_c("complete %u/%u pos: %u", Uint32(gci >> 32), Uint32(gci),
2329              Uint32(bucket - buckets));
2330 
2331 #ifdef VM_TRACE
2332   verify_known_gci(false);
2333 #endif
2334 
2335   bool bucket_empty = bucket->m_data.is_empty();
2336   Uint32 type = 0;
2337   if (bucket->m_state & Gci_container::GC_INCONSISTENT)
2338   {
2339     type = NdbDictionary::Event::_TE_INCONSISTENT;
2340   }
2341   else if (bucket->m_state & Gci_container::GC_OUT_OF_MEMORY)
2342   {
2343     type = NdbDictionary::Event::_TE_OUT_OF_MEMORY;
2344   }
2345   else if (bucket_empty)
2346   {
2347     assert(!bucket->m_data.m_is_not_multi_list);
2348     assert(bucket->m_data.first_gci_ops() == 0);
2349     type = NdbDictionary::Event::_TE_EMPTY;
2350   }
2351 
2352   if(!bucket_empty)
2353   {
2354 #ifdef VM_TRACE
2355     assert(bucket->m_data.m_count);
2356 #endif
2357 
2358     if (bucket->hasError())
2359     {
2360       /*
2361        * Bucket marked as possibly missing data, probably due to
2362        * kernel running out of event_buffer during node failure.
2363        * Discard the partially-received event data.
2364        */
2365       discard_events_from_bucket(bucket);
2366       bucket_empty = true;
2367     }
2368   }
2369 
2370   if (bucket_empty)
2371   {
2372     assert(type > 0);
2373     complete_empty_bucket_using_exceptional_event(gci, type);
2374   }
2375   else
2376   {
2377     // bucket is complete and consistent: add it to complete_data list
2378     m_complete_data.m_data.append_list(&bucket->m_data, gci);
2379   }
2380 
2381   Uint32 minpos = m_min_gci_index;
2382   Uint32 mask = m_known_gci.size() - 1;
2383   assert((mask & (mask + 1)) == 0);
2384 
2385   bzero(bucket, sizeof(Gci_container));
2386 
2387   m_min_gci_index = (minpos + 1) & mask;
2388 
2389 #ifdef VM_TRACE
2390   verify_known_gci(true);
2391 #endif
2392 }
2393 
2394 void
2395 NdbEventBuffer::execSUB_START_CONF(const SubStartConf * const rep,
2396                                    Uint32 len)
2397 {
2398   Uint32 buckets;
2399   if (len >= SubStartConf::SignalLength)
2400   {
2401     buckets = rep->bucketCount;
2402   }
2403   else
2404   {
2405     /*
2406      * Pre-7.0 kernel nodes do not return the number of buckets.
2407      * Assume it equals theNoOfDBnodes, as was the case in 6.3.
2408      */
2409     buckets = m_ndb->theImpl->theNoOfDBnodes;
2410   }
2411 
2412   set_total_buckets(buckets);
2413 
2414   add_op();
2415 }
2416 
2417 void
2418 NdbEventBuffer::execSUB_STOP_CONF(const SubStopConf * const rep,
2419                                    Uint32 len)
2420 {
2421   remove_op();
2422 }
2423 
2424 void
2425 NdbEventBuffer::execSUB_STOP_REF(const SubStopRef * const rep,
2426                                  Uint32 len)
2427 {
2428   remove_op();
2429 }
2430 
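/*
 * A SUB_GCP_COMPLETE_REP reports that some of the SUMA buckets have
 * completed this epoch.  Duplicate reports are filtered via the sub data
 * stream bitmap, the bucket's outstanding count is decremented, and when it
 * reaches zero the epoch is completed immediately if it is the oldest known
 * one, otherwise it is marked GC_COMPLETE and finished later by
 * complete_outof_order_gcis().
 */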
2431 void
2432 NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep,
2433                                          Uint32 len, int complete_cluster_failure)
2434 {
2435   Uint32 gci_hi = rep->gci_hi;
2436   Uint32 gci_lo = rep->gci_lo;
2437 
2438   if (unlikely(len < SubGcpCompleteRep::SignalLength))
2439   {
2440     gci_lo = 0;
2441   }
2442 
2443   const Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
2444   if (gci > m_highest_sub_gcp_complete_GCI)
2445     m_highest_sub_gcp_complete_GCI = gci;
2446 
2447   if (!complete_cluster_failure)
2448   {
2449     m_alive_node_bit_mask.set(refToNode(rep->senderRef));
2450     // Reset cluster failure marker
2451     m_failure_detected= false;
2452 
2453     if (unlikely(m_active_op_count == 0))
2454     {
2455       return;
2456     }
2457   }
2458 
2459   DBUG_ENTER_EVENT("NdbEventBuffer::execSUB_GCP_COMPLETE_REP");
2460 
2461   Uint32 cnt= rep->gcp_complete_rep_count;
2462 
2463   Gci_container *bucket = find_bucket(gci);
2464 
2465   if (0)
2466     ndbout_c("execSUB_GCP_COMPLETE_REP(%u/%u) cnt: %u from %x flags: 0x%x",
2467              Uint32(gci >> 32), Uint32(gci), cnt, rep->senderRef,
2468              rep->flags);
2469 
2470   if (unlikely(rep->flags & (SubGcpCompleteRep::ADD_CNT |
2471                              SubGcpCompleteRep::SUB_CNT)))
2472   {
2473     handle_change_nodegroup(rep);
2474   }
2475 
2476   if (unlikely(bucket == 0))
2477   {
2478     if (unlikely(gci <= m_latestGCI))
2479     {
2480     /**
2481      * Already completed GCI...
2482      *   Possible in case of resend during NF handling
2483      */
2484 #ifdef VM_TRACE
2485     Uint64 minGCI = m_known_gci[m_min_gci_index];
2486     ndbout_c("bucket == 0, gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
2487              Uint32(gci >> 32), Uint32(gci),
2488              Uint32(minGCI >> 32), Uint32(minGCI),
2489              Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
2490     ndbout << " complete: " << m_complete_data << endl;
2491     for(Uint32 i = 0; i<m_active_gci.size(); i++)
2492     {
2493       if (((Gci_container*)(&m_active_gci[i]))->m_gci)
2494         ndbout << i << " - " << m_active_gci[i] << endl;
2495     }
2496 #endif
2497     }
2498     else
2499     {
2500       DBUG_PRINT_EVENT("info", ("bucket == 0 due to an ongoing gap, completed epoch: %u/%u (%llu)",
2501                                 Uint32(gci >> 32), Uint32(gci), gci));
2502     }
2503     DBUG_VOID_RETURN_EVENT;
2504   }
2505 
2506   if (rep->flags & SubGcpCompleteRep::SUB_DATA_STREAMS_IN_SIGNAL)
2507   {
2508     Uint32 already_counted = 0;
2509     for(Uint32 i = 0; i < cnt; i ++)
2510     {
2511       Uint16 sub_data_stream;
2512       if ((i & 1) == 0)
2513       {
2514         sub_data_stream = rep->sub_data_streams[i / 2] & 0xFFFF;
2515       }
2516       else
2517       {
2518         sub_data_stream = (rep->sub_data_streams[i / 2] >> 16);
2519       }
2520       Uint32 sub_data_stream_number = find_sub_data_stream_number(sub_data_stream);
2521       if (bucket->m_gcp_complete_rep_sub_data_streams.get(sub_data_stream_number))
2522       {
2523         // Received earlier. This must be a duplicate from the takeover node.
2524         already_counted ++;
2525       }
2526       else
2527       {
2528         bucket->m_gcp_complete_rep_sub_data_streams.set(sub_data_stream_number);
2529       }
2530     }
2531     assert(already_counted <= cnt);
2532     if (already_counted <= cnt)
2533     {
2534       cnt -= already_counted;
2535       if (cnt == 0)
2536       {
2537         // All sub data streams are already reported as completed for epoch
2538         // So data for all streams reported in this signal have been sent
2539         // twice but from two different nodes.  Ignore this duplicate report.
2540         DBUG_VOID_RETURN_EVENT;
2541       }
2542     }
2543   }
2544 
2545   if (rep->flags & SubGcpCompleteRep::MISSING_DATA)
2546   {
2547     bucket->m_state = Gci_container::GC_INCONSISTENT;
2548   }
2549 
2550   Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
2551   if(unlikely(old_cnt == ~(Uint32)0))
2552   {
2553     old_cnt = m_total_buckets;
2554   }
2555 
2556   //assert(old_cnt >= cnt);
2557   if (unlikely(! (old_cnt >= cnt)))
2558   {
2559     crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, len, old_cnt, cnt);
2560   }
2561   bucket->m_gcp_complete_rep_count = old_cnt - cnt;
2562 
2563   if(old_cnt == cnt)
2564   {
2565     Uint64 minGCI = m_known_gci[m_min_gci_index];
2566     if(likely(minGCI == 0 || gci == minGCI))
2567     {
2568   do_complete:
2569       m_startup_hack = false;
2570       bool gapBegins = false;
2571 
2572       // if there is a gap, mark the gap boundary
2573       if (m_event_buffer_manager.onEpochCompleted(gci, gapBegins))
2574         reportStatus();
2575 
2576       // if a new gap begins, mark the bucket.
2577       if (gapBegins)
2578         bucket->m_state |= Gci_container::GC_OUT_OF_MEMORY;
2579 
2580       complete_bucket(bucket);
2581       m_latestGCI = m_complete_data.m_gci = gci; // before reportStatus
2582       reportStatus();
2583 
2584       if(unlikely(m_latest_complete_GCI > gci))
2585       {
2586 	complete_outof_order_gcis();
2587       }
2588 
2589       // signal that something happened
2590 
2591       NdbCondition_Signal(p_cond);
2592     }
2593     else
2594     {
2595       if (unlikely(m_startup_hack))
2596       {
2597         flushIncompleteEvents(gci);
2598         bucket = find_bucket(gci);
2599         assert(bucket);
2600         assert(bucket->m_gci == gci);
2601         goto do_complete;
2602       }
2603       /** Epoch completed out of order; mark it and finish it later */
2604       g_eventLogger->info("out of order bucket: %d gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
2605                           (int)(bucket-(Gci_container*)m_active_gci.getBase()),
2606                           Uint32(gci >> 32), Uint32(gci),
2607                           Uint32(minGCI >> 32), Uint32(minGCI),
2608                           Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
2609       bucket->m_state = Gci_container::GC_COMPLETE;
2610       bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
2611       m_latest_complete_GCI = gci;
2612     }
2613   }
2614 
2615   DBUG_VOID_RETURN_EVENT;
2616 }
2617 
2618 void
2619 NdbEventBuffer::complete_outof_order_gcis()
2620 {
2621 #ifdef VM_TRACE
2622   verify_known_gci(false);
2623 #endif
2624 
2625   Uint64 * array = m_known_gci.getBase();
2626   Uint32 mask = m_known_gci.size() - 1;
2627   Uint32 minpos = m_min_gci_index;
2628   Uint32 maxpos = m_max_gci_index;
2629   Uint64 stop_gci = m_latest_complete_GCI;
2630 
2631   Uint64 start_gci = array[minpos];
2632   g_eventLogger->info("complete_outof_order_gcis from: %u/%u(%u) to: %u/%u(%u)",
2633                       Uint32(start_gci >> 32), Uint32(start_gci), minpos,
2634                       Uint32(stop_gci >> 32), Uint32(stop_gci), maxpos);
2635 
2636   assert(start_gci <= stop_gci);
2637   do
2638   {
2639     start_gci = array[minpos];
2640     Gci_container* bucket = find_bucket(start_gci);
2641     assert(bucket);
2642     assert(maxpos == m_max_gci_index);
2643     if (!(bucket->m_state & Gci_container::GC_COMPLETE)) // Not complete
2644     {
2645 #ifdef VM_TRACE
2646       verify_known_gci(false);
2647 #endif
2648       return;
2649     }
2650 
2651 #ifdef VM_TRACE
2652     ndbout_c("complete_outof_order_gcis - completing %u/%u rows: %u",
2653              Uint32(start_gci >> 32), Uint32(start_gci), bucket->m_data.m_count);
2654 #else
2655     ndbout_c("complete_outof_order_gcis - completing %u/%u",
2656              Uint32(start_gci >> 32), Uint32(start_gci));
2657 #endif
2658 
2659     complete_bucket(bucket);
2660     m_latestGCI = m_complete_data.m_gci = start_gci;
2661 
2662 #ifdef VM_TRACE
2663     verify_known_gci(true);
2664 #endif
2665     minpos = (minpos + 1) & mask;
2666   } while (start_gci != stop_gci);
2667 }
2668 
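/*
 * Insert the given event data for every event operation in the chain
 * starting at impl, and for their blob operations, skipping operations that
 * already have a stop GCI set.
 */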
2669 void
2670 NdbEventBuffer::insert_event(NdbEventOperationImpl* impl,
2671                              SubTableData &data,
2672                              LinearSectionPtr *ptr,
2673                              Uint32 &oid_ref)
2674 {
2675   DBUG_PRINT("info", ("gci{hi/lo}: %u/%u 0x%x %s",
2676                       data.gci_hi, data.gci_lo, m_ndb->getReference(),
2677                       m_ndb->getNdbObjectName()));
2678   do
2679   {
2680     if (impl->m_stop_gci == ~Uint64(0))
2681     {
2682       oid_ref = impl->m_oid;
2683       insertDataL(impl, &data, SubTableData::SignalLength, ptr);
2684     }
2685     NdbEventOperationImpl* blob_op = impl->theBlobOpList;
2686     while (blob_op != NULL)
2687     {
2688       if (blob_op->m_stop_gci == ~Uint64(0))
2689       {
2690         oid_ref = blob_op->m_oid;
2691         insertDataL(blob_op, &data, SubTableData::SignalLength, ptr);
2692       }
2693       blob_op = blob_op->m_next;
2694     }
2695   } while((impl = impl->m_next));
2696 }
2697 
2698 bool
2699 NdbEventBuffer::find_max_known_gci(Uint64 * res) const
2700 {
2701   const Uint64 * array = m_known_gci.getBase();
2702   Uint32 mask = m_known_gci.size() - 1;
2703   Uint32 minpos = m_min_gci_index;
2704   Uint32 maxpos = m_max_gci_index;
2705 
2706   if (minpos == maxpos)
2707     return false;
2708 
2709   if (res)
2710   {
2711     * res = array[(maxpos - 1) & mask];
2712   }
2713 
2714   return true;
2715 }
2716 
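/*
 * ADD_CNT/SUB_CNT indicate that the number of SUMA buckets changes from the
 * given epoch onwards (node group added or dropped).  The affected epoch is
 * marked GC_CHANGE_CNT and the expected report count is adjusted for it, for
 * every later known epoch, and for m_total_buckets.
 */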
2717 void
2718 NdbEventBuffer::handle_change_nodegroup(const SubGcpCompleteRep* rep)
2719 {
2720   Uint64 gci = (Uint64(rep->gci_hi) << 32) | rep->gci_lo;
2721   Uint32 cnt = (rep->flags >> 16);
2722   Uint64 * array = m_known_gci.getBase();
2723   Uint32 mask = m_known_gci.size() - 1;
2724   Uint32 minpos = m_min_gci_index;
2725   Uint32 maxpos = m_max_gci_index;
2726 
2727   if (rep->flags & SubGcpCompleteRep::ADD_CNT)
2728   {
2729     ndbout_c("handle_change_nodegroup(add, cnt=%u,gci=%u/%u)",
2730              cnt, Uint32(gci >> 32), Uint32(gci));
2731 
2732     Uint32 found = 0;
2733     Uint32 pos = minpos;
2734     for (; pos != maxpos; pos = (pos + 1) & mask)
2735     {
2736       if (array[pos] == gci)
2737       {
2738         Gci_container* tmp = find_bucket(array[pos]);
2739         if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
2740         {
2741           found = 1;
2742           ndbout_c(" - gci %u/%u already marked complete",
2743                    Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2744           break;
2745         }
2746         else
2747         {
2748           found = 2;
2749           ndbout_c(" - gci %u/%u marking (and increasing)",
2750                    Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2751           tmp->m_state |= Gci_container::GC_CHANGE_CNT;
2752           tmp->m_gcp_complete_rep_count += cnt;
2753           break;
2754         }
2755       }
2756       else
2757       {
2758         ndbout_c(" - ignore %u/%u",
2759                  Uint32(array[pos] >> 32), Uint32(array[pos]));
2760       }
2761     }
2762 
2763     if (found == 0)
2764     {
2765       ndbout_c(" - NOT FOUND (total: %u cnt: %u)", m_total_buckets, cnt);
2766       return;
2767     }
2768 
2769     if (found == 1)
2770     {
2771       return; // Nothing to do
2772     }
2773 
2774     m_total_buckets += cnt;
2775 
2776     pos = (pos + 1) & mask;
2777     for (; pos != maxpos; pos = (pos + 1) & mask)
2778     {
2779       assert(array[pos] > gci);
2780       Gci_container* tmp = find_bucket(array[pos]);
2781       assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
2782       tmp->m_gcp_complete_rep_count += cnt;
2783       ndbout_c(" - increasing cnt on %u/%u by %u",
2784                Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci), cnt);
2785     }
2786   }
2787   else if (rep->flags & SubGcpCompleteRep::SUB_CNT)
2788   {
2789     ndbout_c("handle_change_nodegroup(sub, cnt=%u,gci=%u/%u)",
2790              cnt, Uint32(gci >> 32), Uint32(gci));
2791 
2792     Uint32 found = 0;
2793     Uint32 pos = minpos;
2794     for (; pos != maxpos; pos = (pos + 1) & mask)
2795     {
2796       if (array[pos] == gci)
2797       {
2798         Gci_container* tmp = find_bucket(array[pos]);
2799         if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
2800         {
2801           found = 1;
2802           ndbout_c(" - gci %u/%u already marked complete",
2803                    Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2804           break;
2805         }
2806         else
2807         {
2808           found = 2;
2809           ndbout_c(" - gci %u/%u marking",
2810                    Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2811           tmp->m_state |= Gci_container::GC_CHANGE_CNT;
2812           break;
2813         }
2814       }
2815       else
2816       {
2817         ndbout_c(" - ignore %u/%u",
2818                  Uint32(array[pos] >> 32), Uint32(array[pos]));
2819       }
2820     }
2821 
2822     if (found == 0)
2823     {
2824       ndbout_c(" - NOT FOUND");
2825       return;
2826     }
2827 
2828     if (found == 1)
2829     {
2830       return; // Nothing to do
2831     }
2832 
2833     m_total_buckets -= cnt;
2834 
2835     pos = (pos + 1) & mask;
2836     for (; pos != maxpos; pos = (pos + 1) & mask)
2837     {
2838       assert(array[pos] > gci);
2839       Gci_container* tmp = find_bucket(array[pos]);
2840       assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
2841       tmp->m_gcp_complete_rep_count -= cnt;
2842       ndbout_c(" - decreasing cnt on %u/%u by %u to: %u",
2843                Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci),
2844                cnt,
2845                tmp->m_gcp_complete_rep_count);
2846     }
2847   }
2848 }
2849 
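/*
 * Map a sub data stream identifier to a slot in the fixed-size
 * m_sub_data_streams table using open addressing; an unknown identifier is
 * inserted on first use.  As an illustration of the index formula only:
 * a stream id of 0x0103 yields stream_index 3 before the modulo and the
 * linear probing are applied.
 */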
2850 Uint16
2851 NdbEventBuffer::find_sub_data_stream_number(Uint16 sub_data_stream)
2852 {
2853   /*
2854    * The stream_index calculated will be the one returned unless
2855    * Suma has been changed to calculate stream identifiers in an
2856    * incompatible way.  In that case a linear search in the
2857    * fixed-size hash table will resolve the correct index.
2858    */
2859   const Uint16 stream_index = (sub_data_stream % 256) + MAX_SUB_DATA_STREAMS_PER_GROUP * (sub_data_stream / 256 - 1);
2860   const Uint16 num0 = stream_index % NDB_ARRAY_SIZE(m_sub_data_streams);
2861   Uint32 num = num0;
2862   while (m_sub_data_streams[num] != sub_data_stream)
2863   {
2864     if (m_sub_data_streams[num] == 0)
2865     {
2866       m_sub_data_streams[num] = sub_data_stream;
2867       break;
2868     }
2869     num = (num + 1) % NDB_ARRAY_SIZE(m_sub_data_streams);
2870     require(num != num0);
2871   }
2872   return num;
2873 }
2874 
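/*
 * The real number of SUMA buckets only becomes known via SUB_START_CONF.
 * Buckets created before that used the TOTAL_BUCKETS_INIT sentinel; adjust
 * their outstanding report counts here, completing any epoch that turns out
 * to have no reports left.
 */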
2875 void
2876 NdbEventBuffer::set_total_buckets(Uint32 cnt)
2877 {
2878   if (m_total_buckets == cnt)
2879     return;
2880 
2881   assert(m_total_buckets == TOTAL_BUCKETS_INIT);
2882   m_total_buckets = cnt;
2883 
2884   Uint64 * array = m_known_gci.getBase();
2885   Uint32 mask = m_known_gci.size() - 1;
2886   Uint32 minpos = m_min_gci_index;
2887   Uint32 maxpos = m_max_gci_index;
2888 
2889   bool found = false;
2890   Uint32 pos = minpos;
2891   for (; pos != maxpos; pos = (pos + 1) & mask)
2892   {
2893     Gci_container* tmp = find_bucket(array[pos]);
2894     if (TOTAL_BUCKETS_INIT >= tmp->m_gcp_complete_rep_count)
2895     {
2896       found = true;
2897       if (0)
2898         ndbout_c("set_total_buckets(%u) complete %u/%u",
2899                  cnt, Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2900       tmp->m_gcp_complete_rep_count = 0;
2901       complete_bucket(tmp);
2902     }
2903     else
2904     {
2905       assert(tmp->m_gcp_complete_rep_count > TOTAL_BUCKETS_INIT);
2906       tmp->m_gcp_complete_rep_count -= TOTAL_BUCKETS_INIT;
2907     }
2908   }
2909   if (found)
2910   {
2911     NdbCondition_Signal(p_cond);
2912   }
2913 }
2914 
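/*
 * Called by the receiver thread when node failure handling for a data node
 * has completed.  Injects a _TE_NODE_FAILURE event for every event
 * operation; when the last data node is gone, incomplete buckets are purged,
 * a _TE_CLUSTER_FAILURE event is injected and the failure epoch is
 * force-completed.
 */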
2915 void
2916 NdbEventBuffer::report_node_failure_completed(Uint32 node_id)
2917 {
2918   assert(node_id < 32 * m_alive_node_bit_mask.Size); // only data-nodes
2919   if (! (node_id < 32 * m_alive_node_bit_mask.Size))
2920     return;
2921 
2922   m_alive_node_bit_mask.clear(node_id);
2923 
2924   NdbEventOperation* op= m_ndb->getEventOperation(0);
2925   if (op == 0)
2926     return;
2927 
2928   DBUG_ENTER("NdbEventBuffer::report_node_failure_completed");
2929   SubTableData data;
2930   LinearSectionPtr ptr[3];
2931   bzero(&data, sizeof(data));
2932   bzero(ptr, sizeof(ptr));
2933 
2934   data.tableId = ~0;
2935   data.requestInfo = 0;
2936   SubTableData::setOperation(data.requestInfo,
2937 			     NdbDictionary::Event::_TE_NODE_FAILURE);
2938   SubTableData::setReqNodeId(data.requestInfo, node_id);
2939   SubTableData::setNdbdNodeId(data.requestInfo, node_id);
2940   data.flags = SubTableData::LOG;
2941 
2942   Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
2943   find_max_known_gci(&gci);
2944 
2945   data.gci_hi = Uint32(gci >> 32);
2946   data.gci_lo = Uint32(gci);
2947 
2948   /**
2949    * Insert this event for each operation
2950    */
2951   // no need to lock()/unlock(), receive thread calls this
2952   insert_event(&op->m_impl, data, ptr, data.senderData);
2953 
2954   if (!m_alive_node_bit_mask.isclear())
2955     DBUG_VOID_RETURN;
2956 
2957   /*
2958    * Cluster failure
2959    */
2960 
2961   DBUG_PRINT("info", ("Cluster failure 0x%x %s", m_ndb->getReference(),
2962                       m_ndb->getNdbObjectName()));
2963 
2964   gci = Uint64((m_latestGCI >> 32) + 1) << 32;
2965   bool found = find_max_known_gci(&gci);
2966 
2967   Uint64 * array = m_known_gci.getBase();
2968   Uint32 mask = m_known_gci.size() - 1;
2969   Uint32 minpos = m_min_gci_index;
2970   Uint32 maxpos = m_max_gci_index;
2971 
2972   while (minpos != maxpos && array[minpos] != gci)
2973   {
2974     Gci_container* tmp = find_bucket(array[minpos]);
2975     assert(tmp);
2976     assert(maxpos == m_max_gci_index);
2977 
2978     if(!tmp->m_data.is_empty())
2979     {
2980       free_list(tmp->m_data);
2981     }
2982     tmp->~Gci_container();
2983     bzero(tmp, sizeof(Gci_container));
2984 
2985     minpos = (minpos + 1) & mask;
2986   }
2987   m_min_gci_index = minpos;
2988   if (found)
2989   {
2990     assert(((minpos + 1) & mask) == maxpos);
2991   }
2992   else
2993   {
2994     assert(minpos == maxpos);
2995   }
2996 
2997   /**
2998    * Inject new event
2999    */
3000   data.tableId = ~0;
3001   data.requestInfo = 0;
3002   SubTableData::setOperation(data.requestInfo,
3003 			     NdbDictionary::Event::_TE_CLUSTER_FAILURE);
3004 
3005   /**
3006    * Insert this event for each operation
3007    */
3008   // no need to lock()/unlock(), receive thread calls this
3009   insert_event(&op->m_impl, data, ptr, data.senderData);
3010 
3011   /**
3012    * Mark that event buffer is containing a failure event
3013    */
3014   m_failure_detected= true;
3015 
3016 #ifdef VM_TRACE
3017   m_flush_gci = 0;
3018 #endif
3019 
3020   /**
3021    * And finally complete this GCI
3022    */
3023   Gci_container* tmp = find_bucket(gci);
3024   assert(tmp);
3025   if (found)
3026   {
3027     assert(m_max_gci_index == maxpos); // shouldn't have changed...
3028   }
3029   else
3030   {
3031     assert(m_max_gci_index == ((maxpos + 1) & mask));
3032   }
3033   Uint32 cnt = tmp->m_gcp_complete_rep_count;
3034 
3035   SubGcpCompleteRep rep;
3036   rep.gci_hi= (Uint32)(gci >> 32);
3037   rep.gci_lo= (Uint32)(gci & 0xFFFFFFFF);
3038   rep.gcp_complete_rep_count= cnt;
3039   rep.flags = 0;
3040   execSUB_GCP_COMPLETE_REP(&rep, SubGcpCompleteRep::SignalLength, 1);
3041 
3042   DBUG_VOID_RETURN;
3043 }
3044 
3045 Uint64
3046 NdbEventBuffer::getLatestGCI()
3047 {
3048   /*
3049    * TODO: Fix data race with m_latestGCI.
3050    * m_latestGCI is changed by receiver thread, and getLatestGCI
3051    * is called from application thread.
3052    */
3053   return m_latestGCI;
3054 }
3055 
3056 Uint64
3057 NdbEventBuffer::getHighestQueuedEpoch()
3058 {
3059   return m_latest_poll_GCI;
3060 }
3061 
3062 int
3063 NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
3064 			    const SubTableData * const sdata,
3065                             Uint32 len,
3066 			    LinearSectionPtr ptr[3])
3067 {
3068   DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
3069   const Uint32 ri = sdata->requestInfo;
3070   const Uint32 operation = SubTableData::getOperation(ri);
3071   Uint32 gci_hi = sdata->gci_hi;
3072   Uint32 gci_lo = sdata->gci_lo;
3073 
3074   if (unlikely(len < SubTableData::SignalLength))
3075   {
3076     gci_lo = 0;
3077   }
3078 
3079   Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
3080   const bool is_data_event =
3081     operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
3082 
3083   if (!is_data_event)
3084   {
3085     if (operation == NdbDictionary::Event::_TE_CLUSTER_FAILURE)
3086     {
3087       /*
3088         Mark event as stopping.  Subsequent dropEventOperation
3089         will add the event to the dropped list for delete
3090       */
3091       op->m_stop_gci = gci;
3092     }
3093     else if (operation == NdbDictionary::Event::_TE_ACTIVE)
3094     {
3095       // internal event, do not relay to user
3096       DBUG_PRINT("info",
3097                  ("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u 0x%x %s",
3098                   op->m_ref_count, op, SubTableData::getNdbdNodeId(ri),
3099                   m_ndb->getReference(), m_ndb->getNdbObjectName()));
3100       DBUG_RETURN_EVENT(0);
3101     }
3102     else if (operation == NdbDictionary::Event::_TE_STOP)
3103     {
3104       // internal event, do not relay to user
3105       DBUG_PRINT("info",
3106                  ("_TE_STOP: m_ref_count: %u for op: %p id: %u 0x%x %s",
3107                   op->m_ref_count, op, SubTableData::getNdbdNodeId(ri),
3108                   m_ndb->getReference(), m_ndb->getNdbObjectName()));
3109       DBUG_RETURN_EVENT(0);
3110     }
3111   }
3112 
3113   Uint32 memory_usage = (m_max_alloc  == 0) ? 0 :
3114     (Uint32)(100 * (m_total_alloc - m_free_data_sz) / m_max_alloc);
3115 
3116   if (m_event_buffer_manager.onEventDataReceived(memory_usage, gci))
3117     reportStatus(true/*force_report*/);
3118 
3119   if (m_event_buffer_manager.isEventDataToBeDiscarded(gci))
3120   {
3121     DBUG_RETURN_EVENT(0);
3122   }
3123 
3124   if ( likely((Uint32)op->mi_type & (1U << operation)))
3125   {
3126     Gci_container* bucket= find_bucket(gci);
3127 
3128     DBUG_PRINT_EVENT("info", ("data insertion in eventId %d 0x%x %s",
3129                               op->m_eventId, m_ndb->getReference(),
3130                               m_ndb->getNdbObjectName()));
3131     DBUG_PRINT_EVENT("info", ("gci=%u/%u tab=%d op=%d node=%d",
3132                               sdata->gci_hi, sdata->gci_lo, sdata->tableId,
3133 			      SubTableData::getOperation(sdata->requestInfo),
3134                               SubTableData::getReqNodeId(sdata->requestInfo)));
3135 
3136     if (unlikely(bucket == 0))
3137     {
3138       /**
3139        * Already completed GCI...
3140        *   Possible in case of resend during NF handling
3141        */
3142       DBUG_RETURN_EVENT(0);
3143     }
3144 
3145     const bool is_blob_event = (op->theMainOp != NULL);
3146     const bool use_hash =  op->m_mergeEvents && is_data_event;
3147 
3148     if (! is_data_event && is_blob_event)
3149     {
3150       // currently subscribed to but not used
3151       DBUG_PRINT_EVENT("info", ("ignore non-data event on blob table 0x%x %s",
3152                                 m_ndb->getReference(), m_ndb->getNdbObjectName()));
3153       DBUG_RETURN_EVENT(0);
3154     }
3155 
3156     // find position in bucket hash table
3157     EventBufData* data = 0;
3158     EventBufData_hash::Pos hpos;
3159     if (use_hash)
3160     {
3161       bucket->m_data_hash.search(hpos, op, ptr);
3162       data = hpos.data;
3163     }
3164 
3165     if (data == 0)
3166     {
3167       // allocate new result buffer
3168       data = alloc_data();  // alloc_data crashes if allocation fails.
3169 
3170       m_event_buffer_manager.onBufferingEpoch(gci);
3171 
3172       // Initialize m_event_op, in case copy_data fails due to insufficient memory
3173       data->m_event_op = 0;
3174       if (unlikely(copy_data(sdata, len, ptr, data, NULL)))
3175       {
3176         crashMemAllocError("insertDataL : copy_data failed.");
3177       }
3178       data->m_event_op = op;
3179       if (! is_blob_event || ! is_data_event)
3180       {
3181         bucket->m_data.append_data(data);
3182       }
3183       else
3184       {
3185         // find or create main event for this blob event
3186         EventBufData_hash::Pos main_hpos;
3187         int ret = get_main_data(bucket, main_hpos, data);
3188         if (ret == -1)
3189         {
3190           crashMemAllocError("insertDataL : get_main_data failed.");
3191         }
3192 
3193         EventBufData* main_data = main_hpos.data;
3194         if (ret != 0) // main event was created
3195         {
3196           main_data->m_event_op = op->theMainOp;
3197           bucket->m_data.append_data(main_data);
3198           if (use_hash)
3199           {
3200             main_data->m_pkhash = main_hpos.pkhash;
3201             bucket->m_data_hash.append(main_hpos, main_data);
3202           }
3203         }
3204         // link blob event under main event
3205         add_blob_data(bucket, main_data, data);
3206       }
3207       if (use_hash)
3208       {
3209         data->m_pkhash = hpos.pkhash;
3210         bucket->m_data_hash.append(hpos, data);
3211       }
3212 #ifdef VM_TRACE
3213       op->m_data_count++;
3214 #endif
3215     }
3216     else
3217     {
3218       // event with same op, PK found, merge into old buffer
3219       if (unlikely(merge_data(sdata, len, ptr, data, &bucket->m_data.m_sz)))
3220       {
3221         crashMemAllocError("insertDataL : merge_data failed.");
3222       }
3223 
3224       // merge is on so we do not report blob part events
3225       if (! is_blob_event) {
3226         // report actual operation and the composite
3227         // there is no way to "fix" the flags for a composite op
3228         // since the flags represent multiple ops on multiple PKs
3229         // XXX fix by doing merge at end of epoch (extra mem cost)
3230         {
3231           EventBufData_list::Gci_op g = { op, (1U << operation) };
3232           bucket->m_data.add_gci_op(g);
3233         }
3234         {
3235           EventBufData_list::Gci_op
3236 	    g = { op,
3237 		  (1U << SubTableData::getOperation(data->sdata->requestInfo))};
3238           bucket->m_data.add_gci_op(g);
3239         }
3240       }
3241     }
3242 #ifdef NDB_EVENT_VERIFY_SIZE
3243     verify_size(bucket->m_data);
3244 #endif
3245     DBUG_RETURN_EVENT(0);
3246   }
3247 
3248 #ifdef VM_TRACE
3249   if ((Uint32)op->m_eventImpl->mi_type & (1U << operation))
3250   {
3251     DBUG_PRINT_EVENT("info",("Data arrived before ready eventId %d 0x%x %s",
3252                              op->m_eventId, m_ndb->getReference(),
3253                              m_ndb->getNdbObjectName()));
3254     DBUG_RETURN_EVENT(0);
3255   }
3256   else {
3257     DBUG_PRINT_EVENT("info",("skipped 0x%x %s", m_ndb->getReference(),
3258                              m_ndb->getNdbObjectName()));
3259     DBUG_RETURN_EVENT(0);
3260   }
3261 #else
3262   DBUG_RETURN_EVENT(0);
3263 #endif
3264 }
3265 
3266 void
3267 NdbEventBuffer::crashMemAllocError(const char *error_text)
3268 {
3269   g_eventLogger->error("Ndb Event Buffer 0x%x %s", m_ndb->getReference(),
3270 	  m_ndb->getNdbObjectName());
3271   g_eventLogger->error("Ndb Event Buffer : %s", error_text);
3272   g_eventLogger->error("Ndb Event Buffer : Fatal error.");
3273   exit(-1);
3274 }
3275 
3276 // allocate EventBufData
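// Takes the first entry from m_free_data, expanding the pool if it is empty;
// the process is terminated via crashMemAllocError() if expansion fails.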
3277 EventBufData*
3278 NdbEventBuffer::alloc_data()
3279 {
3280   DBUG_ENTER_EVENT("alloc_data");
3281   EventBufData* data = m_free_data;
3282 
3283   if (unlikely(data == 0))
3284   {
3285 #ifdef VM_TRACE
3286     assert(m_free_data_count == 0);
3287     assert(m_free_data_sz == 0);
3288 #endif
3289     expand(4000);
3290     reportStatus();
3291 
3292     data = m_free_data;
3293     if (unlikely(data == 0))
3294     {
3295 #ifdef VM_TRACE
3296       printf("m_latest_command: %s 0x%x %s\n",
3297              m_latest_command, m_ndb->getReference(), m_ndb->getNdbObjectName());
3298       printf("no free data, m_latestGCI %u/%u\n",
3299              (Uint32)(m_latestGCI >> 32), (Uint32)m_latestGCI);
3300       printf("m_free_data_count %d\n", m_free_data_count);
3301       printf("m_available_data_count %d first gci{hi/lo} %u/%u last gci{hi/lo} %u/%u\n",
3302              m_available_data.m_count,
3303              m_available_data.m_head?m_available_data.m_head->sdata->gci_hi:0,
3304              m_available_data.m_head?m_available_data.m_head->sdata->gci_lo:0,
3305              m_available_data.m_tail?m_available_data.m_tail->sdata->gci_hi:0,
3306              m_available_data.m_tail?m_available_data.m_tail->sdata->gci_lo:0);
3307       printf("m_used_data_count %d\n", m_used_data.m_count);
3308 #endif
3309       crashMemAllocError("alloc_data : Allocation of meta data failed.");
3310     }
3311   }
3312 
3313   // remove data from free list
3314   if (data->m_next_blob == 0)
3315     m_free_data = data->m_next;
3316   else {
3317     EventBufData* data2 = data->m_next_blob;
3318     if (data2->m_next == 0) {
3319       data->m_next_blob = data2->m_next_blob;
3320       data = data2;
3321     } else {
3322       EventBufData* data3 = data2->m_next;
3323       data2->m_next = data3->m_next;
3324       data = data3;
3325     }
3326   }
3327   data->m_next = 0;
3328   data->m_next_blob = 0;
3329 #ifdef VM_TRACE
3330   m_free_data_count--;
3331   assert(m_free_data_sz >= data->sz);
3332 #endif
3333   m_free_data_sz -= data->sz;
3334   DBUG_RETURN_EVENT(data);
3335 }
3336 
3337 // allocate initial or bigger memory area in EventBufData
3338 // takes sizes from given ptr and sets up data->ptr
3339 int
3340 NdbEventBuffer::alloc_mem(EventBufData* data,
3341                           LinearSectionPtr ptr[3],
3342                           Uint32 * change_sz)
3343 {
3344   DBUG_ENTER("NdbEventBuffer::alloc_mem");
3345   DBUG_PRINT("info", ("ptr sz %u + %u + %u 0x%x %s",
3346                       ptr[0].sz, ptr[1].sz, ptr[2].sz, m_ndb->getReference(),
3347                       m_ndb->getNdbObjectName()));
3348   const Uint32 min_alloc_size = 128;
3349 
3350   Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
3351   Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2;
3352   if (alloc_size < min_alloc_size)
3353     alloc_size = min_alloc_size;
3354 
3355   if (data->sz < alloc_size)
3356   {
3357     Uint32 add_sz = alloc_size - data->sz;
3358 
3359     NdbMem_Free((char*)data->memory);
3360     assert(m_total_alloc >= data->sz);
3361     data->memory = 0;
3362 
3363     data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
3364     if (data->memory == 0)
3365     {
3366       goto out_of_mem_err;
3367     }
3368     data->sz = alloc_size;
3369     m_total_alloc += add_sz;
3370 
3371     if (change_sz != NULL)
3372       *change_sz += add_sz;
3373   }
3374   {
3375     Uint32* memptr = data->memory;
3376     memptr += sz4;
3377     int i;
3378     for (i = 0; i <= 2; i++)
3379     {
3380       data->ptr[i].p = memptr;
3381       data->ptr[i].sz = ptr[i].sz;
3382       memptr += ptr[i].sz;
3383     }
3384   }
3385   DBUG_RETURN(0);
3386 
3387 out_of_mem_err:
3388   // Dealloc succeeded, but alloc bigger size failed
3389   crashMemAllocError("Attempt to allocate memory from OS failed");
3390   DBUG_RETURN(-1);
3391 }
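
/*
 * Illustrative note (a sketch, not part of the implementation): the buffer
 * laid out by alloc_mem holds a word-aligned copy of the SubTableData header
 * followed by the three sections back to back.  With made-up section sizes
 * ptr[0].sz = 2, ptr[1].sz = 5, ptr[2].sz = 0:
 *
 *   Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;  // header words
 *   Uint32 alloc_size = (sz4 + 2 + 5 + 0) << 2;    // bytes, >= 128 enforced
 *   // data->ptr[0].p = memory + sz4
 *   // data->ptr[1].p = data->ptr[0].p + 2
 *   // data->ptr[2].p = data->ptr[1].p + 5
 */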
3392 
3393 void
3394 NdbEventBuffer::dealloc_mem(EventBufData* data,
3395                             Uint32 * change_sz)
3396 {
3397   NdbMem_Free((char*)data->memory);
3398   assert(m_total_alloc >= data->sz);
3399   m_total_alloc -= data->sz;
3400   if (change_sz != NULL) {
3401     assert(*change_sz >= data->sz);
3402     *change_sz -= data->sz;
3403   }
3404   data->memory = 0;
3405   data->sz = 0;
3406 }
3407 
3408 int
3409 NdbEventBuffer::copy_data(const SubTableData * const sdata, Uint32 len,
3410                           LinearSectionPtr ptr[3],
3411                           EventBufData* data,
3412                           Uint32 * change_sz)
3413 {
3414   DBUG_ENTER_EVENT("NdbEventBuffer::copy_data");
3415 
3416   if (alloc_mem(data, ptr, change_sz) != 0)
3417     DBUG_RETURN_EVENT(-1);
3418   memcpy(data->sdata, sdata, sizeof(SubTableData));
3419 
3420   if (unlikely(len < SubTableData::SignalLength))
3421   {
3422     data->sdata->gci_lo = 0;
3423   }
3424   if (len < SubTableData::SignalLengthWithTransId)
3425   {
3426     /* No TransId, set to uninit value */
3427     data->sdata->transId1 = ~Uint32(0);
3428     data->sdata->transId2 = ~Uint32(0);
3429   }
3430 
3431   int i;
3432   for (i = 0; i <= 2; i++)
3433     memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
3434   DBUG_RETURN_EVENT(0);
3435 }
3436 
3437 static struct Ev_t {
3438   enum {
3439     enum_INS = NdbDictionary::Event::_TE_INSERT,
3440     enum_DEL = NdbDictionary::Event::_TE_DELETE,
3441     enum_UPD = NdbDictionary::Event::_TE_UPDATE,
3442     enum_NUL = NdbDictionary::Event::_TE_NUL,
3443     enum_IDM = 254,     // idempotent op, possibly allowed after node failure (NF)
3444     enum_ERR = 255      // always impossible
3445   };
3446   int t1, t2, t3;
3447 } ev_t[] = {
3448   { Ev_t::enum_INS, Ev_t::enum_INS, Ev_t::enum_IDM },
3449   { Ev_t::enum_INS, Ev_t::enum_DEL, Ev_t::enum_NUL }, //ok
3450   { Ev_t::enum_INS, Ev_t::enum_UPD, Ev_t::enum_INS }, //ok
3451   { Ev_t::enum_DEL, Ev_t::enum_INS, Ev_t::enum_UPD }, //ok
3452   { Ev_t::enum_DEL, Ev_t::enum_DEL, Ev_t::enum_IDM },
3453   { Ev_t::enum_DEL, Ev_t::enum_UPD, Ev_t::enum_ERR },
3454   { Ev_t::enum_UPD, Ev_t::enum_INS, Ev_t::enum_ERR },
3455   { Ev_t::enum_UPD, Ev_t::enum_DEL, Ev_t::enum_DEL }, //ok
3456   { Ev_t::enum_UPD, Ev_t::enum_UPD, Ev_t::enum_UPD }  //ok
3457 };
3458 
3459 /*
3460  *   | INS            | DEL              | UPD
3461  * 0 | pk ah + all ah | pk ah            | pk ah + new ah
3462  * 1 | pk ad + all ad | old pk ad        | new pk ad + new ad
3463  * 2 | empty          | old non-pk ah+ad | old ah+ad
3464  */
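
/*
 * An example of the layout table above (attribute ids and sizes are made up):
 * for an UPDATE of a table with pk column 0 and a changed non-pk column 1,
 *
 *   ptr[0]: ah(col 0), ah(col 1)       -- pk ah + new ah
 *   ptr[1]: data(col 0), data(col 1)   -- new pk ad + new ad
 *   ptr[2]: ah(col 1), data(col 1)     -- old ah+ad, header stored inline
 *
 * Sections 0 and 1 keep headers and data apart; section 2 interleaves each
 * AttributeHeader with its data, which is why the before-image merge below
 * steps by 1 + getDataSize() words per attribute.
 */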
3465 
3466 static AttributeHeader
3467 copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2,
3468           Uint32 flags)
3469 {
3470   AttributeHeader ah(p2[i2]);
3471   bool do_copy = (flags & 1);
3472   if (do_copy)
3473     p1[i1] = p2[i2];
3474   i1++;
3475   i2++;
3476   return ah;
3477 }
3478 
3479 static void
3480 copy_attr(AttributeHeader ah,
3481           Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2,
3482           Uint32 flags)
3483 {
3484   bool do_copy = (flags & 1);
3485   bool with_head = (flags & 2);
3486   Uint32 n = with_head + ah.getDataSize();
3487   if (do_copy)
3488   {
3489     Uint32 k;
3490     for (k = 0; k < n; k++)
3491       p1[j1 + k] = p2[j2 + k];
3492   }
3493   j1 += n;
3494   j2 += n;
3495 }
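
/*
 * Note on the flag word passed to copy_head/copy_attr (a reading of the
 * callers in merge_data below, not an authoritative contract):
 *   bit 0 (flags & 1): actually copy; merge_data passes its loop index, so
 *     pass 0 only advances the indexes to compute sizes and pass 1 copies.
 *   bit 1 (flags & 2): the AttributeHeader is stored inline with the data
 *     (section 2 layout), so 1 + ah.getDataSize() words are consumed, e.g.
 *     copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
 */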
3496 
3497 int
3498 NdbEventBuffer::merge_data(const SubTableData * const sdata, Uint32 len,
3499                            LinearSectionPtr ptr2[3],
3500                            EventBufData* data,
3501                            Uint32 * change_sz)
3502 {
3503   DBUG_ENTER_EVENT("NdbEventBuffer::merge_data");
3504 
3505   /* TODO : Consider how/if to merge multiple events for the same key
3506    * that carry different transids.
3507    * The same consideration probably applies to AnyValue!
3508    */
3509 
3510   Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;
3511 
3512   int t1 = SubTableData::getOperation(data->sdata->requestInfo);
3513   int t2 = SubTableData::getOperation(sdata->requestInfo);
3514   if (t1 == Ev_t::enum_NUL)
3515     DBUG_RETURN_EVENT(copy_data(sdata, len, ptr2, data, change_sz));
3516 
3517   Ev_t* tp = 0;
3518   int i;
3519   for (i = 0; (uint) i < sizeof(ev_t)/sizeof(ev_t[0]); i++) {
3520     if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) {
3521       tp = &ev_t[i];
3522       break;
3523     }
3524   }
3525   assert(tp != 0 && tp->t3 != Ev_t::enum_ERR);
3526 
3527   if (tp->t3 == Ev_t::enum_IDM) {
3528     LinearSectionPtr (&ptr1)[3] = data->ptr;
3529 
3530     /*
3531      * TODO
3532      * - can get data in INS ptr2[2] which is supposed to be empty
3533      * - can get extra data in DEL ptr2[2]
3534      * - why does DBUG_PRINT not work in this file ???
3535      *
3536      * replication + bug#19872 can ignore this since merge is on
3537      * only for tables with explicit PK and before data is not used
3538      */
3539     const int maxsec = 1; // ignore section 2
3540 
3541     int i;
3542     for (i = 0; i <= maxsec; i++) {
3543       if (ptr1[i].sz != ptr2[i].sz ||
3544           memcmp(ptr1[i].p, ptr2[i].p, ptr1[i].sz << 2) != 0) {
3545         DBUG_PRINT("info", ("idempotent op %d*%d data differs in sec %d 0x%x %s",
3546                             tp->t1, tp->t2, i, m_ndb->getReference(),
3547                             m_ndb->getNdbObjectName()));
3548         assert(false);
3549         DBUG_RETURN_EVENT(-1);
3550       }
3551     }
3552     DBUG_PRINT("info", ("idempotent op %d*%d data ok 0x%x %s",
3553                         tp->t1, tp->t2, m_ndb->getReference(),
3554                         m_ndb->getNdbObjectName()));
3555     DBUG_RETURN_EVENT(0);
3556   }
3557 
3558   // TODO: use old data items, avoid malloc/free on each merge
3559 
3560   // save old data
3561   EventBufData olddata = *data;
3562   data->memory = 0;
3563   data->sz = 0;
3564 
3565   // compose ptr1 o ptr2 = ptr
3566   LinearSectionPtr (&ptr1)[3] = olddata.ptr;
3567   LinearSectionPtr (&ptr)[3] = data->ptr;
3568 
3569   // loop twice where first loop only sets sizes
3570   int loop;
3571   int result = 0;
3572   for (loop = 0; loop <= 1; loop++)
3573   {
3574     if (loop == 1)
3575     {
3576       if (alloc_mem(data, ptr, change_sz) != 0)
3577       {
3578         result = -1;
3579         goto end;
3580       }
3581       *data->sdata = *sdata;
3582       SubTableData::setOperation(data->sdata->requestInfo, tp->t3);
3583     }
3584 
3585     ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;
3586 
3587     // copy pk from new version
3588     {
3589       AttributeHeader ah;
3590       Uint32 i = 0;
3591       Uint32 j = 0;
3592       Uint32 i2 = 0;
3593       Uint32 j2 = 0;
3594       while (i < nkey)
3595       {
3596         ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
3597         copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
3598       }
3599       ptr[0].sz = i;
3600       ptr[1].sz = j;
3601     }
3602 
3603     // merge after values, new version overrides
3604     if (tp->t3 != Ev_t::enum_DEL)
3605     {
3606       AttributeHeader ah;
3607       Uint32 i = ptr[0].sz;
3608       Uint32 j = ptr[1].sz;
3609       Uint32 i1 = 0;
3610       Uint32 j1 = 0;
3611       Uint32 i2 = nkey;
3612       Uint32 j2 = ptr[1].sz;
3613       while (i1 < nkey)
3614       {
3615         j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
3616       }
3617       while (1)
3618       {
3619         bool b1 = (i1 < ptr1[0].sz);
3620         bool b2 = (i2 < ptr2[0].sz);
3621         if (b1 && b2)
3622         {
3623           Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId();
3624           Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId();
3625           if (id1 < id2)
3626             b2 = false;
3627           else if (id1 > id2)
3628             b1 = false;
3629           else
3630           {
3631             j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
3632             b1 = false;
3633           }
3634         }
3635         if (b1)
3636         {
3637           ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop);
3638           copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop);
3639         }
3640         else if (b2)
3641         {
3642           ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
3643           copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
3644         }
3645         else
3646           break;
3647       }
3648       ptr[0].sz = i;
3649       ptr[1].sz = j;
3650     }
3651 
3652     // merge before values, old version overrides
3653     if (tp->t3 != Ev_t::enum_INS)
3654     {
3655       AttributeHeader ah;
3656       Uint32 k = 0;
3657       Uint32 k1 = 0;
3658       Uint32 k2 = 0;
3659       while (1)
3660       {
3661         bool b1 = (k1 < ptr1[2].sz);
3662         bool b2 = (k2 < ptr2[2].sz);
3663         if (b1 && b2)
3664         {
3665           Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId();
3666           Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId();
3667           if (id1 < id2)
3668             b2 = false;
3669           else if (id1 > id2)
3670             b1 = false;
3671           else
3672           {
3673             k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize();
3674             b2 = false;
3675           }
3676         }
3677         if (b1)
3678         {
3679           ah = AttributeHeader(ptr1[2].p[k1]);
3680           copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
3681         }
3682         else if (b2)
3683         {
3684           ah = AttributeHeader(ptr2[2].p[k2]);
3685           copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2);
3686         }
3687         else
3688           break;
3689       }
3690       ptr[2].sz = k;
3691     }
3692   }
3693 
3694 end:
3695   dealloc_mem(&olddata, change_sz);
3696   DBUG_RETURN_EVENT(result);
3697 }
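
/*
 * The function above uses a size-then-copy two-pass idiom: pass 0 runs the
 * whole merge with copying disabled just to accumulate ptr[i].sz, alloc_mem
 * then sizes the buffer exactly once, and pass 1 repeats the merge with
 * copying enabled.  A minimal stand-alone sketch of the idiom (hypothetical
 * names, not code from this file):
 *
 *   Uint32 out_sz = 0;
 *   Uint32* out = NULL;
 *   for (int pass = 0; pass <= 1; pass++) {
 *     if (pass == 1)
 *       out = new Uint32[out_sz];   // exact size known from pass 0
 *     out_sz = 0;
 *     for (Uint32 k = 0; k < in_sz; k++) {
 *       if (keep(in[k])) {
 *         if (pass == 1)
 *           out[out_sz] = in[k];
 *         out_sz++;
 *       }
 *     }
 *   }
 */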
3698 
3699 /*
3700  * Given a blob part event, find the main table event on the inline part.
3701  * It should exist (forced in TUP) but may arrive later.  If not yet seen,
3702  * create a NUL place-holder event on the main table; the real event
3703  * replaces it later.  Returns 0 if found, 1 if a place-holder was created,
 * and -1 on error.
 */
3704 
3705 int
3706 NdbEventBuffer::get_main_data(Gci_container* bucket,
3707                               EventBufData_hash::Pos& hpos,
3708                               EventBufData* blob_data)
3709 {
3710   DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data");
3711 
3712   int blobVersion = blob_data->m_event_op->theBlobVersion;
3713   assert(blobVersion == 1 || blobVersion == 2);
3714 
3715   NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp;
3716   assert(main_op != NULL);
3717   const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;
3718 
3719   // create LinearSectionPtr for main table key
3720   LinearSectionPtr ptr[3];
3721 
3722   Uint32 pk_ah[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
3723   Uint32* pk_data = blob_data->ptr[1].p;
3724   Uint32 pk_size = 0;
3725 
3726   if (unlikely(blobVersion == 1)) {
3727     /*
3728      * Blob PK attribute 0 is concatenated table PK null padded
3729      * to fixed maximum size.  The actual size and attributes of
3730      * table PK must be discovered.
3731      */
3732     Uint32 max_size = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();
3733 
3734     Uint32 sz = 0; // words parsed so far
3735     Uint32 n = 0;
3736     Uint32 i;
3737     for (i = 0; n < mainTable->m_noOfKeys; i++) {
3738       const NdbColumnImpl* c = mainTable->getColumn(i);
3739       assert(c != NULL);
3740       if (! c->m_pk)
3741         continue;
3742 
3743       Uint32 bytesize = c->m_attrSize * c->m_arraySize;
3744       Uint32 lb, len;
3745       require(sz < max_size);
3746       bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_data[sz],
3747                                            bytesize, lb, len);
3748       if (!ok)
3749       {
3750         DBUG_RETURN_EVENT(-1);
3751       }
3752 
3753       AttributeHeader ah(i, lb + len);
3754       pk_ah[n] = ah.m_value;
3755       sz += ah.getDataSize();
3756       n++;
3757     }
3758     assert(n == mainTable->m_noOfKeys);
3759     require(sz <= max_size);
3760     pk_size = sz;
3761   } else {
3762     /*
3763      * Blob PK starts with separate table PKs.  Total size must be
3764      * counted and blob attribute ids changed to table attribute ids.
3765      */
3766     Uint32 sz = 0; // count size
3767     Uint32 n = 0;
3768     Uint32 i;
3769     for (i = 0; n < mainTable->m_noOfKeys; i++) {
3770       const NdbColumnImpl* c = mainTable->getColumn(i);
3771       assert(c != NULL);
3772       if (! c->m_pk)
3773         continue;
3774 
3775       AttributeHeader ah(blob_data->ptr[0].p[n]);
3776       ah.setAttributeId(i);
3777       pk_ah[n] = ah.m_value;
3778       sz += ah.getDataSize();
3779       n++;
3780     }
3781     assert(n == mainTable->m_noOfKeys);
3782     pk_size = sz;
3783   }
3784 
3785   ptr[0].sz = mainTable->m_noOfKeys;
3786   ptr[0].p = pk_ah;
3787   ptr[1].sz = pk_size;
3788   ptr[1].p = pk_data;
3789   ptr[2].sz = 0;
3790   ptr[2].p = 0;
3791 
3792   DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
3793   DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
3794 
3795   // search for main event buffer
3796   bucket->m_data_hash.search(hpos, main_op, ptr);
3797   if (hpos.data != NULL)
3798     DBUG_RETURN_EVENT(0);
3799 
3800   // not found, create a place-holder
3801   EventBufData* main_data = alloc_data();
3802   if (main_data == NULL)
3803     DBUG_RETURN_EVENT(-1);
3804   SubTableData sdata = *blob_data->sdata;
3805   sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
3806   SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
3807   if (copy_data(&sdata, SubTableData::SignalLength, ptr, main_data, NULL) != 0)
3808     DBUG_RETURN_EVENT(-1);
3809   hpos.data = main_data;
3810 
3811   DBUG_RETURN_EVENT(1);
3812 }
3813 
3814 void
3815 NdbEventBuffer::add_blob_data(Gci_container* bucket,
3816                               EventBufData* main_data,
3817                               EventBufData* blob_data)
3818 {
3819   DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
3820   DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p 0x%x %s",
3821                             main_data, blob_data, m_ndb->getReference(),
3822                             m_ndb->getNdbObjectName()));
3823   EventBufData* head;
3824   head = main_data->m_next_blob;
3825   while (head != NULL)
3826   {
3827     if (head->m_event_op == blob_data->m_event_op)
3828       break;
3829     head = head->m_next_blob;
3830   }
3831   if (head == NULL)
3832   {
3833     head = blob_data;
3834     head->m_next_blob = main_data->m_next_blob;
3835     main_data->m_next_blob = head;
3836   }
3837   else
3838   {
3839     blob_data->m_next = head->m_next;
3840     head->m_next = blob_data;
3841   }
3842   // adjust data list size
3843   bucket->m_data.m_count += 1;
3844   bucket->m_data.m_sz += blob_data->sz;
3845   DBUG_VOID_RETURN_EVENT;
3846 }
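
/*
 * A rough picture of the lists maintained by add_blob_data (a summary of the
 * code above, not a specification): a main-table EventBufData heads one chain
 * per blob event operation via m_next_blob, and each chain links its part
 * events via m_next:
 *
 *   main_data --m_next_blob--> head(blob op A) --m_next_blob--> head(blob op B)
 *                                  |m_next                          |m_next
 *                                  part -> part                     part
 *
 * A part for a previously unseen blob op becomes a new chain head; otherwise
 * it is inserted directly after the existing head for its op.
 */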
3847 
3848 NdbEventOperationImpl *
3849 NdbEventBuffer::move_data()
3850 {
3851   // handle received data
3852   if (!m_complete_data.m_data.is_empty())
3853   {
3854     // move this list to last in m_available_data
3855     m_available_data.append_list(&m_complete_data.m_data, 0);
3856 
3857     bzero(&m_complete_data, sizeof(m_complete_data));
3858   }
3859 
3860   // handle used data
3861   if (!m_used_data.is_empty())
3862   {
3863     // return m_used_data to m_free_data
3864     free_list(m_used_data);
3865   }
3866   if (!m_available_data.is_empty())
3867   {
3868     DBUG_ENTER_EVENT("NdbEventBuffer::move_data");
3869 #ifdef VM_TRACE
3870     DBUG_PRINT_EVENT("exit",("m_available_data_count %u 0x%x %s",
3871                              m_available_data.m_count,
3872                              m_ndb->getReference(), m_ndb->getNdbObjectName()));
3873 #endif
3874     DBUG_RETURN_EVENT(m_available_data.m_head->m_event_op);
3875   }
3876   return 0;
3877 }
3878 
3879 void
3880 NdbEventBuffer::free_list(EventBufData_list &list)
3881 {
3882 #ifdef NDB_EVENT_VERIFY_SIZE
3883   verify_size(list);
3884 #endif
3885   // return list to m_free_data
3886   list.m_tail->m_next= m_free_data;
3887   m_free_data= list.m_head;
3888 #ifdef VM_TRACE
3889   m_free_data_count+= list.m_count;
3890 #endif
3891   m_free_data_sz+= list.m_sz;
3892 
3893   list.m_head = list.m_tail = NULL;
3894   list.m_count = list.m_sz = 0;
3895 
3896   list.~EventBufData_list(); // free gci ops
3897 }
3898 
3899 void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
3900 {
3901 #ifdef NDB_EVENT_VERIFY_SIZE
3902   NdbEventBuffer::verify_size(*list);
3903 #endif
3904   move_gci_ops(list, gci);
3905 
3906   if (m_tail)
3907     m_tail->m_next= list->m_head;
3908   else
3909     m_head= list->m_head;
3910   m_tail= list->m_tail;
3911   m_count+= list->m_count;
3912   m_sz+= list->m_sz;
3913 
3914   list->m_head = list->m_tail = NULL;
3915   list->m_count = list->m_sz = 0;
3916 }
3917 
3918 void
3919 EventBufData_list::add_gci_op(Gci_op g)
3920 {
3921   DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
3922   DBUG_PRINT_EVENT("info", ("p.op: %p  g.event_types: %x", g.op, g.event_types));
3923   assert(g.op != NULL && g.op->theMainOp == NULL); // as in nextEvent
3924   Uint32 i;
3925   for (i = 0; i < m_gci_op_count; i++) {
3926     if (m_gci_op_list[i].op == g.op)
3927       break;
3928   }
3929   if (i < m_gci_op_count) {
3930     m_gci_op_list[i].event_types |= g.event_types;
3931   } else {
3932     if (m_gci_op_count == m_gci_op_alloc) {
3933       Uint32 n = 1 + 2 * m_gci_op_alloc;
3934       Gci_op* old_list = m_gci_op_list;
3935       m_gci_op_list = new Gci_op [n];
3936       if (m_gci_op_alloc != 0) {
3937         Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op);
3938         memcpy(m_gci_op_list, old_list, bytes);
3939         DBUG_PRINT_EVENT("info", ("this: %p  delete m_gci_op_list: %p",
3940                                   this, old_list));
3941         delete [] old_list;
3942       }
3943       else
3944         assert(old_list == 0);
3945       DBUG_PRINT_EVENT("info", ("this: %p  new m_gci_op_list: %p",
3946                                 this, m_gci_op_list));
3947       m_gci_op_alloc = n;
3948     }
3949     assert(m_gci_op_count < m_gci_op_alloc);
3950 #ifndef NDEBUG
3951     i = m_gci_op_count;
3952 #endif
3953     m_gci_op_list[m_gci_op_count++] = g;
3954   }
3955   DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
3956   DBUG_VOID_RETURN_EVENT;
3957 }
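
/*
 * Note on the growth policy above: the Gci_op array capacity follows
 * n = 1 + 2*m_gci_op_alloc, i.e. 1, 3, 7, 15, ..., so repeated add_gci_op
 * calls reallocate only O(log count) times and cost amortized constant work
 * per insert.
 */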
3958 
3959 void
3960 EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci)
3961 {
3962   DBUG_ENTER_EVENT("EventBufData_list::move_gci_ops");
3963   DBUG_PRINT_EVENT("info", ("this: %p  list: %p  gci: %u/%u",
3964                             this, list, (Uint32)(gci >> 32), (Uint32)gci));
3965   assert(!m_is_not_multi_list);
3966   if (!list->m_is_not_multi_list)
3967   {
3968     assert(gci == 0);
3969     if (m_gci_ops_list_tail)
3970       m_gci_ops_list_tail->m_next = list->m_gci_ops_list;
3971     else
3972     {
3973       m_gci_ops_list =  list->m_gci_ops_list;
3974     }
3975     m_gci_ops_list_tail = list->m_gci_ops_list_tail;
3976     goto end;
3977   }
3978   {
3979     Gci_ops *new_gci_ops = new Gci_ops;
3980     DBUG_PRINT_EVENT("info", ("this: %p  m_gci_op_list: %p",
3981                         new_gci_ops, list->m_gci_op_list));
3982     if (m_gci_ops_list_tail)
3983       m_gci_ops_list_tail->m_next = new_gci_ops;
3984     else
3985     {
3986       assert(m_gci_ops_list == 0);
3987       m_gci_ops_list = new_gci_ops;
3988     }
3989     m_gci_ops_list_tail = new_gci_ops;
3990 
3991     new_gci_ops->m_gci_op_list = list->m_gci_op_list;
3992     new_gci_ops->m_gci_op_count = list->m_gci_op_count;
3993     new_gci_ops->m_gci = gci;
3994     new_gci_ops->m_next = 0;
3995   }
3996 end:
3997   list->m_gci_op_list = 0;
3998   list->m_gci_ops_list_tail = 0;
3999   list->m_gci_op_alloc = 0;
4000   DBUG_VOID_RETURN_EVENT;
4001 }
4002 
4003 void
4004 NdbEventBuffer::clear_event_queue()
4005 {
4006   if(!m_available_data.is_empty())
4007   {
4008     free_list(m_available_data);
4009   }
4010   else
4011   {
4012     // no event data found, remove any lingering gci_ops
4013     // belonging to consumed epochs
4014     m_available_data.~EventBufData_list();
4015   }
4016 }
4017 
4018 NdbEventOperation*
4019 NdbEventBuffer::createEventOperation(const char* eventName,
4020 				     NdbError &theError)
4021 {
4022   DBUG_ENTER("NdbEventBuffer::createEventOperation");
4023 
4024   if (m_ndb->theImpl->m_ev_op == NULL)
4025   {
4026     clear_event_queue();
4027   }
4028 
4029   NdbEventOperation* tOp= new NdbEventOperation(m_ndb, eventName);
4030   if (tOp == 0)
4031   {
4032     theError.code= 4000;
4033     DBUG_RETURN(NULL);
4034   }
4035   if (tOp->getState() != NdbEventOperation::EO_CREATED) {
4036     theError.code= tOp->getNdbError().code;
4037     delete tOp;
4038     DBUG_RETURN(NULL);
4039   }
4040   // add user reference
4041   // removed in dropEventOperation
4042   getEventOperationImpl(tOp)->m_ref_count = 1;
4043   DBUG_PRINT("info", ("m_ref_count: %u for op: %p 0x%x %s",
4044                       getEventOperationImpl(tOp)->m_ref_count,
4045                       getEventOperationImpl(tOp), m_ndb->getReference(),
4046                       m_ndb->getNdbObjectName()));
4047   DBUG_RETURN(tOp);
4048 }
4049 
4050 NdbEventOperationImpl*
4051 NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt,
4052                                          NdbError &theError)
4053 {
4054   DBUG_ENTER("NdbEventBuffer::createEventOperationImpl");
4055   NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt);
4056   if (tOp == 0)
4057   {
4058     theError.code= 4000;
4059     DBUG_RETURN(NULL);
4060   }
4061   if (tOp->getState() != NdbEventOperation::EO_CREATED) {
4062     theError.code= tOp->getNdbError().code;
4063     delete tOp;
4064     DBUG_RETURN(NULL);
4065   }
4066   DBUG_RETURN(tOp);
4067 }
4068 
4069 void
4070 NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
4071 {
4072   DBUG_ENTER("NdbEventBuffer::dropEventOperation");
4073   NdbEventOperationImpl* op= getEventOperationImpl(tOp);
4074 
4075   op->stop();
4076   // stop blob event ops
4077   if (op->theMainOp == NULL)
4078   {
4079     Uint64 max_stop_gci = op->m_stop_gci;
4080     NdbEventOperationImpl* tBlobOp = op->theBlobOpList;
4081     while (tBlobOp != NULL)
4082     {
4083       tBlobOp->stop();
4084       Uint64 stop_gci = tBlobOp->m_stop_gci;
4085       if (stop_gci > max_stop_gci)
4086         max_stop_gci = stop_gci;
4087       tBlobOp = tBlobOp->m_next;
4088     }
4089     tBlobOp = op->theBlobOpList;
4090     while (tBlobOp != NULL)
4091     {
4092       tBlobOp->m_stop_gci = max_stop_gci;
4093       tBlobOp = tBlobOp->m_next;
4094     }
4095     op->m_stop_gci = max_stop_gci;
4096   }
4097 
4098   /**
4099    * Needs mutex lock as report_node_XXX accesses list...
4100    */
4101   NdbMutex_Lock(m_mutex);
4102 
4103   // release blob handles now, further access is user error
4104   if (op->theMainOp == NULL)
4105   {
4106     while (op->theBlobList != NULL)
4107     {
4108       NdbBlob* tBlob = op->theBlobList;
4109       op->theBlobList = tBlob->theNext;
4110       m_ndb->releaseNdbBlob(tBlob);
4111     }
4112   }
4113 
4114   if (op->m_next)
4115     op->m_next->m_prev= op->m_prev;
4116   if (op->m_prev)
4117     op->m_prev->m_next= op->m_next;
4118   else
4119     m_ndb->theImpl->m_ev_op= op->m_next;
4120 
4121   assert(m_ndb->theImpl->m_ev_op == 0 || m_ndb->theImpl->m_ev_op->m_prev == 0);
4122 
4123   assert(op->m_ref_count > 0);
4124   // remove user reference
4125   // added in createEventOperation
4126   // user error to use reference after this
4127   op->m_ref_count--;
4128   DBUG_PRINT("info", ("m_ref_count: %u for op: %p 0x%x %s",
4129                       op->m_ref_count, op, m_ndb->getReference(),
4130                       m_ndb->getNdbObjectName()));
4131   if (op->m_ref_count == 0)
4132   {
4133     NdbMutex_Unlock(m_mutex);
4134     DBUG_PRINT("info", ("deleting op: %p 0x%x %s",
4135                         op, m_ndb->getReference(), m_ndb->getNdbObjectName()));
4136     delete op->m_facade;
4137   }
4138   else
4139   {
4140     op->m_next= m_dropped_ev_op;
4141     op->m_prev= 0;
4142     if (m_dropped_ev_op)
4143       m_dropped_ev_op->m_prev= op;
4144     m_dropped_ev_op= op;
4145 
4146     NdbMutex_Unlock(m_mutex);
4147   }
4148   DBUG_VOID_RETURN;
4149 }
4150 
4151 void
4152 NdbEventBuffer::reportStatus(bool force_report)
4153 {
4154   EventBufData *apply_buf= m_available_data.m_head;
4155   Uint64 apply_gci, latest_gci= m_latestGCI;
4156   if (apply_buf == 0)
4157     apply_buf= m_complete_data.m_data.m_head;
4158   if (apply_buf && apply_buf->sdata)
4159   {
4160     Uint32 gci_hi = apply_buf->sdata->gci_hi;
4161     Uint32 gci_lo = apply_buf->sdata->gci_lo;
4162     apply_gci= gci_lo | (Uint64(gci_hi) << 32);
4163   }
4164   else
4165     apply_gci= latest_gci;
4166 
4167   if (force_report)
4168       goto send_report;
4169 
4170   if (m_free_thresh)
4171   {
4172     if (100*(Uint64)m_free_data_sz < m_min_free_thresh*(Uint64)m_total_alloc &&
4173         m_total_alloc > 1024*1024)
4174     {
4175       /* report less free buffer than m_free_thresh,
4176          next report when more free than 2 * m_free_thresh
4177       */
4178       m_min_free_thresh= 0;
4179       m_max_free_thresh= 2 * m_free_thresh;
4180       goto send_report;
4181     }
4182 
4183     if (100*(Uint64)m_free_data_sz > m_max_free_thresh*(Uint64)m_total_alloc &&
4184         m_total_alloc > 1024*1024)
4185     {
4186       /* report more free than 2 * m_free_thresh
4187          next report when less free than m_free_thresh
4188       */
4189       m_min_free_thresh= m_free_thresh;
4190       m_max_free_thresh= 100;
4191       goto send_report;
4192     }
4193   }
4194   if (m_gci_slip_thresh &&
4195       (latest_gci-apply_gci >= m_gci_slip_thresh))
4196   {
4197     goto send_report;
4198   }
4199   return;
4200 
4201 send_report:
4202   Uint32 data[8];
4203   data[0]= NDB_LE_EventBufferStatus;
4204   data[1]= m_total_alloc-m_free_data_sz;
4205   data[2]= m_total_alloc;
4206   data[3]= m_max_alloc;
4207   data[4]= (Uint32)(apply_gci);
4208   data[5]= (Uint32)(apply_gci >> 32);
4209   data[6]= (Uint32)(latest_gci);
4210   data[7]= (Uint32)(latest_gci >> 32);
4211   Ndb_internal::send_event_report(true, m_ndb, data,8);
4212 #ifdef VM_TRACE
4213   assert(m_total_alloc >= m_free_data_sz);
4214 #endif
4215 }
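
/*
 * Illustrative walk-through of the hysteresis above (numbers made up): with
 * m_total_alloc = 10MB and m_min_free_thresh = 20, a report is sent once free
 * space drops below 2MB (100*free < 20*total); the thresholds then become
 * min = 0, max = 2*m_free_thresh, so the next report fires only when free
 * space recovers above that max fraction, which restores min = m_free_thresh,
 * max = 100.  The m_total_alloc > 1MB test suppresses reports for small
 * buffers, and m_gci_slip_thresh independently forces a report when
 * latest_gci - apply_gci reaches that value.
 */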
4216 
4217 void
4218 NdbEventBuffer::get_event_buffer_memory_usage(Ndb::EventBufferMemoryUsage& usage)
4219 {
4220   usage.allocated_bytes = m_total_alloc;
4221   usage.used_bytes = m_total_alloc - m_free_data_sz;
4222 
4223   // If there's no configured max limit then
4224   // the percentage is a fraction of the total allocated.
4225 
4226   Uint32 ret = 0;
4227   // m_max_alloc == 0 ==> unlimited usage,
4228   // m_total_alloc  >= m_free_data_sz always.
4229   if (m_max_alloc > 0)
4230     ret = (Uint32)((100 * (Uint64)(m_total_alloc - m_free_data_sz)) / m_max_alloc);
4231   else if (m_total_alloc > 0)
4232     ret = (Uint32)((100 * (Uint64)(m_total_alloc - m_free_data_sz)) / m_total_alloc);
4233 
4234   usage.usage_percent = ret;
4235 }
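
/*
 * Example of the percentage above (illustrative numbers): with
 * m_total_alloc = 8MB, m_free_data_sz = 2MB and m_max_alloc = 24MB, the used
 * bytes are 6MB and usage_percent = 100*6/24 = 25.  With no configured limit
 * (m_max_alloc == 0) the same state reports 100*6/8 = 75, i.e. the used
 * fraction of what is currently allocated.
 */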
4236 
4237 #ifdef VM_TRACE
4238 void
4239 NdbEventBuffer::verify_size(const EventBufData* data, Uint32 count, Uint32 sz)
4240 {
4241 #if 0
4242   Uint32 tmp_count = 0;
4243   Uint32 tmp_sz = 0;
4244   while (data != 0) {
4245     Uint32 full_count, full_sz;
4246     data->get_full_size(full_count, full_sz);
4247     tmp_count += full_count;
4248     tmp_sz += full_sz;
4249     data = data->m_next;
4250   }
4251   assert(tmp_count == count);
4252   assert(tmp_sz == sz);
4253 #endif
4254 }
4255 void
4256 NdbEventBuffer::verify_size(const EventBufData_list & list)
4257 {
4258 #if 0
4259   verify_size(list.m_head, list.m_count, list.m_sz);
4260 #endif
4261 }
4262 #endif
4263 
4264 // hash table routines
4265 
4266 // could optimize the all-fixed case
4267 Uint32
4268 EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
4269 {
4270   DBUG_ENTER_EVENT("EventBufData_hash::getpkhash");
4271   DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
4272   DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
4273 
4274   const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
4275 
4276   // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
4277   // for pk update (to equivalent pk) post/pre values give same hash
4278   Uint32 nkey = tab->m_noOfKeys;
4279   assert(nkey != 0 && nkey <= ptr[0].sz);
4280   const Uint32* hptr = ptr[0].p;
4281   const uchar* dptr = (uchar*)ptr[1].p;
4282 
4283   // hash registers
4284   ulong nr1 = 0;
4285   ulong nr2 = 0;
4286   while (nkey-- != 0)
4287   {
4288     AttributeHeader ah(*hptr++);
4289     Uint32 bytesize = ah.getByteSize();
4290     assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz));
4291 
4292     Uint32 i = ah.getAttributeId();
4293     const NdbColumnImpl* col = tab->getColumn(i);
4294     require(col != 0);
4295 
4296     Uint32 lb, len;
4297     bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len);
4298     require(ok);
4299 
4300     CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
4301     (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
4302     dptr += ((bytesize + 3) / 4) * 4;
4303   }
4304   DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1));
4305   DBUG_RETURN_EVENT(nr1);
4306 }
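
/*
 * Note: each key attribute is hashed with its collation's hash_sort so that
 * keys comparing equal under the collation (e.g. case or trailing-space
 * differences, depending on the charset) hash alike, matching the
 * strnncollsp comparison used by getpkequal below; the data pointer then
 * advances by the attribute's byte size rounded up to whole words.
 */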
4307 
4308 bool
4309 EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
4310 {
4311   DBUG_ENTER_EVENT("EventBufData_hash::getpkequal");
4312   DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2);
4313   DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2);
4314   DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2);
4315   DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2);
4316 
4317   const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
4318 
4319   Uint32 nkey = tab->m_noOfKeys;
4320   assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
4321   const Uint32* hptr1 = ptr1[0].p;
4322   const Uint32* hptr2 = ptr2[0].p;
4323   const uchar* dptr1 = (uchar*)ptr1[1].p;
4324   const uchar* dptr2 = (uchar*)ptr2[1].p;
4325 
4326   bool equal = true;
4327 
4328   while (nkey-- != 0)
4329   {
4330     AttributeHeader ah1(*hptr1++);
4331     AttributeHeader ah2(*hptr2++);
4332     // sizes can differ on update of varchar endspace
4333     Uint32 bytesize1 = ah1.getByteSize();
4334     Uint32 bytesize2 = ah2.getByteSize();
4335     assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
4336     assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));
4337 
4338     assert(ah1.getAttributeId() == ah2.getAttributeId());
4339     Uint32 i = ah1.getAttributeId();
4340     const NdbColumnImpl* col = tab->getColumn(i);
4341     assert(col != 0);
4342 
4343     Uint32 lb1, len1;
4344     bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1);
4345     Uint32 lb2, len2;
4346     bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2);
4347     require(ok1 && ok2 && lb1 == lb2);
4348 
4349     CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
4350     int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
4351     if (res != 0)
4352     {
4353       equal = false;
4354       break;
4355     }
4356     dptr1 += ((bytesize1 + 3) / 4) * 4;
4357     dptr2 += ((bytesize2 + 3) / 4) * 4;
4358   }
4359 
4360   DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false"));
4361   DBUG_RETURN_EVENT(equal);
4362 }
4363 
4364 void
4365 EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
4366 {
4367   DBUG_ENTER_EVENT("EventBufData_hash::search");
4368   Uint32 pkhash = getpkhash(op, ptr);
4369   Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
4370   EventBufData* data = m_hash[index];
4371   while (data != 0)
4372   {
4373     if (data->m_event_op == op &&
4374         data->m_pkhash == pkhash &&
4375         getpkequal(op, data->ptr, ptr))
4376       break;
4377     data = data->m_next_hash;
4378   }
4379   hpos.index = index;
4380   hpos.data = data;
4381   hpos.pkhash = pkhash;
4382   DBUG_PRINT_EVENT("info", ("search result=%p", data));
4383   DBUG_VOID_RETURN_EVENT;
4384 }
4385 
4386 template class Vector<Gci_container_pod>;
4387 template class Vector<NdbEventBuffer::EventBufData_chunk*>;
4388