1 /*
2    Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #include <ndb_global.h>
27 #include <kernel_types.h>
28 
29 #include "API.hpp"
30 #include <NdbOut.hpp>
31 
32 #include <signaldata/CreateEvnt.hpp>
33 #include <signaldata/SumaImpl.hpp>
34 #include <SimpleProperties.hpp>
35 #include <Bitmask.hpp>
36 #include <AttributeHeader.hpp>
37 #include <AttributeList.hpp>
38 #include <NdbError.hpp>
39 #include <BaseString.hpp>
40 #include <UtilBuffer.hpp>
41 #include <portlib/NdbMem.h>
42 #include <signaldata/AlterTable.hpp>
43 #include "ndb_internal.hpp"
44 
45 #include <EventLogger.hpp>
46 extern EventLogger * g_eventLogger;
47 
48 #define TOTAL_BUCKETS_INIT (1 << 15)
49 static Gci_container_pod g_empty_gci_container;
50 
51 #if defined(VM_TRACE) && defined(NOT_USED)
52 static void
print_std(const SubTableData * sdata,LinearSectionPtr ptr[3])53 print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
54 {
55   printf("addr=%p gci{hi/lo}hi=%u/%u op=%d\n", (void*)sdata,
56          sdata->gci_hi, sdata->gci_lo,
57 	 SubTableData::getOperation(sdata->requestInfo));
58   for (int i = 0; i <= 2; i++) {
59     printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
60     for (int j = 0; (uint) j < ptr[i].sz; j++)
61       printf("%08x ", ptr[i].p[j]);
62     printf("\n");
63   }
64 }
65 #endif
66 
67 // EventBufData
68 
69 void
add_part_size(Uint32 & full_count,Uint32 & full_sz) const70 EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz) const
71 {
72   Uint32 tmp_count = 0;
73   Uint32 tmp_sz = 0;
74   const EventBufData* data2 = m_next_blob;
75   while (data2 != 0) {
76     tmp_count++;
77     tmp_sz += data2->sz;
78     const EventBufData* data3 = data2->m_next;
79     while (data3 != 0) {
80       tmp_count++;
81       tmp_sz += data3->sz;
82       data3 = data3->m_next;
83     }
84     data2 = data2->m_next_blob;
85   }
86   full_count += tmp_count;
87   full_sz += tmp_sz;
88 }
89 
90 /*
91  * Class NdbEventOperationImpl
92  *
93  *
94  */
95 
96 // todo handle several ndb objects
97 // todo free allocated data when closing NdbEventBuffer
98 
NdbEventOperationImpl(NdbEventOperation & f,Ndb * theNdb,const char * eventName)99 NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &f,
100 					     Ndb *theNdb,
101 					     const char* eventName) :
102   NdbEventOperation(*this),
103   m_facade(&f),
104   m_ndb(theNdb),
105   m_state(EO_ERROR),
106   m_oid(~(Uint32)0)
107 {
108   DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl");
109 
110   assert(m_ndb != NULL);
111   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
112   assert(myDict != NULL);
113 
114   const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
115   if (!myEvnt)
116   {
117     m_error.code= myDict->getNdbError().code;
118     DBUG_VOID_RETURN;
119   }
120 
121   init(myEvnt->m_impl);
122   DBUG_VOID_RETURN;
123 }
124 
NdbEventOperationImpl(Ndb * theNdb,NdbEventImpl & evnt)125 NdbEventOperationImpl::NdbEventOperationImpl(Ndb *theNdb,
126                                              NdbEventImpl& evnt) :
127   NdbEventOperation(*this),
128   m_facade(this),
129   m_ndb(theNdb),
130   m_state(EO_ERROR),
131   m_oid(~(Uint32)0)
132 {
133   DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl [evnt]");
134   init(evnt);
135   DBUG_VOID_RETURN;
136 }
137 
138 void
init(NdbEventImpl & evnt)139 NdbEventOperationImpl::init(NdbEventImpl& evnt)
140 {
141   DBUG_ENTER("NdbEventOperationImpl::init");
142 
143   m_magic_number = 0;
144   mi_type = 0;
145   m_change_mask = 0;
146 #ifdef VM_TRACE
147   m_data_done_count = 0;
148   m_data_count = 0;
149 #endif
150   m_next = 0;
151   m_prev = 0;
152 
153   m_eventId = 0;
154   theFirstPkAttrs[0] = NULL;
155   theCurrentPkAttrs[0] = NULL;
156   theFirstPkAttrs[1] = NULL;
157   theCurrentPkAttrs[1] = NULL;
158   theFirstDataAttrs[0] = NULL;
159   theCurrentDataAttrs[0] = NULL;
160   theFirstDataAttrs[1] = NULL;
161   theCurrentDataAttrs[1] = NULL;
162 
163   theBlobList = NULL;
164   theBlobOpList = NULL;
165   theMainOp = NULL;
166   theBlobVersion = 0;
167 
168   m_data_item= NULL;
169   m_eventImpl = NULL;
170 
171   m_custom_data= 0;
172   m_has_error= 1;
173 
174   // we should lookup id in Dictionary, TODO
175   // also make sure we only have one listener on each event
176 
177   m_eventImpl = &evnt;
178 
179   m_eventId = m_eventImpl->m_eventId;
180 
181   m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this);
182 
183   m_state= EO_CREATED;
184 
185   m_stop_gci = 0;
186 #ifdef ndb_event_stores_merge_events_flag
187   m_mergeEvents = m_eventImpl->m_mergeEvents;
188 #else
189   m_mergeEvents = false;
190 #endif
191   m_ref_count = 0;
192   DBUG_PRINT("info", ("m_ref_count = 0 for op: 0x%lx", (long) this));
193 
194   m_has_error= 0;
195 
196   DBUG_PRINT("exit",("this: 0x%lx  oid: %u", (long) this, m_oid));
197   DBUG_VOID_RETURN;
198 }
199 
~NdbEventOperationImpl()200 NdbEventOperationImpl::~NdbEventOperationImpl()
201 {
202   DBUG_ENTER("NdbEventOperationImpl::~NdbEventOperationImpl");
203   m_magic_number= 0;
204 
205   if (m_oid == ~(Uint32)0)
206     DBUG_VOID_RETURN;
207 
208   stop();
209 
210   if (theMainOp == NULL)
211   {
212     NdbEventOperationImpl* tBlobOp = theBlobOpList;
213     while (tBlobOp != NULL)
214     {
215       NdbEventOperationImpl *op = tBlobOp;
216       tBlobOp = tBlobOp->m_next;
217       delete op;
218     }
219   }
220 
221   m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
222   DBUG_PRINT("exit",("this: %p/%p oid: %u main: %p",
223              this, m_facade, m_oid, theMainOp));
224 
225   if (m_eventImpl)
226   {
227     delete m_eventImpl->m_facade;
228     m_eventImpl= 0;
229   }
230 
231   DBUG_VOID_RETURN;
232 }
233 
234 NdbEventOperation::State
getState()235 NdbEventOperationImpl::getState()
236 {
237   return m_state;
238 }
239 
240 NdbRecAttr*
getValue(const char * colName,char * aValue,int n)241 NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
242 {
243   DBUG_ENTER("NdbEventOperationImpl::getValue");
244   if (m_state != EO_CREATED) {
245     ndbout_c("NdbEventOperationImpl::getValue may only be called between "
246 	     "instantiation and execute()");
247     DBUG_RETURN(NULL);
248   }
249 
250   NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
251 
252   if (tAttrInfo == NULL) {
253     ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
254     DBUG_RETURN(NULL);
255   }
256 
257   DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
258 }
259 
260 NdbRecAttr*
getValue(const NdbColumnImpl * tAttrInfo,char * aValue,int n)261 NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
262 {
263   DBUG_ENTER("NdbEventOperationImpl::getValue");
264   // Insert Attribute Id into ATTRINFO part.
265 
266   NdbRecAttr **theFirstAttr;
267   NdbRecAttr **theCurrentAttr;
268 
269   if (tAttrInfo->getPrimaryKey())
270   {
271     theFirstAttr = &theFirstPkAttrs[n];
272     theCurrentAttr = &theCurrentPkAttrs[n];
273   }
274   else
275   {
276     theFirstAttr = &theFirstDataAttrs[n];
277     theCurrentAttr = &theCurrentDataAttrs[n];
278   }
279 
280   /************************************************************************
281    *	Get a Receive Attribute object and link it into the operation object.
282    ************************************************************************/
283   NdbRecAttr *tAttr = m_ndb->getRecAttr();
284   if (tAttr == NULL) {
285     exit(-1);
286     //setErrorCodeAbort(4000);
287     DBUG_RETURN(NULL);
288   }
289 
290   /**********************************************************************
291    * Now set the attribute identity and the pointer to the data in
292    * the RecAttr object
293    * Also set attribute size, array size and attribute type
294    ********************************************************************/
295   if (tAttr->setup(tAttrInfo, aValue)) {
296     //setErrorCodeAbort(4000);
297     m_ndb->releaseRecAttr(tAttr);
298     exit(-1);
299     DBUG_RETURN(NULL);
300   }
301   //theErrorLine++;
302 
303   tAttr->setUNDEFINED();
304 
305   // We want to keep the list sorted to make data insertion easier later
306 
307   if (*theFirstAttr == NULL) {
308     *theFirstAttr = tAttr;
309     *theCurrentAttr = tAttr;
310     tAttr->next(NULL);
311   } else {
312     Uint32 tAttrId = tAttrInfo->m_attrId;
313     if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
314       (*theCurrentAttr)->next(tAttr);
315       tAttr->next(NULL);
316       *theCurrentAttr = tAttr;
317     } else if ((*theFirstAttr)->next() == NULL ||    // only one in list
318 	       (*theFirstAttr)->attrId() > tAttrId) {// or first
319       tAttr->next(*theFirstAttr);
320       *theFirstAttr = tAttr;
321     } else { // at least 2 in list and not first and not last
322       NdbRecAttr *p = *theFirstAttr;
323       NdbRecAttr *p_next = p->next();
324       while (tAttrId > p_next->attrId()) {
325 	p = p_next;
326 	p_next = p->next();
327       }
328       if (tAttrId == p_next->attrId()) { // Using same attribute twice
329 	tAttr->release(); // do I need to do this?
330 	m_ndb->releaseRecAttr(tAttr);
331 	exit(-1);
332 	DBUG_RETURN(NULL);
333       }
334       // this is it, between p and p_next
335       p->next(tAttr);
336       tAttr->next(p_next);
337     }
338   }
339   DBUG_RETURN(tAttr);
340 }
341 
342 NdbBlob*
getBlobHandle(const char * colName,int n)343 NdbEventOperationImpl::getBlobHandle(const char *colName, int n)
344 {
345   DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)");
346 
347   assert(m_mergeEvents);
348 
349   if (m_state != EO_CREATED) {
350     ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between "
351 	     "instantiation and execute()");
352     DBUG_RETURN(NULL);
353   }
354 
355   NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
356 
357   if (tAttrInfo == NULL) {
358     ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",colName);
359     DBUG_RETURN(NULL);
360   }
361 
362   NdbBlob* bh = getBlobHandle(tAttrInfo, n);
363   DBUG_RETURN(bh);
364 }
365 
366 NdbBlob*
getBlobHandle(const NdbColumnImpl * tAttrInfo,int n)367 NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
368 {
369   DBUG_ENTER("NdbEventOperationImpl::getBlobHandle");
370   DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n));
371 
372   // as in NdbOperation, create only one instance
373   NdbBlob* tBlob = theBlobList;
374   NdbBlob* tLastBlob = NULL;
375   while (tBlob != NULL) {
376     if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
377       DBUG_RETURN(tBlob);
378     tLastBlob = tBlob;
379     tBlob = tBlob->theNext;
380   }
381 
382   NdbEventOperationImpl* tBlobOp = NULL;
383 
384   const bool is_tinyblob = (tAttrInfo->getPartSize() == 0);
385   assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));
386 
387   if (! is_tinyblob) {
388     // blob event name
389     char bename[MAX_TAB_NAME_SIZE];
390     NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);
391 
392     // find blob event op if any (it serves both post and pre handles)
393     tBlobOp = theBlobOpList;
394     NdbEventOperationImpl* tLastBlopOp = NULL;
395     while (tBlobOp != NULL) {
396       if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
397         break;
398       }
399       tLastBlopOp = tBlobOp;
400       tBlobOp = tBlobOp->m_next;
401     }
402 
403     DBUG_PRINT("info", ("%s blob event op for %s",
404                         tBlobOp ? " reuse" : " create", bename));
405 
406     // create blob event op if not found
407     if (tBlobOp == NULL) {
408       // get blob event
409       NdbDictionaryImpl& dict =
410         NdbDictionaryImpl::getImpl(*m_ndb->getDictionary());
411       NdbEventImpl* blobEvnt =
412         dict.getBlobEvent(*this->m_eventImpl, tAttrInfo->m_column_no);
413       if (blobEvnt == NULL) {
414         m_error.code = dict.m_error.code;
415         DBUG_RETURN(NULL);
416       }
417 
418       // create blob event operation
419       tBlobOp =
420         m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
421       if (tBlobOp == NULL)
422         DBUG_RETURN(NULL);
423 
424       // pointer to main table op
425       tBlobOp->theMainOp = this;
426       tBlobOp->m_mergeEvents = m_mergeEvents;
427       tBlobOp->theBlobVersion = tAttrInfo->m_blobVersion;
428 
429       // to hide blob op it is linked under main op, not under m_ndb
430       if (tLastBlopOp == NULL)
431         theBlobOpList = tBlobOp;
432       else
433         tLastBlopOp->m_next = tBlobOp;
434       tBlobOp->m_next = NULL;
435     }
436   }
437 
438   tBlob = m_ndb->getNdbBlob();
439   if (tBlob == NULL) {
440     m_error.code = m_ndb->getNdbError().code;
441     DBUG_RETURN(NULL);
442   }
443 
444   // calls getValue on inline and blob part
445   if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) {
446     m_error.code = tBlob->getNdbError().code;
447     m_ndb->releaseNdbBlob(tBlob);
448     DBUG_RETURN(NULL);
449   }
450 
451   // add to list end
452   if (tLastBlob == NULL)
453     theBlobList = tBlob;
454   else
455     tLastBlob->theNext = tBlob;
456   tBlob->theNext = NULL;
457   DBUG_RETURN(tBlob);
458 }
459 
460 Uint32
get_blob_part_no(bool hasDist)461 NdbEventOperationImpl::get_blob_part_no(bool hasDist)
462 {
463   assert(theBlobVersion == 1 || theBlobVersion == 2);
464   assert(theMainOp != NULL);
465   const NdbTableImpl* mainTable = theMainOp->m_eventImpl->m_tableImpl;
466   assert(m_data_item != NULL);
467   LinearSectionPtr (&ptr)[3] = m_data_item->ptr;
468 
469   uint pos = 0; // PK and possibly DIST to skip
470 
471   if (unlikely(theBlobVersion == 1)) {
472     pos += AttributeHeader(ptr[0].p[0]).getDataSize();
473     assert(hasDist);
474     pos += AttributeHeader(ptr[0].p[1]).getDataSize();
475   } else {
476     uint n = mainTable->m_noOfKeys;
477     uint i;
478     for (i = 0; i < n; i++) {
479       pos += AttributeHeader(ptr[0].p[i]).getDataSize();
480     }
481     if (hasDist)
482       pos += AttributeHeader(ptr[0].p[n]).getDataSize();
483   }
484 
485   assert(pos < ptr[1].sz);
486   Uint32 no = ptr[1].p[pos];
487   return no;
488 }
489 
490 int
readBlobParts(char * buf,NdbBlob * blob,Uint32 part,Uint32 count,Uint16 * lenLoc)491 NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob,
492                                      Uint32 part, Uint32 count, Uint16* lenLoc)
493 {
494   DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts");
495   DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d",
496                       part, count, blob->theEventBlobVersion));
497 
498   NdbEventOperationImpl* blob_op = blob->theBlobEventOp;
499   const bool hasDist = (blob->theStripeSize != 0);
500 
501   EventBufData* main_data = m_data_item;
502   DBUG_PRINT_EVENT("info", ("main_data=%p", main_data));
503   assert(main_data != NULL);
504 
505   // search for blob parts list head
506   EventBufData* head;
507   assert(m_data_item != NULL);
508   head = m_data_item->m_next_blob;
509   while (head != NULL)
510   {
511     if (head->m_event_op == blob_op)
512     {
513       DBUG_PRINT_EVENT("info", ("found blob parts head %p", head));
514       break;
515     }
516     head = head->m_next_blob;
517   }
518 
519   Uint32 nparts = 0;
520   Uint32 noutside = 0;
521   EventBufData* data = head;
522   // XXX optimize using part no ordering
523   while (data != NULL)
524   {
525     /*
526      * Hack part no directly out of buffer since it is not returned
527      * in pre data (PK buglet).  For part data use receive_event().
528      * This means extra copy. XXX fix
529      */
530     blob_op->m_data_item = data;
531     int r = blob_op->receive_event();
532     assert(r > 0);
533     // XXX should be: no = blob->theBlobEventPartValue
534     Uint32 no = blob_op->get_blob_part_no(hasDist);
535 
536     DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part", data, no));
537 
538     if (part <= no && no < part + count)
539     {
540       DBUG_PRINT_EVENT("info", ("part within read range"));
541 
542       const char* src = blob->theBlobEventDataBuf.data;
543       Uint32 sz = 0;
544       if (blob->theFixedDataFlag) {
545         sz = blob->thePartSize;
546       } else {
547         const uchar* p = (const uchar*)blob->theBlobEventDataBuf.data;
548         sz = p[0] + (p[1] << 8);
549         src += 2;
550       }
551       memcpy(buf + (no - part) * sz, src, sz);
552       nparts++;
553       if (lenLoc != NULL) {
554         assert(count == 1);
555         *lenLoc = sz;
556       } else {
557         assert(sz == blob->thePartSize);
558       }
559     }
560     else
561     {
562       DBUG_PRINT_EVENT("info", ("part outside read range"));
563       noutside++;
564     }
565     data = data->m_next;
566   }
567   if (unlikely(nparts != count))
568   {
569     ndbout_c("nparts: %u count: %u noutside: %u", nparts, count, noutside);
570   }
571   assert(nparts == count);
572 
573   DBUG_RETURN_EVENT(0);
574 }
575 
576 int
execute()577 NdbEventOperationImpl::execute()
578 {
579   DBUG_ENTER("NdbEventOperationImpl::execute");
580   m_ndb->theEventBuffer->add_drop_lock();
581   int r = execute_nolock();
582   m_ndb->theEventBuffer->add_drop_unlock();
583   DBUG_RETURN(r);
584 }
585 
586 int
execute_nolock()587 NdbEventOperationImpl::execute_nolock()
588 {
589   DBUG_ENTER("NdbEventOperationImpl::execute_nolock");
590   DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob"));
591 
592   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
593   if (!myDict) {
594     m_error.code= m_ndb->getNdbError().code;
595     DBUG_RETURN(-1);
596   }
597 
598   bool schemaTrans = false;
599   if (m_ndb->theEventBuffer->m_total_buckets == TOTAL_BUCKETS_INIT)
600   {
601     int res = NdbDictionaryImpl::getImpl(* myDict).beginSchemaTrans(false);
602     if (res != 0)
603     {
604       switch(myDict->getNdbError().code){
605       case 711:
606       case 763:
607         // ignore;
608         break;
609       default:
610         m_error.code= myDict->getNdbError().code;
611         DBUG_RETURN(-1);
612       }
613     }
614     else
615     {
616       schemaTrans = true;
617     }
618   }
619 
620   if (theFirstPkAttrs[0] == NULL &&
621       theFirstDataAttrs[0] == NULL) { // defaults to get all
622   }
623 
624   m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
625   m_state= EO_EXECUTING;
626   mi_type= m_eventImpl->mi_type;
627   m_ndb->theEventBuffer->add_op();
628   // add kernel reference
629   // removed on TE_STOP, TE_CLUSTER_FAILURE, or error below
630   m_ref_count++;
631   m_stop_gci= ~(Uint64)0;
632   DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
633   Uint32 buckets = 0;
634   int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this,
635                                                                    buckets);
636   if (r == 0)
637   {
638     /* Pre-7.0 kernel nodes do not return the number of buckets
639      * Assume it's == theNoOfDBnodes as was the case in 6.3
640      */
641     if (buckets == ~ (Uint32)0)
642       buckets = m_ndb->theImpl->theNoOfDBnodes;
643 
644     m_ndb->theEventBuffer->set_total_buckets(buckets);
645     if (schemaTrans)
646     {
647       schemaTrans = false;
648       myDict->endSchemaTrans(1);
649     }
650 
651     if (theMainOp == NULL) {
652       DBUG_PRINT("info", ("execute blob ops"));
653       NdbEventOperationImpl* blob_op = theBlobOpList;
654       while (blob_op != NULL) {
655         r = blob_op->execute_nolock();
656         if (r != 0) {
657           // since main op is running and possibly some blob ops as well
658           // we can't just reset the main op.  Instead return with error,
659           // main op (and blob ops) will be cleaned up when user calls
660           // dropEventOperation
661           m_error.code= myDict->getNdbError().code;
662           DBUG_RETURN(r);
663         }
664         blob_op = blob_op->m_next;
665       }
666     }
667     if (r == 0)
668     {
669       DBUG_RETURN(0);
670     }
671   }
672   // Error
673   // remove kernel reference
674   // added above
675   m_ref_count--;
676   m_stop_gci = 0;
677   DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
678   m_state= EO_ERROR;
679   mi_type= 0;
680   m_magic_number= 0;
681   m_error.code= myDict->getNdbError().code;
682   m_ndb->theEventBuffer->remove_op();
683 
684   if (schemaTrans)
685   {
686     schemaTrans = false;
687     myDict->endSchemaTrans(1);
688   }
689 
690   DBUG_RETURN(r);
691 }
692 
693 int
stop()694 NdbEventOperationImpl::stop()
695 {
696   DBUG_ENTER("NdbEventOperationImpl::stop");
697   int i;
698 
699   for (i=0 ; i<2; i++) {
700     NdbRecAttr *p = theFirstPkAttrs[i];
701     while (p) {
702       NdbRecAttr *p_next = p->next();
703       m_ndb->releaseRecAttr(p);
704       p = p_next;
705     }
706     theFirstPkAttrs[i]= 0;
707   }
708   for (i=0 ; i<2; i++) {
709     NdbRecAttr *p = theFirstDataAttrs[i];
710     while (p) {
711       NdbRecAttr *p_next = p->next();
712       m_ndb->releaseRecAttr(p);
713       p = p_next;
714     }
715     theFirstDataAttrs[i]= 0;
716   }
717 
718   if (m_state != EO_EXECUTING)
719   {
720     DBUG_RETURN(-1);
721   }
722 
723   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
724   if (!myDict) {
725     m_error.code= m_ndb->getNdbError().code;
726     DBUG_RETURN(-1);
727   }
728 
729   m_ndb->theEventBuffer->add_drop_lock();
730   int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*this);
731   m_ndb->theEventBuffer->remove_op();
732   m_state= EO_DROPPED;
733   mi_type= 0;
734   if (r == 0) {
735     if (m_stop_gci == 0)
736     {
737       // response from old kernel
738       Uint64 gci= m_ndb->theEventBuffer->m_highest_sub_gcp_complete_GCI;
739       if (gci)
740       {
741         // calculate a "safe" gci in the future to remove event op.
742         gci += Uint64(3) << 32;
743       }
744       else
745       {
746         // set highest value to ensure that operation does not get dropped
747         // too early. Note '-1' as ~Uint64(0) indicates active event
748         gci = ~Uint64(0)-1;
749       }
750       m_stop_gci = gci;
751     }
752     m_ndb->theEventBuffer->add_drop_unlock();
753     DBUG_RETURN(0);
754   }
755   //Error
756   m_error.code= NdbDictionaryImpl::getImpl(*myDict).m_error.code;
757   m_state= EO_ERROR;
758   m_ndb->theEventBuffer->add_drop_unlock();
759   DBUG_RETURN(r);
760 }
761 
tableNameChanged() const762 bool NdbEventOperationImpl::tableNameChanged() const
763 {
764   return (bool)AlterTableReq::getNameFlag(m_change_mask);
765 }
766 
tableFrmChanged() const767 bool NdbEventOperationImpl::tableFrmChanged() const
768 {
769   return (bool)AlterTableReq::getFrmFlag(m_change_mask);
770 }
771 
tableFragmentationChanged() const772 bool NdbEventOperationImpl::tableFragmentationChanged() const
773 {
774   return (bool)AlterTableReq::getFragDataFlag(m_change_mask);
775 }
776 
tableRangeListChanged() const777 bool NdbEventOperationImpl::tableRangeListChanged() const
778 {
779   return (bool)AlterTableReq::getRangeListFlag(m_change_mask);
780 }
781 
782 Uint64
getGCI()783 NdbEventOperationImpl::getGCI()
784 {
785   Uint32 gci_hi = m_data_item->sdata->gci_hi;
786   Uint32 gci_lo = m_data_item->sdata->gci_lo;
787   return gci_lo | (Uint64(gci_hi) << 32);
788 }
789 
790 Uint32
getAnyValue() const791 NdbEventOperationImpl::getAnyValue() const
792 {
793   return m_data_item->sdata->anyValue;
794 }
795 
796 Uint64
getLatestGCI()797 NdbEventOperationImpl::getLatestGCI()
798 {
799   return m_ndb->theEventBuffer->getLatestGCI();
800 }
801 
802 Uint64
getTransId() const803 NdbEventOperationImpl::getTransId() const
804 {
805   /* Return 64 bit composite */
806   Uint32 transId1 = m_data_item->sdata->transId1;
807   Uint32 transId2 = m_data_item->sdata->transId2;
808   return Uint64(transId1) << 32 | transId2;
809 }
810 
811 bool
execSUB_TABLE_DATA(const NdbApiSignal * signal,const LinearSectionPtr ptr[3])812 NdbEventOperationImpl::execSUB_TABLE_DATA(const NdbApiSignal * signal,
813                                           const LinearSectionPtr ptr[3])
814 {
815   DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA");
816   const SubTableData * const sdata=
817     CAST_CONSTPTR(SubTableData, signal->getDataPtr());
818 
819   if(signal->isFirstFragment()){
820     m_fragmentId = signal->getFragmentId();
821     m_buffer.grow(4 * sdata->totalLen);
822   } else {
823     if(m_fragmentId != signal->getFragmentId()){
824       abort();
825     }
826   }
827 
828   const Uint32 i = SubTableData::DICT_TAB_INFO;
829   DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u",
830                       4 * ptr[i].sz, m_fragmentId));
831   m_buffer.append(ptr[i].p, 4 * ptr[i].sz);
832 
833   if(!signal->isLastFragment()){
834     DBUG_RETURN(FALSE);
835   }
836 
837   DBUG_RETURN(TRUE);
838 }
839 
840 
841 int
receive_event()842 NdbEventOperationImpl::receive_event()
843 {
844   Uint32 operation=
845     SubTableData::getOperation(m_data_item->sdata->requestInfo);
846   if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
847   {
848     DBUG_ENTER("NdbEventOperationImpl::receive_event");
849     DBUG_PRINT("info",("sdata->operation %u  this: %p", operation, this));
850     m_ndb->theImpl->incClientStat(Ndb::NonDataEventsRecvdCount, 1);
851     if (operation == NdbDictionary::Event::_TE_ALTER)
852     {
853       // Parse the new table definition and
854       // create a table object
855       NdbDictInterface::Tx tx_unused;
856       NdbError error;
857       int warn;
858       NdbDictInterface dif(tx_unused, error, warn);
859       NdbTableImpl *at;
860       m_change_mask = m_data_item->sdata->changeMask;
861       error.code = dif.parseTableInfo(&at,
862                                       (Uint32*)m_buffer.get_data(),
863                                       m_buffer.length() / 4,
864                                       true);
865       m_buffer.clear();
866       if (unlikely(error.code))
867       {
868         DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u",
869                                   error.code));
870         ndbout_c("Failed to parse DictTabInfo error %u", error.code);
871         DBUG_RETURN(1);
872       }
873       at->buildColumnHash();
874 
875       NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
876       m_eventImpl->m_tableImpl = at;
877 
878       DBUG_PRINT("info", ("switching table impl 0x%lx -> 0x%lx",
879                           (long) tmp_table_impl, (long) at));
880 
881       // change the rec attrs to refer to the new table object
882       int i;
883       for (i = 0; i < 2; i++)
884       {
885         NdbRecAttr *p = theFirstPkAttrs[i];
886         while (p)
887         {
888           int no = p->getColumn()->getColumnNo();
889           NdbColumnImpl *tAttrInfo = at->getColumn(no);
890           DBUG_PRINT("info", ("rec_attr: 0x%lx  "
891                               "switching column impl 0x%lx -> 0x%lx",
892                               (long) p, (long) p->m_column, (long) tAttrInfo));
893           p->m_column = tAttrInfo;
894           p = p->next();
895         }
896       }
897       for (i = 0; i < 2; i++)
898       {
899         NdbRecAttr *p = theFirstDataAttrs[i];
900         while (p)
901         {
902           int no = p->getColumn()->getColumnNo();
903           NdbColumnImpl *tAttrInfo = at->getColumn(no);
904           DBUG_PRINT("info", ("rec_attr: 0x%lx  "
905                               "switching column impl 0x%lx -> 0x%lx",
906                               (long) p, (long) p->m_column, (long) tAttrInfo));
907           p->m_column = tAttrInfo;
908           p = p->next();
909         }
910       }
911       // change the blobHandle's to refer to the new table object.
912       NdbBlob *p = theBlobList;
913       while (p)
914       {
915         int no = p->getColumn()->getColumnNo();
916         NdbColumnImpl *tAttrInfo = at->getColumn(no);
917         DBUG_PRINT("info", ("blob_handle: 0x%lx  "
918                             "switching column impl 0x%lx -> 0x%lx",
919                             (long) p, (long) p->theColumn, (long) tAttrInfo));
920         p->theColumn = tAttrInfo;
921         p = p->next();
922       }
923       if (tmp_table_impl)
924         delete tmp_table_impl;
925     }
926     DBUG_RETURN(1);
927   }
928 
929   DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
930   DBUG_PRINT_EVENT("info",("sdata->operation %u  this: %p", operation, this));
931   // now move the data into the RecAttrs
932   m_ndb->theImpl->incClientStat(Ndb::DataEventsRecvdCount, 1);
933 
934   int is_insert= operation == NdbDictionary::Event::_TE_INSERT;
935 
936   Uint32 *aAttrPtr = m_data_item->ptr[0].p;
937   Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
938   Uint32 *aDataPtr = m_data_item->ptr[1].p;
939 
940   DBUG_DUMP_EVENT("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
941   DBUG_DUMP_EVENT("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
942 
943   // copy data into the RecAttr's
944   // we assume that the respective attribute lists are sorted
945 
946   // first the pk's
947   {
948     NdbRecAttr *tAttr= theFirstPkAttrs[0];
949     NdbRecAttr *tAttr1= theFirstPkAttrs[1];
950     while(tAttr)
951     {
952       assert(aAttrPtr < aAttrEndPtr);
953       unsigned tDataSz= AttributeHeader(*aAttrPtr).getByteSize();
954       assert(tAttr->attrId() ==
955 	     AttributeHeader(*aAttrPtr).getAttributeId());
956       receive_data(tAttr, aDataPtr, tDataSz);
957       if (!is_insert)
958 	receive_data(tAttr1, aDataPtr, tDataSz);
959       else
960         tAttr1->setUNDEFINED(); // do not leave unspecified
961       tAttr1= tAttr1->next();
962       // next
963       aAttrPtr++;
964       aDataPtr+= (tDataSz + 3) >> 2;
965       tAttr= tAttr->next();
966     }
967   }
968 
969   NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
970 
971   Uint32 tRecAttrId;
972   Uint32 tAttrId;
973   Uint32 tDataSz;
974   int hasSomeData= (operation != NdbDictionary::Event::_TE_UPDATE);
975   while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
976     tRecAttrId = tWorkingRecAttr->attrId();
977     tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
978     tDataSz = AttributeHeader(*aAttrPtr).getByteSize();
979 
980     while (tAttrId > tRecAttrId) {
981       DBUG_PRINT_EVENT("info",("undef [%u] %u 0x%x [%u] 0x%x",
982                                tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
983       tWorkingRecAttr->setUNDEFINED();
984       tWorkingRecAttr = tWorkingRecAttr->next();
985       if (tWorkingRecAttr == NULL)
986 	break;
987       tRecAttrId = tWorkingRecAttr->attrId();
988     }
989     if (tWorkingRecAttr == NULL)
990       break;
991 
992     if (tAttrId == tRecAttrId) {
993       hasSomeData=1;
994 
995       DBUG_PRINT_EVENT("info",("set [%u] %u 0x%x [%u] 0x%x",
996                                tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
997 
998       receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
999       tWorkingRecAttr = tWorkingRecAttr->next();
1000     }
1001     aAttrPtr++;
1002     aDataPtr += (tDataSz + 3) >> 2;
1003   }
1004 
1005   while (tWorkingRecAttr != NULL) {
1006     tRecAttrId = tWorkingRecAttr->attrId();
1007     //printf("set undefined [%u] %u %u [%u]\n",
1008     //       tAttrId, tDataSz, *aDataPtr, tRecAttrId);
1009     tWorkingRecAttr->setUNDEFINED();
1010     tWorkingRecAttr = tWorkingRecAttr->next();
1011   }
1012 
1013   tWorkingRecAttr = theFirstDataAttrs[1];
1014   aDataPtr = m_data_item->ptr[2].p;
1015   Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
1016   while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
1017     tRecAttrId = tWorkingRecAttr->attrId();
1018     tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
1019     tDataSz = AttributeHeader(*aDataPtr).getByteSize();
1020     aDataPtr++;
1021     while (tAttrId > tRecAttrId) {
1022       tWorkingRecAttr->setUNDEFINED();
1023       tWorkingRecAttr = tWorkingRecAttr->next();
1024       if (tWorkingRecAttr == NULL)
1025 	break;
1026       tRecAttrId = tWorkingRecAttr->attrId();
1027     }
1028     if (tWorkingRecAttr == NULL)
1029       break;
1030     if (tAttrId == tRecAttrId) {
1031       assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
1032       hasSomeData=1;
1033 
1034       receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
1035       tWorkingRecAttr = tWorkingRecAttr->next();
1036     }
1037     aDataPtr += (tDataSz + 3) >> 2;
1038   }
1039   while (tWorkingRecAttr != NULL) {
1040     tWorkingRecAttr->setUNDEFINED();
1041     tWorkingRecAttr = tWorkingRecAttr->next();
1042   }
1043 
1044   if (hasSomeData)
1045   {
1046     DBUG_RETURN_EVENT(1);
1047   }
1048 
1049   DBUG_RETURN_EVENT(0);
1050 }
1051 
1052 NdbDictionary::Event::TableEvent
getEventType()1053 NdbEventOperationImpl::getEventType()
1054 {
1055   return (NdbDictionary::Event::TableEvent)
1056     (1 << SubTableData::getOperation(m_data_item->sdata->requestInfo));
1057 }
1058 
1059 
1060 
1061 void
print()1062 NdbEventOperationImpl::print()
1063 {
1064   int i;
1065   ndbout << "EventId " << m_eventId << "\n";
1066 
1067   for (i = 0; i < 2; i++) {
1068     NdbRecAttr *p = theFirstPkAttrs[i];
1069     ndbout << " %u " << i;
1070     while (p) {
1071       ndbout << " : " << p->attrId() << " = " << *p;
1072       p = p->next();
1073     }
1074     ndbout << "\n";
1075   }
1076   for (i = 0; i < 2; i++) {
1077     NdbRecAttr *p = theFirstDataAttrs[i];
1078     ndbout << " %u " << i;
1079     while (p) {
1080       ndbout << " : " << p->attrId() << " = " << *p;
1081       p = p->next();
1082     }
1083     ndbout << "\n";
1084   }
1085 }
1086 
1087 void
printAll()1088 NdbEventOperationImpl::printAll()
1089 {
1090   Uint32 *aAttrPtr = m_data_item->ptr[0].p;
1091   Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
1092   Uint32 *aDataPtr = m_data_item->ptr[1].p;
1093 
1094   //tRecAttr->setup(tAttrInfo, aValue)) {
1095 
1096   Uint32 tAttrId;
1097   Uint32 tDataSz;
1098   for (; aAttrPtr < aAttrEndPtr; ) {
1099     tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
1100     tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
1101 
1102     aAttrPtr++;
1103     aDataPtr += tDataSz;
1104   }
1105 }
1106 
/*
 * Class NdbEventBuffer
 * Each Ndb object has an NdbEventBuffer object.
 */
NdbEventBuffer(Ndb * ndb)1111 NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
1112   m_total_buckets(TOTAL_BUCKETS_INIT),
1113   m_min_gci_index(0),
1114   m_max_gci_index(0),
1115   m_ndb(ndb),
1116   m_latestGCI(0), m_latest_complete_GCI(0),
1117   m_highest_sub_gcp_complete_GCI(0),
1118   m_latest_poll_GCI(0),
1119   m_total_alloc(0),
1120   m_free_thresh(0),
1121   m_min_free_thresh(0),
1122   m_max_free_thresh(0),
1123   m_gci_slip_thresh(0),
1124   m_dropped_ev_op(0),
1125   m_active_op_count(0),
1126   m_add_drop_mutex(0)
1127 {
1128 #ifdef VM_TRACE
1129   m_latest_command= "NdbEventBuffer::NdbEventBuffer";
1130   m_flush_gci = 0;
1131 #endif
1132 
1133   if ((p_cond = NdbCondition_Create()) ==  NULL) {
1134     ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
1135     exit(-1);
1136   }
1137   m_mutex = 0; // Set in Ndb::init()
1138 
1139   // ToDo set event buffer size
1140   // pre allocate event data array
1141   m_sz= 0;
1142 #ifdef VM_TRACE
1143   m_free_data_count= 0;
1144 #endif
1145   m_free_data= 0;
1146   m_free_data_sz= 0;
1147 
1148   // get reference to mutex managed by current connection
1149   m_add_drop_mutex=
1150     m_ndb->theImpl->m_ndb_cluster_connection.m_event_add_drop_mutex;
1151 
1152   // initialize lists
1153   bzero(&g_empty_gci_container, sizeof(Gci_container));
1154   init_gci_containers();
1155 
1156   m_alive_node_bit_mask.clear();
1157 }
1158 
~NdbEventBuffer()1159 NdbEventBuffer::~NdbEventBuffer()
1160 {
1161   // todo lock?  what if receive thread writes here?
1162   NdbEventOperationImpl* op= m_dropped_ev_op;
1163   while ((op = m_dropped_ev_op))
1164   {
1165     m_dropped_ev_op = m_dropped_ev_op->m_next;
1166     delete op->m_facade;
1167   }
1168 
1169   unsigned j;
1170   Uint32 sz= m_active_gci.size();
1171   Gci_container* array = (Gci_container*)m_active_gci.getBase();
1172   for(j = 0; j < sz; j++)
1173   {
1174     array[j].~Gci_container();
1175   }
1176 
1177   for (j= 0; j < m_allocated_data.size(); j++)
1178   {
1179     unsigned sz= m_allocated_data[j]->sz;
1180     EventBufData *data= m_allocated_data[j]->data;
1181     EventBufData *end_data= data+sz;
1182     for (; data < end_data; data++)
1183     {
1184       if (data->sdata)
1185 	NdbMem_Free(data->sdata);
1186     }
1187     NdbMem_Free((char*)m_allocated_data[j]);
1188   }
1189 
1190   NdbCondition_Destroy(p_cond);
1191 }
1192 
1193 void
add_op()1194 NdbEventBuffer::add_op()
1195 {
1196   if(m_active_op_count == 0)
1197   {
1198     init_gci_containers();
1199   }
1200   m_active_op_count++;
1201 }
1202 
1203 void
remove_op()1204 NdbEventBuffer::remove_op()
1205 {
1206   m_active_op_count--;
1207 }
1208 
1209 void
init_gci_containers()1210 NdbEventBuffer::init_gci_containers()
1211 {
1212   m_startup_hack = true;
1213   bzero(&m_complete_data, sizeof(m_complete_data));
1214   m_latest_complete_GCI = m_latestGCI = m_latest_poll_GCI = 0;
1215   m_active_gci.clear();
1216   m_active_gci.fill(3, g_empty_gci_container);
1217   m_min_gci_index = m_max_gci_index = 1;
1218   Uint64 gci = 0;
1219   m_known_gci.clear();
1220   m_known_gci.fill(7, gci);
1221 }
1222 
expand(unsigned sz)1223 int NdbEventBuffer::expand(unsigned sz)
1224 {
1225   unsigned alloc_size=
1226     sizeof(EventBufData_chunk) +(sz-1)*sizeof(EventBufData);
1227   EventBufData_chunk *chunk_data=
1228     (EventBufData_chunk *)NdbMem_Allocate(alloc_size);
1229 
1230   chunk_data->sz= sz;
1231   m_allocated_data.push_back(chunk_data);
1232 
1233   EventBufData *data= chunk_data->data;
1234   EventBufData *end_data= data+sz;
1235   EventBufData *last_data= m_free_data;
1236 
1237   bzero((void*)data, sz*sizeof(EventBufData));
1238   for (; data < end_data; data++)
1239   {
1240     data->m_next= last_data;
1241     last_data= data;
1242   }
1243   m_free_data= last_data;
1244 
1245   m_sz+= sz;
1246 #ifdef VM_TRACE
1247   m_free_data_count+= sz;
1248 #endif
1249   return 0;
1250 }
1251 
1252 int
pollEvents(int aMillisecondNumber,Uint64 * latestGCI)1253 NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
1254 {
1255   int ret= 1;
1256 #ifdef VM_TRACE
1257   const char *m_latest_command_save= m_latest_command;
1258   m_latest_command= "NdbEventBuffer::pollEvents";
1259 #endif
1260 
1261   NdbMutex_Lock(m_mutex);
1262   NdbEventOperationImpl *ev_op= move_data();
1263   if (unlikely(ev_op == 0 && aMillisecondNumber))
1264   {
1265     NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
1266     ev_op= move_data();
1267     if (unlikely(ev_op == 0))
1268       ret= 0;
1269   }
1270   m_latest_poll_GCI= m_latestGCI;
1271 #ifdef VM_TRACE
1272   if (ev_op)
1273   {
1274     // m_mutex is locked
1275     // update event ops data counters
1276     ev_op->m_data_count-= ev_op->m_data_done_count;
1277     ev_op->m_data_done_count= 0;
1278   }
1279   m_latest_command= m_latest_command_save;
1280 #endif
1281   if (unlikely(ev_op == 0))
1282   {
1283     /*
1284       gci's consumed up until m_latest_poll_GCI, so we can free all
1285       dropped event operations stopped up until that gci
1286     */
1287     deleteUsedEventOperations(m_latest_poll_GCI);
1288   }
1289   NdbMutex_Unlock(m_mutex); // we have moved the data
1290 
1291   if (latestGCI)
1292     *latestGCI= m_latest_poll_GCI;
1293 
1294   return ret;
1295 }
1296 
1297 int
flushIncompleteEvents(Uint64 gci)1298 NdbEventBuffer::flushIncompleteEvents(Uint64 gci)
1299 {
1300   /**
1301    *  Find min complete gci
1302    */
1303   Uint64 * array = m_known_gci.getBase();
1304   Uint32 mask = m_known_gci.size() - 1;
1305   Uint32 minpos = m_min_gci_index;
1306   Uint32 maxpos = m_max_gci_index;
1307 
1308   g_eventLogger->info("Flushing incomplete GCI:s < %u/%u",
1309                       Uint32(gci >> 32), Uint32(gci));
1310   while (minpos != maxpos && array[minpos] < gci)
1311   {
1312     Gci_container* tmp = find_bucket(array[minpos]);
1313     assert(tmp);
1314     assert(maxpos == m_max_gci_index);
1315 
1316     if(!tmp->m_data.is_empty())
1317     {
1318       free_list(tmp->m_data);
1319     }
1320     tmp->~Gci_container();
1321     bzero(tmp, sizeof(Gci_container));
1322     minpos = (minpos + 1) & mask;
1323   }
1324 
1325   m_min_gci_index = minpos;
1326 
1327 #ifdef VM_TRACE
1328   m_flush_gci = gci;
1329 #endif
1330 
1331   return 0;
1332 }
1333 
1334 NdbEventOperation *
nextEvent()1335 NdbEventBuffer::nextEvent()
1336 {
1337   DBUG_ENTER_EVENT("NdbEventBuffer::nextEvent");
1338 #ifdef VM_TRACE
1339   const char *m_latest_command_save= m_latest_command;
1340 #endif
1341 
1342   if (m_used_data.m_count > 1024)
1343   {
1344 #ifdef VM_TRACE
1345     m_latest_command= "NdbEventBuffer::nextEvent (lock)";
1346 #endif
1347     NdbMutex_Lock(m_mutex);
1348     // return m_used_data to m_free_data
1349     free_list(m_used_data);
1350 
1351     NdbMutex_Unlock(m_mutex);
1352   }
1353 #ifdef VM_TRACE
1354   m_latest_command= "NdbEventBuffer::nextEvent";
1355 #endif
1356 
1357   EventBufData *data;
1358   Uint64 gci= 0;
1359   while ((data= m_available_data.m_head))
1360   {
1361     NdbEventOperationImpl *op= data->m_event_op;
1362 
1363     /*
1364      * The data was not associated with an event operation,
1365      * possibly a dummy event list marking missing data
1366      */
1367     if (!op && !isConsistent(gci))
1368     {
1369       DBUG_PRINT_EVENT("info", ("detected inconsistent gci %u", gci));
1370       DBUG_RETURN_EVENT(0);
1371     }
1372 
1373     DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));
1374 
1375     /*
1376      * If merge is on, blob part sub-events must not be seen on this level.
1377      * If merge is not on, there are no blob part sub-events.
1378      */
1379     assert(op->theMainOp == NULL);
1380 
1381     // set NdbEventOperation data
1382     op->m_data_item= data;
1383 
1384     // remove item from m_available_data and return size
1385     Uint32 full_count, full_sz;
1386     m_available_data.remove_first(full_count, full_sz);
1387 
1388     // add it to used list
1389     m_used_data.append_used_data(data, full_count, full_sz);
1390 
1391     m_ndb->theImpl->incClientStat(Ndb::EventBytesRecvdCount, full_sz);
1392 
1393 #ifdef VM_TRACE
1394     op->m_data_done_count++;
1395 #endif
1396 
1397     if (op->m_state == NdbEventOperation::EO_EXECUTING)
1398     {
1399       int r= op->receive_event();
1400       if (r > 0)
1401       {
1402 #ifdef VM_TRACE
1403 	 m_latest_command= m_latest_command_save;
1404 #endif
1405          NdbBlob* tBlob = op->theBlobList;
1406          while (tBlob != NULL)
1407          {
1408            (void)tBlob->atNextEvent();
1409            tBlob = tBlob->theNext;
1410          }
1411          EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1412          while (gci_ops && op->getGCI() > gci_ops->m_gci)
1413          {
1414            gci_ops = m_available_data.delete_next_gci_ops();
1415          }
1416          if (!gci_ops->m_consistent)
1417            DBUG_RETURN_EVENT(0);
1418          assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
1419          // to return TE_NUL it should be made into data event
1420          if (SubTableData::getOperation(data->sdata->requestInfo) ==
1421 	   NdbDictionary::Event::_TE_NUL)
1422          {
1423            DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
1424            continue;
1425          }
1426 	 DBUG_RETURN_EVENT(op->m_facade);
1427        }
1428        // the next event belonged to an event op that is no
1429        // longer valid, skip to next
1430       continue;
1431     }
1432 #ifdef VM_TRACE
1433     m_latest_command= m_latest_command_save;
1434 #endif
1435   }
1436   m_error.code= 0;
1437 #ifdef VM_TRACE
1438   m_latest_command= m_latest_command_save;
1439 #endif
1440 
1441   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1442   while (gci_ops)
1443   {
1444     gci_ops = m_available_data.delete_next_gci_ops();
1445   }
1446   /*
1447     gci's consumed up until m_latest_poll_GCI, so we can free all
1448     dropped event operations stopped up until that gci
1449   */
1450   if (m_dropped_ev_op)
1451   {
1452     NdbMutex_Lock(m_mutex);
1453     deleteUsedEventOperations(m_latest_poll_GCI);
1454     NdbMutex_Unlock(m_mutex);
1455   }
1456   DBUG_RETURN_EVENT(0);
1457 }
1458 
1459 bool
isConsistent(Uint64 & gci)1460 NdbEventBuffer::isConsistent(Uint64& gci)
1461 {
1462   DBUG_ENTER("NdbEventBuffer::isConsistent");
1463   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1464   while (gci_ops)
1465   {
1466     if (!gci_ops->m_consistent)
1467     {
1468       gci = gci_ops->m_gci;
1469       DBUG_RETURN(false);
1470     }
1471     gci_ops = gci_ops->m_next;
1472   }
1473 
1474   DBUG_RETURN(true);
1475 }
1476 
1477 bool
isConsistentGCI(Uint64 gci)1478 NdbEventBuffer::isConsistentGCI(Uint64 gci)
1479 {
1480   DBUG_ENTER("NdbEventBuffer::isConsistentGCI");
1481   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1482   while (gci_ops)
1483   {
1484     if (gci_ops->m_gci == gci && !gci_ops->m_consistent)
1485       DBUG_RETURN(false);
1486     gci_ops = gci_ops->m_next;
1487   }
1488 
1489   DBUG_RETURN(true);
1490 }
1491 
1492 
1493 NdbEventOperationImpl*
getGCIEventOperations(Uint32 * iter,Uint32 * event_types)1494 NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
1495 {
1496   DBUG_ENTER("NdbEventBuffer::getGCIEventOperations");
1497   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1498   if (*iter < gci_ops->m_gci_op_count)
1499   {
1500     EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
1501     if (event_types != NULL)
1502       *event_types = g.event_types;
1503     DBUG_PRINT("info", ("gci: %u  g.op: 0x%lx  g.event_types: 0x%lx",
1504                         (unsigned)gci_ops->m_gci, (long) g.op,
1505                         (long) g.event_types));
1506     DBUG_RETURN(g.op);
1507   }
1508   DBUG_RETURN(NULL);
1509 }
1510 
1511 void
deleteUsedEventOperations(Uint64 last_consumed_gci)1512 NdbEventBuffer::deleteUsedEventOperations(Uint64 last_consumed_gci)
1513 {
1514   NdbEventOperationImpl *op= m_dropped_ev_op;
1515   while (op && op->m_stop_gci)
1516   {
1517     if (last_consumed_gci > op->m_stop_gci)
1518     {
1519       while (op)
1520       {
1521         NdbEventOperationImpl *next_op= op->m_next;
1522         op->m_stop_gci= 0;
1523         op->m_ref_count--;
1524         if (op->m_ref_count == 0)
1525         {
1526           if (op->m_next)
1527             op->m_next->m_prev = op->m_prev;
1528           if (op->m_prev)
1529             op->m_prev->m_next = op->m_next;
1530           else
1531             m_dropped_ev_op = op->m_next;
1532           delete op->m_facade;
1533         }
1534         op = next_op;
1535       }
1536       break;
1537     }
1538     op = op->m_next;
1539   }
1540 }
1541 
1542 #ifdef VM_TRACE
1543 static
1544 NdbOut&
operator <<(NdbOut & out,const Gci_container & gci)1545 operator<<(NdbOut& out, const Gci_container& gci)
1546 {
1547   out << "[ GCI: " << (gci.m_gci >> 32) << "/" << (gci.m_gci & 0xFFFFFFFF)
1548       << "  state: " << hex << gci.m_state
1549       << "  head: " << hex << gci.m_data.m_head
1550       << "  tail: " << hex << gci.m_data.m_tail
1551 #ifdef VM_TRACE
1552       << "  cnt: " << dec << gci.m_data.m_count
1553 #endif
1554       << " gcp: " << dec << gci.m_gcp_complete_rep_count
1555       << "]";
1556   return out;
1557 }
1558 
1559 static
1560 NdbOut&
operator <<(NdbOut & out,const Gci_container_pod & gci)1561 operator<<(NdbOut& out, const Gci_container_pod& gci)
1562 {
1563   Gci_container* ptr = (Gci_container*)&gci;
1564   out << *ptr;
1565   return out;
1566 }
1567 #endif
1568 
1569 void
resize_known_gci()1570 NdbEventBuffer::resize_known_gci()
1571 {
1572   Uint32 minpos = m_min_gci_index;
1573   Uint32 maxpos = m_max_gci_index;
1574   Uint32 mask = m_known_gci.size() - 1;
1575 
1576   Uint64 fill = 0;
1577   Uint32 newsize = 2 * (mask + 1);
1578   m_known_gci.fill(newsize - 1, fill);
1579   Uint64 * array = m_known_gci.getBase();
1580 
1581   if (0)
1582   {
1583     printf("before (%u): ", minpos);
1584     for (Uint32 i = minpos; i != maxpos; i = (i + 1) & mask)
1585       printf("%u/%u ",
1586              Uint32(array[i] >> 32),
1587              Uint32(array[i]));
1588     printf("\n");
1589   }
1590 
1591   Uint32 idx = mask + 1; // Store eveything in "new" part of buffer
1592   if (0) printf("swapping ");
1593   while (minpos != maxpos)
1594   {
1595     if (0) printf("%u-%u ", minpos, idx);
1596     Uint64 tmp = array[idx];
1597     array[idx] = array[minpos];
1598     array[minpos] = tmp;
1599 
1600     idx++;
1601     minpos = (minpos + 1) & mask; // NOTE old mask
1602   }
1603   if (0) printf("\n");
1604 
1605   minpos = m_min_gci_index = mask + 1;
1606   maxpos = m_max_gci_index = idx;
1607   assert(minpos < maxpos);
1608 
1609   if (0)
1610   {
1611     ndbout_c("resize_known_gci from %u to %u", (mask + 1), newsize);
1612     printf("after: ");
1613     for (Uint32 i = minpos; i < maxpos; i++)
1614     {
1615       printf("%u/%u ",
1616              Uint32(array[i] >> 32),
1617              Uint32(array[i]));
1618     }
1619     printf("\n");
1620   }
1621 
1622 #ifdef VM_TRACE
1623   Uint64 gci = array[minpos];
1624   for (Uint32 i = minpos + 1; i<maxpos; i++)
1625   {
1626     assert(array[i] > gci);
1627     gci = array[i];
1628   }
1629 #endif
1630 }
1631 
1632 #ifdef VM_TRACE
/**
 * Debug-only (VM_TRACE) consistency check of m_known_gci against
 * m_active_gci.  On the first violated condition the known gci's and all
 * containers are printed and the process is aborted.
 *
 * Checked conditions:
 *  - if the known-gci range is empty, 'allowempty' must be true and no
 *    container may carry a gci;
 *  - otherwise the known gci's must be strictly increasing, the first one
 *    must be greater than m_latestGCI, and find_bucket() must return a
 *    bucket for each of them;
 *  - every container with a non-zero gci must appear in m_known_gci.
 *
 * @param allowempty  whether an empty known-gci range is acceptable
 */
void
NdbEventBuffer::verify_known_gci(bool allowempty)
{
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  Uint32 mask = m_known_gci.size() - 1;

  Uint32 line;
  // Record the failing source line and jump to the common report code.
#define MMASSERT(x) { if (!(x)) { line = __LINE__; goto fail; }}
  if (m_min_gci_index == m_max_gci_index)
  {
    // empty range: no container may be in use
    MMASSERT(allowempty);
    for (Uint32 i = 0; i<m_active_gci.size(); i++)
      MMASSERT(((Gci_container*)(m_active_gci.getBase()+i))->m_gci == 0);
    return;
  }

  {
    // walk the circular range: strictly increasing, each with a bucket
    Uint64 last = m_known_gci[minpos];
    MMASSERT(last > m_latestGCI);
    MMASSERT(find_bucket(last) != 0);
    // find_bucket() must not have added a new known gci
    MMASSERT(maxpos == m_max_gci_index);

    minpos = (minpos + 1) & mask;
    while (minpos != maxpos)
    {
      MMASSERT(m_known_gci[minpos] > last);
      last = m_known_gci[minpos];
      MMASSERT(find_bucket(last) != 0);
      MMASSERT(maxpos == m_max_gci_index);
      minpos = (minpos + 1) & mask;
    }
  }

  {
    // reverse direction: every used container must be a known gci
    Gci_container* bucktets = (Gci_container*)(m_active_gci.getBase());
    for (Uint32 i = 0; i<m_active_gci.size(); i++)
    {
      if (bucktets[i].m_gci)
      {
        bool found = false;
        for (Uint32 j = m_min_gci_index; j != m_max_gci_index;
             j = (j + 1) & mask)
        {
          if (m_known_gci[j] == bucktets[i].m_gci)
          {
            found = true;
            break;
          }
        }
        if (!found)
          ndbout_c("%u/%u not found",
                   Uint32(bucktets[i].m_gci >> 32),
                   Uint32(bucktets[i].m_gci));
        MMASSERT(found == true);
      }
    }
  }

  return;
fail:
  // dump the state that failed verification, then abort
  ndbout_c("assertion at %d", line);
  printf("known gci: ");
  for (Uint32 i = m_min_gci_index; i != m_max_gci_index; i = (i + 1) & mask)
  {
    printf("%u/%u ", Uint32(m_known_gci[i] >> 32), Uint32(m_known_gci[i]));
  }

  printf("\nContainers");
  for (Uint32 i = 0; i<m_active_gci.size(); i++)
    ndbout << m_active_gci[i] << endl;
  abort();
}
1706 #endif
1707 
/**
 * Find the container (bucket) for 'gci' in m_active_gci, creating it if
 * it does not exist yet.
 *
 * Candidate slots are gci & ACTIVE_GCI_MASK and then every
 * ACTIVE_GCI_DIRECTORY_SIZE slots after that.  A slot holding 'gci' is
 * returned as is.  When an empty slot (m_gci == 0) is met first, the
 * remaining candidates are searched; a match found further along is moved
 * into the empty slot, otherwise the empty slot becomes the new bucket.
 * If the candidates run past the end of m_active_gci, the vector is
 * extended and the new bucket is placed there.
 *
 * A newly created bucket gets m_gcp_complete_rep_count = m_total_buckets
 * and its gci is inserted into the circular m_known_gci array, which is
 * kept in ascending order (and resized when full).
 *
 * @param gci  the gci to look up
 * @return the bucket for 'gci', or 0 if 'gci' is not greater than
 *         m_latestGCI (already complete) or m_total_buckets is 0
 */
Gci_container*
NdbEventBuffer::find_bucket_chained(Uint64 gci)
{
  if (0)
    printf("find_bucket_chained(%u/%u) ", Uint32(gci >> 32), Uint32(gci));
  if (unlikely(gci <= m_latestGCI))
  {
    /**
     * an already complete GCI
     */
    if (0)
      ndbout_c("already complete (%u/%u)",
               Uint32(m_latestGCI >> 32),
               Uint32(m_latestGCI));
    return 0;
  }

  if (unlikely(m_total_buckets == 0))
  {
    return 0;
  }

  // probe the chain of candidate slots for this gci
  Uint32 pos = Uint32(gci & ACTIVE_GCI_MASK);
  Uint32 size = m_active_gci.size();
  Gci_container *buckets = (Gci_container*)(m_active_gci.getBase());
  while (pos < size)
  {
    Uint64 cmp = (buckets + pos)->m_gci;
    if (cmp == gci)
    {
      if (0)
        ndbout_c("found pos: %u", pos);
      return buckets + pos;
    }

    if (cmp == 0)
    {
      // Empty slot: the bucket may still exist further along the chain.
      // If so, move it here so that it is found sooner next time.
      if (0)
        ndbout_c("empty(%u) ", pos);
      Uint32 search = pos + ACTIVE_GCI_DIRECTORY_SIZE;
      while (search < size)
      {
        if ((buckets + search)->m_gci == gci)
        {
          memcpy(buckets + pos, buckets + search, sizeof(Gci_container));
          bzero(buckets + search, sizeof(Gci_container));
          if (0)
            printf("moved from %u to %u", search, pos);
          if (search == size - 1)
          {
            // the vacated slot was the last element: drop it
            m_active_gci.erase(search);
            if (0)
              ndbout_c(" shrink");
          }
          else
          {
            if (0)
              printf("\n");
          }
          return buckets + pos;
        }
        search += ACTIVE_GCI_DIRECTORY_SIZE;
      }
      // not present anywhere: claim the empty slot at 'pos'
      goto newbucket;
    }
    pos += ACTIVE_GCI_DIRECTORY_SIZE;
  }

  /**
   * This is a new bucket...likely close to start
   */
  if (0)
    ndbout_c("new (with expand) ");
  // grow the vector so that index 'pos' exists; the base pointer is
  // re-read afterwards
  m_active_gci.fill(pos, g_empty_gci_container);
  buckets = (Gci_container*)(m_active_gci.getBase());
newbucket:
  Gci_container* bucket = buckets + pos;
  bucket->m_gci = gci;
  bucket->m_gcp_complete_rep_count = m_total_buckets;

  // Register the gci in the circular, ascending m_known_gci array.
  Uint32 mask = m_known_gci.size() - 1;
  Uint64 * array = m_known_gci.getBase();

  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  bool full = ((maxpos + 1) & mask) == minpos;
  if (unlikely(full))
  {
    // resizing relocates the entries, so reload every cached value
    resize_known_gci();
    minpos = m_min_gci_index;
    maxpos = m_max_gci_index;
    mask = m_known_gci.size() - 1;
    array = m_known_gci.getBase();
  }

  Uint32 maxindex = (maxpos - 1) & mask; // index of the current last entry
  Uint32 newmaxpos = (maxpos + 1) & mask;
  m_max_gci_index = newmaxpos;
  if (likely(minpos == maxpos || gci > array[maxindex]))
  {
    // common case: array empty or gci larger than all known, just append
    array[maxpos] = gci;
#ifdef VM_TRACE
    verify_known_gci(false);
#endif
    return bucket;
  }

  // out of order arrival: locate the first entry larger than gci
  for (pos = minpos; pos != maxpos; pos = (pos + 1) & mask)
  {
    if (array[pos] > gci)
      break;
  }

  if (0)
    ndbout_c("insert %u/%u (max %u/%u) at pos %u (min: %u max: %u)",
             Uint32(gci >> 32),
             Uint32(gci),
             Uint32(array[maxindex] >> 32),
             Uint32(array[maxindex]),
             pos,
             m_min_gci_index, m_max_gci_index);

  assert(pos != maxpos);
  // insert at 'pos', shifting the remaining entries one slot up
  Uint64 oldgci;
  do {
    oldgci = array[pos];
    array[pos] = gci;
    gci = oldgci;
    pos = (pos + 1) & mask;
  } while (pos != maxpos);
  array[pos] = gci;

#ifdef VM_TRACE
  verify_known_gci(false);
#endif
  return bucket;
}
1845 
1846 static
1847 void
crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container * bucket,const SubGcpCompleteRep * const rep,Uint32 buckets)1848 crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
1849 				      const SubGcpCompleteRep * const rep,
1850 				      Uint32 buckets)
1851 {
1852   Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
1853 
1854   ndbout_c("INVALID SUB_GCP_COMPLETE_REP");
1855   ndbout_c("gci_hi: %u", rep->gci_hi);
1856   ndbout_c("gci_lo: %u", rep->gci_lo);
1857   ndbout_c("sender: %x", rep->senderRef);
1858   ndbout_c("count: %d", rep->gcp_complete_rep_count);
1859   ndbout_c("bucket count: %u", old_cnt);
1860   ndbout_c("total buckets: %u", buckets);
1861   abort();
1862 }
1863 
/**
 * Finish a gci bucket: hand its event data over to m_complete_data and
 * free the bucket for reuse.
 *
 * Non-empty buckets have their data list appended to
 * m_complete_data.m_data.  If the bucket is flagged GC_INCONSISTENT the
 * appended gci_ops entry is marked inconsistent; for an empty bucket a
 * dummy event list is appended first so that there is an entry to mark.
 * Finally the bucket is zeroed and m_min_gci_index advances by one slot
 * (wrapping with the m_known_gci mask).
 *
 * @param bucket  the bucket to complete; zeroed on return
 */
void
NdbEventBuffer::complete_bucket(Gci_container* bucket)
{
  Uint64 gci = bucket->m_gci;
  Gci_container* buckets = (Gci_container*)m_active_gci.getBase();

  if (0)
    ndbout_c("complete %u/%u pos: %u", Uint32(gci >> 32), Uint32(gci),
             Uint32(bucket - buckets));

#ifdef VM_TRACE
  verify_known_gci(false);
#endif

  /**
   * Copy data
   */
  if(!bucket->m_data.is_empty())
  {
#ifdef VM_TRACE
    assert(bucket->m_data.m_count);
#endif
    m_complete_data.m_data.append_list(&bucket->m_data, gci);
    if (bucket->m_state & Gci_container::GC_INCONSISTENT)
    {
      /*
       * Bucket marked as possibly missing data, probably due to
       * kernel running out of event_buffer during node failure.
       * Mark newly appended event list as inconsistent.
       */
      assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
      m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
    }
  }
  else // if (bucket->m_data.is_empty())
  {
    if (bucket->m_state & Gci_container::GC_INCONSISTENT)
    {
      /*
       * Bucket marked as possibly missing data, probably due to
       * kernel running out of event_buffer during node failure
       * Bucket contained no data so we must add a dummy event list
       * as inconsistency marker.
       */
      // NOTE(review): the result of alloc_data() is not checked here, and
      // dummy_event_list is never deleted in this function - confirm
      // whether append_list() takes ownership of the list object.
      EventBufData *dummy_data= alloc_data();
      EventBufData_list *dummy_event_list = new EventBufData_list;
      dummy_event_list->append_used_data(dummy_data);
      dummy_event_list->m_is_not_multi_list = true;
      m_complete_data.m_data.append_list(dummy_event_list, gci);
      assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
      m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
    }
  }

  Uint32 minpos = m_min_gci_index;
  Uint32 mask = m_known_gci.size() - 1;
  // the size must be a power of two for the mask arithmetic to work
  assert((mask & (mask + 1)) == 0);

  // release the bucket: an all-zero container counts as empty
  bzero(bucket, sizeof(Gci_container));

  // drop the oldest known gci
  m_min_gci_index = (minpos + 1) & mask;

#ifdef VM_TRACE
  verify_known_gci(true);
#endif
}
1930 
/**
 * Handle a SUB_GCP_COMPLETE_REP signal: a report that a number of the
 * contributions to a gci are complete.
 *
 * The signal's count is subtracted from the bucket's outstanding count.
 * When the count reaches zero and the gci is the oldest known one (or no
 * gci is known), the bucket is completed, m_latestGCI is advanced, status
 * is reported and waiters on p_cond are signalled.  A gci that completes
 * before the oldest known one is either handled by flushing the older,
 * incomplete gci's (only while m_startup_hack is set) or marked
 * GC_COMPLETE and remembered in m_latest_complete_GCI for later.
 *
 * @param rep   the received signal
 * @param len   signal length; if shorter than SignalLength, gci_lo is
 *              taken to be 0
 * @param complete_cluster_failure  when non-zero, the sender is not
 *              recorded as alive and the signal is processed even if
 *              there are no active event operations
 */
void
NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep,
                                         Uint32 len, int complete_cluster_failure)
{
  Uint32 gci_hi = rep->gci_hi;
  Uint32 gci_lo = rep->gci_lo;

  if (unlikely(len < SubGcpCompleteRep::SignalLength))
  {
    // short signal: it carries no gci_lo
    gci_lo = 0;
  }

  const Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
  if (gci > m_highest_sub_gcp_complete_GCI)
    m_highest_sub_gcp_complete_GCI = gci;

  if (!complete_cluster_failure)
  {
    m_alive_node_bit_mask.set(refToNode(rep->senderRef));

    if (unlikely(m_active_op_count == 0))
    {
      // nobody is subscribing, nothing to complete
      return;
    }
  }

  DBUG_ENTER_EVENT("NdbEventBuffer::execSUB_GCP_COMPLETE_REP");

  const Uint32 cnt= rep->gcp_complete_rep_count;

  Gci_container *bucket = find_bucket(gci);

  if (0)
    ndbout_c("execSUB_GCP_COMPLETE_REP(%u/%u) cnt: %u from %x flags: 0x%x",
             Uint32(gci >> 32), Uint32(gci), cnt, rep->senderRef,
             rep->flags);

  if (unlikely(rep->flags & (SubGcpCompleteRep::ADD_CNT |
                             SubGcpCompleteRep::SUB_CNT)))
  {
    handle_change_nodegroup(rep);
  }

  if (unlikely(bucket == 0))
  {
    /**
     * Already completed GCI...
     *   Possible in case of resend during NF handling
     */
#ifdef VM_TRACE
    Uint64 minGCI = m_known_gci[m_min_gci_index];
    ndbout_c("bucket == 0, gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
             Uint32(gci >> 32), Uint32(gci),
             Uint32(minGCI >> 32), Uint32(minGCI),
             Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
    ndbout << " complete: " << m_complete_data << endl;
    for(Uint32 i = 0; i<m_active_gci.size(); i++)
    {
      if (((Gci_container*)(&m_active_gci[i]))->m_gci)
        ndbout << i << " - " << m_active_gci[i] << endl;
    }
#endif
    DBUG_VOID_RETURN_EVENT;
  }

  if (rep->flags & SubGcpCompleteRep::MISSING_DATA)
  {
    bucket->m_state = Gci_container::GC_INCONSISTENT;
  }

  Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
  if(unlikely(old_cnt == ~(Uint32)0))
  {
    // an all-ones count stands for "not set yet": use the full count
    old_cnt = m_total_buckets;
  }

  //assert(old_cnt >= cnt);
  if (unlikely(! (old_cnt >= cnt)))
  {
    // more completions reported than were outstanding: prints and aborts
    crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, m_total_buckets);
  }
  bucket->m_gcp_complete_rep_count = old_cnt - cnt;

  if(old_cnt == cnt)
  {
    // that was the last outstanding report for this gci
    Uint64 minGCI = m_known_gci[m_min_gci_index];
    if(likely(minGCI == 0 || gci == minGCI))
    {
  do_complete: // also entered from the startup path in the else branch
      m_startup_hack = false;
      complete_bucket(bucket);
      m_latestGCI = m_complete_data.m_gci = gci; // before reportStatus
      reportStatus();

      if(unlikely(m_latest_complete_GCI > gci))
      {
	complete_outof_order_gcis();
      }

      // signal that somethings happened

      NdbCondition_Signal(p_cond);
    }
    else
    {
      if (unlikely(m_startup_hack))
      {
        // drop the older, incomplete gci's, then complete this one
        // through the normal path
        flushIncompleteEvents(gci);
        bucket = find_bucket(gci);
        assert(bucket);
        assert(bucket->m_gci == gci);
        goto do_complete;
      }
      /** out of order something */
      g_eventLogger->info("out of order bucket: %d gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
                          (int)(bucket-(Gci_container*)m_active_gci.getBase()),
                          Uint32(gci >> 32), Uint32(gci),
                          Uint32(minGCI >> 32), Uint32(minGCI),
                          Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
      bucket->m_state = Gci_container::GC_COMPLETE;
      bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
      m_latest_complete_GCI = gci;
    }
  }

  DBUG_VOID_RETURN_EVENT;
}
2058 
2059 void
complete_outof_order_gcis()2060 NdbEventBuffer::complete_outof_order_gcis()
2061 {
2062 #ifdef VM_TRACE
2063   verify_known_gci(false);
2064 #endif
2065 
2066   Uint64 * array = m_known_gci.getBase();
2067   Uint32 mask = m_known_gci.size() - 1;
2068   Uint32 minpos = m_min_gci_index;
2069   Uint32 maxpos = m_max_gci_index;
2070   Uint64 stop_gci = m_latest_complete_GCI;
2071 
2072   Uint64 start_gci = array[minpos];
2073   g_eventLogger->info("complete_outof_order_gcis from: %u/%u(%u) to: %u/%u(%u)",
2074                       Uint32(start_gci >> 32), Uint32(start_gci), minpos,
2075                       Uint32(stop_gci >> 32), Uint32(stop_gci), maxpos);
2076 
2077   assert(start_gci <= stop_gci);
2078   do
2079   {
2080     start_gci = array[minpos];
2081     Gci_container* bucket = find_bucket(start_gci);
2082     assert(bucket);
2083     assert(maxpos == m_max_gci_index);
2084     if (!(bucket->m_state & Gci_container::GC_COMPLETE)) // Not complete
2085     {
2086 #ifdef VM_TRACE
2087       verify_known_gci(false);
2088 #endif
2089       return;
2090     }
2091 
2092 #ifdef VM_TRACE
2093     ndbout_c("complete_outof_order_gcis - completing %u/%u rows: %u",
2094              Uint32(start_gci >> 32), Uint32(start_gci), bucket->m_data.m_count);
2095 #else
2096     ndbout_c("complete_outof_order_gcis - completing %u/%u",
2097              Uint32(start_gci >> 32), Uint32(start_gci));
2098 #endif
2099 
2100     complete_bucket(bucket);
2101     m_latestGCI = m_complete_data.m_gci = start_gci;
2102 
2103 #ifdef VM_TRACE
2104     verify_known_gci(true);
2105 #endif
2106     minpos = (minpos + 1) & mask;
2107   } while (start_gci != stop_gci);
2108 }
2109 
2110 void
insert_event(NdbEventOperationImpl * impl,SubTableData & data,LinearSectionPtr * ptr,Uint32 & oid_ref)2111 NdbEventBuffer::insert_event(NdbEventOperationImpl* impl,
2112                              SubTableData &data,
2113                              LinearSectionPtr *ptr,
2114                              Uint32 &oid_ref)
2115 {
2116   DBUG_PRINT("info", ("gci{hi/lo}: %u/%u", data.gci_hi, data.gci_lo));
2117   do
2118   {
2119     if (impl->m_stop_gci == ~Uint64(0))
2120     {
2121       oid_ref = impl->m_oid;
2122       insertDataL(impl, &data, SubTableData::SignalLength, ptr);
2123     }
2124     NdbEventOperationImpl* blob_op = impl->theBlobOpList;
2125     while (blob_op != NULL)
2126     {
2127       if (blob_op->m_stop_gci == ~Uint64(0))
2128       {
2129         oid_ref = blob_op->m_oid;
2130         insertDataL(blob_op, &data, SubTableData::SignalLength, ptr);
2131       }
2132       blob_op = blob_op->m_next;
2133     }
2134   } while((impl = impl->m_next));
2135 }
2136 
2137 bool
find_max_known_gci(Uint64 * res) const2138 NdbEventBuffer::find_max_known_gci(Uint64 * res) const
2139 {
2140   const Uint64 * array = m_known_gci.getBase();
2141   Uint32 mask = m_known_gci.size() - 1;
2142   Uint32 minpos = m_min_gci_index;
2143   Uint32 maxpos = m_max_gci_index;
2144 
2145   if (minpos == maxpos)
2146     return false;
2147 
2148   if (res)
2149   {
2150     * res = array[(maxpos - 1) & mask];
2151   }
2152 
2153   return true;
2154 }
2155 
2156 void
handle_change_nodegroup(const SubGcpCompleteRep * rep)2157 NdbEventBuffer::handle_change_nodegroup(const SubGcpCompleteRep* rep)
2158 {
2159   Uint64 gci = (Uint64(rep->gci_hi) << 32) | rep->gci_lo;
2160   Uint32 cnt = (rep->flags >> 16);
2161   Uint64 * array = m_known_gci.getBase();
2162   Uint32 mask = m_known_gci.size() - 1;
2163   Uint32 minpos = m_min_gci_index;
2164   Uint32 maxpos = m_max_gci_index;
2165 
2166   if (rep->flags & SubGcpCompleteRep::ADD_CNT)
2167   {
2168     ndbout_c("handle_change_nodegroup(add, cnt=%u,gci=%u/%u)",
2169              cnt, Uint32(gci >> 32), Uint32(gci));
2170 
2171     Uint32 found = 0;
2172     Uint32 pos = minpos;
2173     for (; pos != maxpos; pos = (pos + 1) & mask)
2174     {
2175       if (array[pos] == gci)
2176       {
2177         Gci_container* tmp = find_bucket(array[pos]);
2178         if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
2179         {
2180           found = 1;
2181           ndbout_c(" - gci %u/%u already marked complete",
2182                    Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2183           break;
2184         }
2185         else
2186         {
2187           found = 2;
2188           ndbout_c(" - gci %u/%u marking (and increasing)",
2189                    Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2190           tmp->m_state |= Gci_container::GC_CHANGE_CNT;
2191           tmp->m_gcp_complete_rep_count += cnt;
2192           break;
2193         }
2194       }
2195       else
2196       {
2197         ndbout_c(" - ignore %u/%u",
2198                  Uint32(array[pos] >> 32), Uint32(array[pos]));
2199       }
2200     }
2201 
2202     if (found == 0)
2203     {
2204       ndbout_c(" - NOT FOUND (total: %u cnt: %u)", m_total_buckets, cnt);
2205       return;
2206     }
2207 
2208     if (found == 1)
2209     {
2210       return; // Nothing todo
2211     }
2212 
2213     m_total_buckets += cnt;
2214 
2215     pos = (pos + 1) & mask;
2216     for (; pos != maxpos; pos = (pos + 1) & mask)
2217     {
2218       assert(array[pos] > gci);
2219       Gci_container* tmp = find_bucket(array[pos]);
2220       assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
2221       tmp->m_gcp_complete_rep_count += cnt;
2222       ndbout_c(" - increasing cnt on %u/%u by %u",
2223                Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci), cnt);
2224     }
2225   }
2226   else if (rep->flags & SubGcpCompleteRep::SUB_CNT)
2227   {
2228     ndbout_c("handle_change_nodegroup(sub, cnt=%u,gci=%u/%u)",
2229              cnt, Uint32(gci >> 32), Uint32(gci));
2230 
2231     Uint32 found = 0;
2232     Uint32 pos = minpos;
2233     for (; pos != maxpos; pos = (pos + 1) & mask)
2234     {
2235       if (array[pos] == gci)
2236       {
2237         Gci_container* tmp = find_bucket(array[pos]);
2238         if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
2239         {
2240           found = 1;
2241           ndbout_c(" - gci %u/%u already marked complete",
2242                    Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2243           break;
2244         }
2245         else
2246         {
2247           found = 2;
2248           ndbout_c(" - gci %u/%u marking",
2249                    Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2250           tmp->m_state |= Gci_container::GC_CHANGE_CNT;
2251           break;
2252         }
2253       }
2254       else
2255       {
2256         ndbout_c(" - ignore %u/%u",
2257                  Uint32(array[pos] >> 32), Uint32(array[pos]));
2258       }
2259     }
2260 
2261     if (found == 0)
2262     {
2263       ndbout_c(" - NOT FOUND");
2264       return;
2265     }
2266 
2267     if (found == 1)
2268     {
2269       return; // Nothing todo
2270     }
2271 
2272     m_total_buckets -= cnt;
2273 
2274     pos = (pos + 1) & mask;
2275     for (; pos != maxpos; pos = (pos + 1) & mask)
2276     {
2277       assert(array[pos] > gci);
2278       Gci_container* tmp = find_bucket(array[pos]);
2279       assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
2280       tmp->m_gcp_complete_rep_count -= cnt;
2281       ndbout_c(" - decreasing cnt on %u/%u by %u to: %u",
2282                Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci),
2283                cnt,
2284                tmp->m_gcp_complete_rep_count);
2285     }
2286   }
2287 }
2288 
2289 void
set_total_buckets(Uint32 cnt)2290 NdbEventBuffer::set_total_buckets(Uint32 cnt)
2291 {
2292   if (m_total_buckets == cnt)
2293     return;
2294 
2295   assert(m_total_buckets == TOTAL_BUCKETS_INIT);
2296   m_total_buckets = cnt;
2297 
2298   Uint64 * array = m_known_gci.getBase();
2299   Uint32 mask = m_known_gci.size() - 1;
2300   Uint32 minpos = m_min_gci_index;
2301   Uint32 maxpos = m_max_gci_index;
2302 
2303   bool found = false;
2304   Uint32 pos = minpos;
2305   for (; pos != maxpos; pos = (pos + 1) & mask)
2306   {
2307     Gci_container* tmp = find_bucket(array[pos]);
2308     if (TOTAL_BUCKETS_INIT >= tmp->m_gcp_complete_rep_count)
2309     {
2310       found = true;
2311       if (0)
2312         ndbout_c("set_total_buckets(%u) complete %u/%u",
2313                  cnt, Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2314       tmp->m_gcp_complete_rep_count = 0;
2315       complete_bucket(tmp);
2316     }
2317     else
2318     {
2319       assert(tmp->m_gcp_complete_rep_count > TOTAL_BUCKETS_INIT);
2320       tmp->m_gcp_complete_rep_count -= TOTAL_BUCKETS_INIT;
2321     }
2322   }
2323   if (found)
2324   {
2325     NdbCondition_Signal(p_cond);
2326   }
2327 }
2328 
2329 void
report_node_failure_completed(Uint32 node_id)2330 NdbEventBuffer::report_node_failure_completed(Uint32 node_id)
2331 {
2332   m_alive_node_bit_mask.clear(node_id);
2333 
2334   NdbEventOperation* op= m_ndb->getEventOperation(0);
2335   if (op == 0)
2336     return;
2337 
2338   DBUG_ENTER("NdbEventBuffer::report_node_failure_completed");
2339   SubTableData data;
2340   LinearSectionPtr ptr[3];
2341   bzero(&data, sizeof(data));
2342   bzero(ptr, sizeof(ptr));
2343 
2344   data.tableId = ~0;
2345   data.requestInfo = 0;
2346   SubTableData::setOperation(data.requestInfo,
2347 			     NdbDictionary::Event::_TE_NODE_FAILURE);
2348   SubTableData::setReqNodeId(data.requestInfo, node_id);
2349   SubTableData::setNdbdNodeId(data.requestInfo, node_id);
2350   data.flags = SubTableData::LOG;
2351 
2352   Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
2353   find_max_known_gci(&gci);
2354 
2355   data.gci_hi = Uint32(gci >> 32);
2356   data.gci_lo = Uint32(gci);
2357 
2358   /**
2359    * Insert this event for each operation
2360    */
2361   // no need to lock()/unlock(), receive thread calls this
2362   insert_event(&op->m_impl, data, ptr, data.senderData);
2363 
2364   if (!m_alive_node_bit_mask.isclear())
2365     DBUG_VOID_RETURN;
2366 
2367   /*
2368    * Cluster failure
2369    */
2370 
2371   DBUG_PRINT("info", ("Cluster failure"));
2372 
2373   gci = Uint64((m_latestGCI >> 32) + 1) << 32;
2374   bool found = find_max_known_gci(&gci);
2375 
2376   Uint64 * array = m_known_gci.getBase();
2377   Uint32 mask = m_known_gci.size() - 1;
2378   Uint32 minpos = m_min_gci_index;
2379   Uint32 maxpos = m_max_gci_index;
2380 
2381   while (minpos != maxpos && array[minpos] != gci)
2382   {
2383     Gci_container* tmp = find_bucket(array[minpos]);
2384     assert(tmp);
2385     assert(maxpos == m_max_gci_index);
2386 
2387     if(!tmp->m_data.is_empty())
2388     {
2389       free_list(tmp->m_data);
2390     }
2391     tmp->~Gci_container();
2392     bzero(tmp, sizeof(Gci_container));
2393 
2394     minpos = (minpos + 1) & mask;
2395   }
2396   m_min_gci_index = minpos;
2397   if (found)
2398   {
2399     assert(((minpos + 1) & mask) == maxpos);
2400   }
2401   else
2402   {
2403     assert(minpos == maxpos);
2404   }
2405 
2406   /**
2407    * Inject new event
2408    */
2409   data.tableId = ~0;
2410   data.requestInfo = 0;
2411   SubTableData::setOperation(data.requestInfo,
2412 			     NdbDictionary::Event::_TE_CLUSTER_FAILURE);
2413 
2414   /**
2415    * Insert this event for each operation
2416    */
2417   // no need to lock()/unlock(), receive thread calls this
2418   insert_event(&op->m_impl, data, ptr, data.senderData);
2419 
2420 #ifdef VM_TRACE
2421   m_flush_gci = 0;
2422 #endif
2423 
2424   /**
2425    * And finally complete this GCI
2426    */
2427   Gci_container* tmp = find_bucket(gci);
2428   assert(tmp);
2429   if (found)
2430   {
2431     assert(m_max_gci_index == maxpos); // shouldnt have changed...
2432   }
2433   else
2434   {
2435     assert(m_max_gci_index == ((maxpos + 1) & mask));
2436   }
2437   Uint32 cnt = tmp->m_gcp_complete_rep_count;
2438 
2439   SubGcpCompleteRep rep;
2440   rep.gci_hi= (Uint32)(gci >> 32);
2441   rep.gci_lo= (Uint32)(gci & 0xFFFFFFFF);
2442   rep.gcp_complete_rep_count= cnt;
2443   rep.flags = 0;
2444   execSUB_GCP_COMPLETE_REP(&rep, SubGcpCompleteRep::SignalLength, 1);
2445 
2446   DBUG_VOID_RETURN;
2447 }
2448 
2449 Uint64
getLatestGCI()2450 NdbEventBuffer::getLatestGCI()
2451 {
2452   return m_latestGCI;
2453 }
2454 
2455 int
insertDataL(NdbEventOperationImpl * op,const SubTableData * const sdata,Uint32 len,LinearSectionPtr ptr[3])2456 NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
2457 			    const SubTableData * const sdata,
2458                             Uint32 len,
2459 			    LinearSectionPtr ptr[3])
2460 {
2461   DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
2462   const Uint32 ri = sdata->requestInfo;
2463   const Uint32 operation = SubTableData::getOperation(ri);
2464   Uint32 gci_hi = sdata->gci_hi;
2465   Uint32 gci_lo = sdata->gci_lo;
2466 
2467   if (unlikely(len < SubTableData::SignalLength))
2468   {
2469     gci_lo = 0;
2470   }
2471 
2472   Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
2473   const bool is_data_event =
2474     operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
2475 
2476   if (!is_data_event)
2477   {
2478     if (operation == NdbDictionary::Event::_TE_CLUSTER_FAILURE)
2479     {
2480       /*
2481         Mark event as stopping.  Subsequent dropEventOperation
2482         will add the event to the dropped list for delete
2483       */
2484       op->m_stop_gci = gci;
2485     }
2486     else if (operation == NdbDictionary::Event::_TE_ACTIVE)
2487     {
2488       // internal event, do not relay to user
2489       DBUG_PRINT("info",
2490                  ("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u",
2491                   op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
2492       DBUG_RETURN_EVENT(0);
2493     }
2494     else if (operation == NdbDictionary::Event::_TE_STOP)
2495     {
2496       // internal event, do not relay to user
2497       DBUG_PRINT("info",
2498                  ("_TE_STOP: m_ref_count: %u for op: %p id: %u",
2499                   op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
2500       DBUG_RETURN_EVENT(0);
2501     }
2502   }
2503 
2504   if ( likely((Uint32)op->mi_type & (1 << operation)))
2505   {
2506     Gci_container* bucket= find_bucket(gci);
2507 
2508     DBUG_PRINT_EVENT("info", ("data insertion in eventId %d", op->m_eventId));
2509     DBUG_PRINT_EVENT("info", ("gci=%d tab=%d op=%d node=%d",
2510                               sdata->gci, sdata->tableId,
2511 			      SubTableData::getOperation(sdata->requestInfo),
2512                               SubTableData::getReqNodeId(sdata->requestInfo)));
2513 
2514     if (unlikely(bucket == 0))
2515     {
2516       /**
2517        * Already completed GCI...
2518        *   Possible in case of resend during NF handling
2519        */
2520       DBUG_RETURN_EVENT(0);
2521     }
2522 
2523     const bool is_blob_event = (op->theMainOp != NULL);
2524     const bool use_hash =  op->m_mergeEvents && is_data_event;
2525 
2526     if (! is_data_event && is_blob_event)
2527     {
2528       // currently subscribed to but not used
2529       DBUG_PRINT_EVENT("info", ("ignore non-data event on blob table"));
2530       DBUG_RETURN_EVENT(0);
2531     }
2532 
2533     // find position in bucket hash table
2534     EventBufData* data = 0;
2535     EventBufData_hash::Pos hpos;
2536     if (use_hash)
2537     {
2538       bucket->m_data_hash.search(hpos, op, ptr);
2539       data = hpos.data;
2540     }
2541 
2542     if (data == 0)
2543     {
2544       // allocate new result buffer
2545       data = alloc_data();
2546       if (unlikely(data == 0))
2547       {
2548         op->m_has_error = 2;
2549         DBUG_RETURN_EVENT(-1);
2550       }
2551       if (unlikely(copy_data(sdata, len, ptr, data, NULL)))
2552       {
2553         op->m_has_error = 3;
2554         DBUG_RETURN_EVENT(-1);
2555       }
2556       data->m_event_op = op;
2557       if (! is_blob_event || ! is_data_event)
2558       {
2559         bucket->m_data.append_data(data);
2560       }
2561       else
2562       {
2563         // find or create main event for this blob event
2564         EventBufData_hash::Pos main_hpos;
2565         int ret = get_main_data(bucket, main_hpos, data);
2566         if (ret == -1)
2567         {
2568           op->m_has_error = 4;
2569           DBUG_RETURN_EVENT(-1);
2570         }
2571         EventBufData* main_data = main_hpos.data;
2572         if (ret != 0) // main event was created
2573         {
2574           main_data->m_event_op = op->theMainOp;
2575           bucket->m_data.append_data(main_data);
2576           if (use_hash)
2577           {
2578             main_data->m_pkhash = main_hpos.pkhash;
2579             bucket->m_data_hash.append(main_hpos, main_data);
2580           }
2581         }
2582         // link blob event under main event
2583         add_blob_data(bucket, main_data, data);
2584       }
2585       if (use_hash)
2586       {
2587         data->m_pkhash = hpos.pkhash;
2588         bucket->m_data_hash.append(hpos, data);
2589       }
2590 #ifdef VM_TRACE
2591       op->m_data_count++;
2592 #endif
2593     }
2594     else
2595     {
2596       // event with same op, PK found, merge into old buffer
2597       if (unlikely(merge_data(sdata, len, ptr, data, &bucket->m_data.m_sz)))
2598       {
2599         op->m_has_error = 3;
2600         DBUG_RETURN_EVENT(-1);
2601       }
2602       // merge is on so we do not report blob part events
2603       if (! is_blob_event) {
2604         // report actual operation and the composite
2605         // there is no way to "fix" the flags for a composite op
2606         // since the flags represent multiple ops on multiple PKs
2607         // XXX fix by doing merge at end of epoch (extra mem cost)
2608         {
2609           EventBufData_list::Gci_op g = { op, (1 << operation) };
2610           bucket->m_data.add_gci_op(g);
2611         }
2612         {
2613           EventBufData_list::Gci_op
2614 	    g = { op,
2615 		  (1 << SubTableData::getOperation(data->sdata->requestInfo))};
2616           bucket->m_data.add_gci_op(g);
2617         }
2618       }
2619     }
2620 #ifdef NDB_EVENT_VERIFY_SIZE
2621     verify_size(bucket->m_data);
2622 #endif
2623     DBUG_RETURN_EVENT(0);
2624   }
2625 
2626 #ifdef VM_TRACE
2627   if ((Uint32)op->m_eventImpl->mi_type & (1 << operation))
2628   {
2629     DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId));
2630     DBUG_RETURN_EVENT(0);
2631   }
2632   else {
2633     DBUG_PRINT_EVENT("info",("skipped"));
2634     DBUG_RETURN_EVENT(0);
2635   }
2636 #else
2637   DBUG_RETURN_EVENT(0);
2638 #endif
2639 }
2640 
2641 // allocate EventBufData
2642 EventBufData*
alloc_data()2643 NdbEventBuffer::alloc_data()
2644 {
2645   DBUG_ENTER_EVENT("alloc_data");
2646   EventBufData* data = m_free_data;
2647 
2648   if (unlikely(data == 0))
2649   {
2650 #ifdef VM_TRACE
2651     assert(m_free_data_count == 0);
2652     assert(m_free_data_sz == 0);
2653 #endif
2654     expand(4000);
2655     reportStatus();
2656 
2657     data = m_free_data;
2658     if (unlikely(data == 0))
2659     {
2660 #ifdef VM_TRACE
2661       printf("m_latest_command: %s\n", m_latest_command);
2662       printf("no free data, m_latestGCI %u/%u\n",
2663              (Uint32)(m_latestGCI << 32), (Uint32)m_latestGCI);
2664       printf("m_free_data_count %d\n", m_free_data_count);
2665       printf("m_available_data_count %d first gci{hi/lo} %u/%u last gci{hi/lo} %u/%u\n",
2666              m_available_data.m_count,
2667              m_available_data.m_head?m_available_data.m_head->sdata->gci_hi:0,
2668              m_available_data.m_head?m_available_data.m_head->sdata->gci_lo:0,
2669              m_available_data.m_tail?m_available_data.m_tail->sdata->gci_hi:0,
2670              m_available_data.m_tail?m_available_data.m_tail->sdata->gci_lo:0);
2671       printf("m_used_data_count %d\n", m_used_data.m_count);
2672 #endif
2673       DBUG_RETURN_EVENT(0); // TODO handle this, overrun, or, skip?
2674     }
2675   }
2676 
2677   // remove data from free list
2678   if (data->m_next_blob == 0)
2679     m_free_data = data->m_next;
2680   else {
2681     EventBufData* data2 = data->m_next_blob;
2682     if (data2->m_next == 0) {
2683       data->m_next_blob = data2->m_next_blob;
2684       data = data2;
2685     } else {
2686       EventBufData* data3 = data2->m_next;
2687       data2->m_next = data3->m_next;
2688       data = data3;
2689     }
2690   }
2691   data->m_next = 0;
2692   data->m_next_blob = 0;
2693 #ifdef VM_TRACE
2694   m_free_data_count--;
2695   assert(m_free_data_sz >= data->sz);
2696 #endif
2697   m_free_data_sz -= data->sz;
2698   DBUG_RETURN_EVENT(data);
2699 }
2700 
2701 // allocate initial or bigger memory area in EventBufData
2702 // takes sizes from given ptr and sets up data->ptr
2703 int
alloc_mem(EventBufData * data,LinearSectionPtr ptr[3],Uint32 * change_sz)2704 NdbEventBuffer::alloc_mem(EventBufData* data,
2705                           LinearSectionPtr ptr[3],
2706                           Uint32 * change_sz)
2707 {
2708   DBUG_ENTER("NdbEventBuffer::alloc_mem");
2709   DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz));
2710   const Uint32 min_alloc_size = 128;
2711 
2712   Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
2713   Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2;
2714   if (alloc_size < min_alloc_size)
2715     alloc_size = min_alloc_size;
2716 
2717   if (data->sz < alloc_size)
2718   {
2719     Uint32 add_sz = alloc_size - data->sz;
2720 
2721     NdbMem_Free((char*)data->memory);
2722     assert(m_total_alloc >= data->sz);
2723     data->memory = 0;
2724     data->sz = 0;
2725 
2726     data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
2727     if (data->memory == 0)
2728     {
2729       m_total_alloc -= data->sz;
2730       DBUG_RETURN(-1);
2731     }
2732     data->sz = alloc_size;
2733     m_total_alloc += add_sz;
2734 
2735     if (change_sz != NULL)
2736       *change_sz += add_sz;
2737   }
2738 
2739   Uint32* memptr = data->memory;
2740   memptr += sz4;
2741   int i;
2742   for (i = 0; i <= 2; i++)
2743   {
2744     data->ptr[i].p = memptr;
2745     data->ptr[i].sz = ptr[i].sz;
2746     memptr += ptr[i].sz;
2747   }
2748 
2749   DBUG_RETURN(0);
2750 }
2751 
2752 void
dealloc_mem(EventBufData * data,Uint32 * change_sz)2753 NdbEventBuffer::dealloc_mem(EventBufData* data,
2754                             Uint32 * change_sz)
2755 {
2756   NdbMem_Free((char*)data->memory);
2757   assert(m_total_alloc >= data->sz);
2758   m_total_alloc -= data->sz;
2759   if (change_sz != NULL) {
2760     assert(*change_sz >= data->sz);
2761     *change_sz -= data->sz;
2762   }
2763   data->memory = 0;
2764   data->sz = 0;
2765 }
2766 
2767 int
copy_data(const SubTableData * const sdata,Uint32 len,LinearSectionPtr ptr[3],EventBufData * data,Uint32 * change_sz)2768 NdbEventBuffer::copy_data(const SubTableData * const sdata, Uint32 len,
2769                           LinearSectionPtr ptr[3],
2770                           EventBufData* data,
2771                           Uint32 * change_sz)
2772 {
2773   DBUG_ENTER_EVENT("NdbEventBuffer::copy_data");
2774 
2775   if (alloc_mem(data, ptr, change_sz) != 0)
2776     DBUG_RETURN_EVENT(-1);
2777   memcpy(data->sdata, sdata, sizeof(SubTableData));
2778 
2779   if (unlikely(len < SubTableData::SignalLength))
2780   {
2781     data->sdata->gci_lo = 0;
2782   }
2783   if (len < SubTableData::SignalLengthWithTransId)
2784   {
2785     /* No TransId, set to uninit value */
2786     data->sdata->transId1 = ~Uint32(0);
2787     data->sdata->transId2 = ~Uint32(0);
2788   }
2789 
2790   int i;
2791   for (i = 0; i <= 2; i++)
2792     memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
2793   DBUG_RETURN_EVENT(0);
2794 }
2795 
2796 static struct Ev_t {
2797   enum {
2798     enum_INS = NdbDictionary::Event::_TE_INSERT,
2799     enum_DEL = NdbDictionary::Event::_TE_DELETE,
2800     enum_UPD = NdbDictionary::Event::_TE_UPDATE,
2801     enum_NUL = NdbDictionary::Event::_TE_NUL,
2802     enum_IDM = 254,     // idempotent op possibly allowed on NF
2803     enum_ERR = 255      // always impossible
2804   };
2805   int t1, t2, t3;
2806 } ev_t[] = {
2807   { Ev_t::enum_INS, Ev_t::enum_INS, Ev_t::enum_IDM },
2808   { Ev_t::enum_INS, Ev_t::enum_DEL, Ev_t::enum_NUL }, //ok
2809   { Ev_t::enum_INS, Ev_t::enum_UPD, Ev_t::enum_INS }, //ok
2810   { Ev_t::enum_DEL, Ev_t::enum_INS, Ev_t::enum_UPD }, //ok
2811   { Ev_t::enum_DEL, Ev_t::enum_DEL, Ev_t::enum_IDM },
2812   { Ev_t::enum_DEL, Ev_t::enum_UPD, Ev_t::enum_ERR },
2813   { Ev_t::enum_UPD, Ev_t::enum_INS, Ev_t::enum_ERR },
2814   { Ev_t::enum_UPD, Ev_t::enum_DEL, Ev_t::enum_DEL }, //ok
2815   { Ev_t::enum_UPD, Ev_t::enum_UPD, Ev_t::enum_UPD }  //ok
2816 };
2817 
2818 /*
2819  *   | INS            | DEL              | UPD
2820  * 0 | pk ah + all ah | pk ah            | pk ah + new ah
2821  * 1 | pk ad + all ad | old pk ad        | new pk ad + new ad
2822  * 2 | empty          | old non-pk ah+ad | old ah+ad
2823  */
2824 
2825 static AttributeHeader
copy_head(Uint32 & i1,Uint32 * p1,Uint32 & i2,const Uint32 * p2,Uint32 flags)2826 copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2,
2827           Uint32 flags)
2828 {
2829   AttributeHeader ah(p2[i2]);
2830   bool do_copy = (flags & 1);
2831   if (do_copy)
2832     p1[i1] = p2[i2];
2833   i1++;
2834   i2++;
2835   return ah;
2836 }
2837 
2838 static void
copy_attr(AttributeHeader ah,Uint32 & j1,Uint32 * p1,Uint32 & j2,const Uint32 * p2,Uint32 flags)2839 copy_attr(AttributeHeader ah,
2840           Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2,
2841           Uint32 flags)
2842 {
2843   bool do_copy = (flags & 1);
2844   bool with_head = (flags & 2);
2845   Uint32 n = with_head + ah.getDataSize();
2846   if (do_copy)
2847   {
2848     Uint32 k;
2849     for (k = 0; k < n; k++)
2850       p1[j1 + k] = p2[j2 + k];
2851   }
2852   j1 += n;
2853   j2 += n;
2854 }
2855 
2856 int
merge_data(const SubTableData * const sdata,Uint32 len,LinearSectionPtr ptr2[3],EventBufData * data,Uint32 * change_sz)2857 NdbEventBuffer::merge_data(const SubTableData * const sdata, Uint32 len,
2858                            LinearSectionPtr ptr2[3],
2859                            EventBufData* data,
2860                            Uint32 * change_sz)
2861 {
2862   DBUG_ENTER_EVENT("NdbEventBuffer::merge_data");
2863 
2864   /* TODO : Consider how/if to merge multiple events/key with different
2865    * transid
2866    * Same consideration probably applies to AnyValue!
2867    */
2868 
2869   Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;
2870 
2871   int t1 = SubTableData::getOperation(data->sdata->requestInfo);
2872   int t2 = SubTableData::getOperation(sdata->requestInfo);
2873   if (t1 == Ev_t::enum_NUL)
2874     DBUG_RETURN_EVENT(copy_data(sdata, len, ptr2, data, change_sz));
2875 
2876   Ev_t* tp = 0;
2877   int i;
2878   for (i = 0; (uint) i < sizeof(ev_t)/sizeof(ev_t[0]); i++) {
2879     if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) {
2880       tp = &ev_t[i];
2881       break;
2882     }
2883   }
2884   assert(tp != 0 && tp->t3 != Ev_t::enum_ERR);
2885 
2886   if (tp->t3 == Ev_t::enum_IDM) {
2887     LinearSectionPtr (&ptr1)[3] = data->ptr;
2888 
2889     /*
2890      * TODO
2891      * - can get data in INS ptr2[2] which is supposed to be empty
2892      * - can get extra data in DEL ptr2[2]
2893      * - why does DBUG_PRINT not work in this file ???
2894      *
2895      * replication + bug#19872 can ignore this since merge is on
2896      * only for tables with explicit PK and before data is not used
2897      */
2898     const int maxsec = 1; // ignore section 2
2899 
2900     int i;
2901     for (i = 0; i <= maxsec; i++) {
2902       if (ptr1[i].sz != ptr2[i].sz ||
2903           memcmp(ptr1[i].p, ptr2[i].p, ptr1[i].sz << 2) != 0) {
2904         DBUG_PRINT("info", ("idempotent op %d*%d data differs in sec %d",
2905                              tp->t1, tp->t2, i));
2906         assert(false);
2907         DBUG_RETURN_EVENT(-1);
2908       }
2909     }
2910     DBUG_PRINT("info", ("idempotent op %d*%d data ok", tp->t1, tp->t2));
2911     DBUG_RETURN_EVENT(0);
2912   }
2913 
2914   // TODO: use old data items, avoid malloc/free on each merge
2915 
2916   // save old data
2917   EventBufData olddata = *data;
2918   data->memory = 0;
2919   data->sz = 0;
2920 
2921   // compose ptr1 o ptr2 = ptr
2922   LinearSectionPtr (&ptr1)[3] = olddata.ptr;
2923   LinearSectionPtr (&ptr)[3] = data->ptr;
2924 
2925   // loop twice where first loop only sets sizes
2926   int loop;
2927   int result = 0;
2928   for (loop = 0; loop <= 1; loop++)
2929   {
2930     if (loop == 1)
2931     {
2932       if (alloc_mem(data, ptr, change_sz) != 0)
2933       {
2934         result = -1;
2935         goto end;
2936       }
2937       *data->sdata = *sdata;
2938       SubTableData::setOperation(data->sdata->requestInfo, tp->t3);
2939     }
2940 
2941     ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;
2942 
2943     // copy pk from new version
2944     {
2945       AttributeHeader ah;
2946       Uint32 i = 0;
2947       Uint32 j = 0;
2948       Uint32 i2 = 0;
2949       Uint32 j2 = 0;
2950       while (i < nkey)
2951       {
2952         ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
2953         copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
2954       }
2955       ptr[0].sz = i;
2956       ptr[1].sz = j;
2957     }
2958 
2959     // merge after values, new version overrides
2960     if (tp->t3 != Ev_t::enum_DEL)
2961     {
2962       AttributeHeader ah;
2963       Uint32 i = ptr[0].sz;
2964       Uint32 j = ptr[1].sz;
2965       Uint32 i1 = 0;
2966       Uint32 j1 = 0;
2967       Uint32 i2 = nkey;
2968       Uint32 j2 = ptr[1].sz;
2969       while (i1 < nkey)
2970       {
2971         j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
2972       }
2973       while (1)
2974       {
2975         bool b1 = (i1 < ptr1[0].sz);
2976         bool b2 = (i2 < ptr2[0].sz);
2977         if (b1 && b2)
2978         {
2979           Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId();
2980           Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId();
2981           if (id1 < id2)
2982             b2 = false;
2983           else if (id1 > id2)
2984             b1 = false;
2985           else
2986           {
2987             j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
2988             b1 = false;
2989           }
2990         }
2991         if (b1)
2992         {
2993           ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop);
2994           copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop);
2995         }
2996         else if (b2)
2997         {
2998           ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
2999           copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
3000         }
3001         else
3002           break;
3003       }
3004       ptr[0].sz = i;
3005       ptr[1].sz = j;
3006     }
3007 
3008     // merge before values, old version overrides
3009     if (tp->t3 != Ev_t::enum_INS)
3010     {
3011       AttributeHeader ah;
3012       Uint32 k = 0;
3013       Uint32 k1 = 0;
3014       Uint32 k2 = 0;
3015       while (1)
3016       {
3017         bool b1 = (k1 < ptr1[2].sz);
3018         bool b2 = (k2 < ptr2[2].sz);
3019         if (b1 && b2)
3020         {
3021           Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId();
3022           Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId();
3023           if (id1 < id2)
3024             b2 = false;
3025           else if (id1 > id2)
3026             b1 = false;
3027           else
3028           {
3029             k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize();
3030             b2 = false;
3031           }
3032         }
3033         if (b1)
3034         {
3035           ah = AttributeHeader(ptr1[2].p[k1]);
3036           copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
3037         }
3038         else if (b2)
3039         {
3040           ah = AttributeHeader(ptr2[2].p[k2]);
3041           copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2);
3042         }
3043         else
3044           break;
3045       }
3046       ptr[2].sz = k;
3047     }
3048   }
3049 
3050 end:
3051   dealloc_mem(&olddata, change_sz);
3052   DBUG_RETURN_EVENT(result);
3053 }
3054 
3055 /*
3056  * Given blob part event, find main table event on inline part.  It
3057  * should exist (force in TUP) but may arrive later.  If so, create
3058  * NUL event on main table.  The real event replaces it later.
3059  */
3060 
3061 int
get_main_data(Gci_container * bucket,EventBufData_hash::Pos & hpos,EventBufData * blob_data)3062 NdbEventBuffer::get_main_data(Gci_container* bucket,
3063                               EventBufData_hash::Pos& hpos,
3064                               EventBufData* blob_data)
3065 {
3066   DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data");
3067 
3068   int blobVersion = blob_data->m_event_op->theBlobVersion;
3069   assert(blobVersion == 1 || blobVersion == 2);
3070 
3071   NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp;
3072   assert(main_op != NULL);
3073   const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;
3074 
3075   // create LinearSectionPtr for main table key
3076   LinearSectionPtr ptr[3];
3077 
3078   Uint32 pk_ah[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
3079   Uint32* pk_data = blob_data->ptr[1].p;
3080   Uint32 pk_size = 0;
3081 
3082   if (unlikely(blobVersion == 1)) {
3083     /*
3084      * Blob PK attribute 0 is concatenated table PK null padded
3085      * to fixed maximum size.  The actual size and attributes of
3086      * table PK must be discovered.
3087      */
3088     Uint32 max_size = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();
3089 
3090     Uint32 sz = 0; // words parsed so far
3091     Uint32 n = 0;
3092     Uint32 i;
3093     for (i = 0; n < mainTable->m_noOfKeys; i++) {
3094       const NdbColumnImpl* c = mainTable->getColumn(i);
3095       assert(c != NULL);
3096       if (! c->m_pk)
3097         continue;
3098 
3099       Uint32 bytesize = c->m_attrSize * c->m_arraySize;
3100       Uint32 lb, len;
3101       assert(sz < max_size);
3102       bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_data[sz],
3103                                            bytesize, lb, len);
3104       assert(ok);
3105 
3106       AttributeHeader ah(i, lb + len);
3107       pk_ah[n] = ah.m_value;
3108       sz += ah.getDataSize();
3109       n++;
3110     }
3111     assert(n == mainTable->m_noOfKeys);
3112     assert(sz <= max_size);
3113     pk_size = sz;
3114   } else {
3115     /*
3116      * Blob PK starts with separate table PKs.  Total size must be
3117      * counted and blob attribute ids changed to table attribute ids.
3118      */
3119     Uint32 sz = 0; // count size
3120     Uint32 n = 0;
3121     Uint32 i;
3122     for (i = 0; n < mainTable->m_noOfKeys; i++) {
3123       const NdbColumnImpl* c = mainTable->getColumn(i);
3124       assert(c != NULL);
3125       if (! c->m_pk)
3126         continue;
3127 
3128       AttributeHeader ah(blob_data->ptr[0].p[n]);
3129       ah.setAttributeId(i);
3130       pk_ah[n] = ah.m_value;
3131       sz += ah.getDataSize();
3132       n++;
3133     }
3134     assert(n == mainTable->m_noOfKeys);
3135     pk_size = sz;
3136   }
3137 
3138   ptr[0].sz = mainTable->m_noOfKeys;
3139   ptr[0].p = pk_ah;
3140   ptr[1].sz = pk_size;
3141   ptr[1].p = pk_data;
3142   ptr[2].sz = 0;
3143   ptr[2].p = 0;
3144 
3145   DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
3146   DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
3147 
3148   // search for main event buffer
3149   bucket->m_data_hash.search(hpos, main_op, ptr);
3150   if (hpos.data != NULL)
3151     DBUG_RETURN_EVENT(0);
3152 
3153   // not found, create a place-holder
3154   EventBufData* main_data = alloc_data();
3155   if (main_data == NULL)
3156     DBUG_RETURN_EVENT(-1);
3157   SubTableData sdata = *blob_data->sdata;
3158   sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
3159   SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
3160   if (copy_data(&sdata, SubTableData::SignalLength, ptr, main_data, NULL) != 0)
3161     DBUG_RETURN_EVENT(-1);
3162   hpos.data = main_data;
3163 
3164   DBUG_RETURN_EVENT(1);
3165 }
3166 
3167 void
add_blob_data(Gci_container * bucket,EventBufData * main_data,EventBufData * blob_data)3168 NdbEventBuffer::add_blob_data(Gci_container* bucket,
3169                               EventBufData* main_data,
3170                               EventBufData* blob_data)
3171 {
3172   DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
3173   DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p", main_data, blob_data));
3174   EventBufData* head;
3175   head = main_data->m_next_blob;
3176   while (head != NULL)
3177   {
3178     if (head->m_event_op == blob_data->m_event_op)
3179       break;
3180     head = head->m_next_blob;
3181   }
3182   if (head == NULL)
3183   {
3184     head = blob_data;
3185     head->m_next_blob = main_data->m_next_blob;
3186     main_data->m_next_blob = head;
3187   }
3188   else
3189   {
3190     blob_data->m_next = head->m_next;
3191     head->m_next = blob_data;
3192   }
3193   // adjust data list size
3194   bucket->m_data.m_count += 1;
3195   bucket->m_data.m_sz += blob_data->sz;
3196   DBUG_VOID_RETURN_EVENT;
3197 }
3198 
3199 NdbEventOperationImpl *
move_data()3200 NdbEventBuffer::move_data()
3201 {
3202   // handle received data
3203   if (!m_complete_data.m_data.is_empty())
3204   {
3205     // move this list to last in m_available_data
3206     m_available_data.append_list(&m_complete_data.m_data, 0);
3207 
3208     bzero(&m_complete_data, sizeof(m_complete_data));
3209   }
3210 
3211   // handle used data
3212   if (!m_used_data.is_empty())
3213   {
3214     // return m_used_data to m_free_data
3215     free_list(m_used_data);
3216   }
3217   if (!m_available_data.is_empty())
3218   {
3219     DBUG_ENTER_EVENT("NdbEventBuffer::move_data");
3220 #ifdef VM_TRACE
3221     DBUG_PRINT_EVENT("exit",("m_available_data_count %u", m_available_data.m_count));
3222 #endif
3223     DBUG_RETURN_EVENT(m_available_data.m_head->m_event_op);
3224   }
3225   return 0;
3226 }
3227 
3228 void
free_list(EventBufData_list & list)3229 NdbEventBuffer::free_list(EventBufData_list &list)
3230 {
3231 #ifdef NDB_EVENT_VERIFY_SIZE
3232   verify_size(list);
3233 #endif
3234   // return list to m_free_data
3235   list.m_tail->m_next= m_free_data;
3236   m_free_data= list.m_head;
3237 #ifdef VM_TRACE
3238   m_free_data_count+= list.m_count;
3239 #endif
3240   m_free_data_sz+= list.m_sz;
3241 
3242   list.m_head = list.m_tail = NULL;
3243   list.m_count = list.m_sz = 0;
3244 }
3245 
append_list(EventBufData_list * list,Uint64 gci)3246 void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
3247 {
3248 #ifdef NDB_EVENT_VERIFY_SIZE
3249   NdbEventBuffer::verify_size(*list);
3250 #endif
3251   move_gci_ops(list, gci);
3252 
3253   if (m_tail)
3254     m_tail->m_next= list->m_head;
3255   else
3256     m_head= list->m_head;
3257   m_tail= list->m_tail;
3258   m_count+= list->m_count;
3259   m_sz+= list->m_sz;
3260 }
3261 
3262 void
add_gci_op(Gci_op g)3263 EventBufData_list::add_gci_op(Gci_op g)
3264 {
3265   DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
3266   DBUG_PRINT_EVENT("info", ("p.op: %p  g.event_types: %x", g.op, g.event_types));
3267   assert(g.op != NULL && g.op->theMainOp == NULL); // as in nextEvent
3268   Uint32 i;
3269   for (i = 0; i < m_gci_op_count; i++) {
3270     if (m_gci_op_list[i].op == g.op)
3271       break;
3272   }
3273   if (i < m_gci_op_count) {
3274     m_gci_op_list[i].event_types |= g.event_types;
3275   } else {
3276     if (m_gci_op_count == m_gci_op_alloc) {
3277       Uint32 n = 1 + 2 * m_gci_op_alloc;
3278       Gci_op* old_list = m_gci_op_list;
3279       m_gci_op_list = new Gci_op [n];
3280       if (m_gci_op_alloc != 0) {
3281         Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op);
3282         memcpy(m_gci_op_list, old_list, bytes);
3283         DBUG_PRINT_EVENT("info", ("this: %p  delete m_gci_op_list: %p",
3284                                   this, old_list));
3285         delete [] old_list;
3286       }
3287       else
3288         assert(old_list == 0);
3289       DBUG_PRINT_EVENT("info", ("this: %p  new m_gci_op_list: %p",
3290                                 this, m_gci_op_list));
3291       m_gci_op_alloc = n;
3292     }
3293     assert(m_gci_op_count < m_gci_op_alloc);
3294 #ifndef DBUG_OFF
3295     i = m_gci_op_count;
3296 #endif
3297     m_gci_op_list[m_gci_op_count++] = g;
3298   }
3299   DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
3300   DBUG_VOID_RETURN_EVENT;
3301 }
3302 
3303 void
move_gci_ops(EventBufData_list * list,Uint64 gci)3304 EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci)
3305 {
3306   DBUG_ENTER_EVENT("EventBufData_list::move_gci_ops");
3307   DBUG_PRINT_EVENT("info", ("this: %p  list: %p  gci: %u/%u",
3308                             this, list, (Uint32)(gci >> 32), (Uint32)gci));
3309   assert(!m_is_not_multi_list);
3310   if (!list->m_is_not_multi_list)
3311   {
3312     assert(gci == 0);
3313     if (m_gci_ops_list_tail)
3314       m_gci_ops_list_tail->m_next = list->m_gci_ops_list;
3315     else
3316     {
3317       m_gci_ops_list =  list->m_gci_ops_list;
3318     }
3319     m_gci_ops_list_tail = list->m_gci_ops_list_tail;
3320     goto end;
3321   }
3322   {
3323     Gci_ops *new_gci_ops = new Gci_ops;
3324     DBUG_PRINT_EVENT("info", ("this: %p  m_gci_op_list: %p",
3325                         new_gci_ops, list->m_gci_op_list));
3326     if (m_gci_ops_list_tail)
3327       m_gci_ops_list_tail->m_next = new_gci_ops;
3328     else
3329     {
3330       assert(m_gci_ops_list == 0);
3331       m_gci_ops_list = new_gci_ops;
3332     }
3333     m_gci_ops_list_tail = new_gci_ops;
3334 
3335     new_gci_ops->m_gci_op_list = list->m_gci_op_list;
3336     new_gci_ops->m_gci_op_count = list->m_gci_op_count;
3337     new_gci_ops->m_gci = gci;
3338     new_gci_ops->m_next = 0;
3339   }
3340 end:
3341   list->m_gci_op_list = 0;
3342   list->m_gci_ops_list_tail = 0;
3343   list->m_gci_op_alloc = 0;
3344   DBUG_VOID_RETURN_EVENT;
3345 }
3346 
3347 NdbEventOperation*
createEventOperation(const char * eventName,NdbError & theError)3348 NdbEventBuffer::createEventOperation(const char* eventName,
3349 				     NdbError &theError)
3350 {
3351   DBUG_ENTER("NdbEventBuffer::createEventOperation");
3352   NdbEventOperation* tOp= new NdbEventOperation(m_ndb, eventName);
3353   if (tOp == 0)
3354   {
3355     theError.code= 4000;
3356     DBUG_RETURN(NULL);
3357   }
3358   if (tOp->getState() != NdbEventOperation::EO_CREATED) {
3359     theError.code= tOp->getNdbError().code;
3360     delete tOp;
3361     DBUG_RETURN(NULL);
3362   }
3363   // add user reference
3364   // removed in dropEventOperation
3365   getEventOperationImpl(tOp)->m_ref_count = 1;
3366   DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
3367                       getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp)));
3368   DBUG_RETURN(tOp);
3369 }
3370 
3371 NdbEventOperationImpl*
createEventOperationImpl(NdbEventImpl & evnt,NdbError & theError)3372 NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt,
3373                                          NdbError &theError)
3374 {
3375   DBUG_ENTER("NdbEventBuffer::createEventOperationImpl");
3376   NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt);
3377   if (tOp == 0)
3378   {
3379     theError.code= 4000;
3380     DBUG_RETURN(NULL);
3381   }
3382   if (tOp->getState() != NdbEventOperation::EO_CREATED) {
3383     theError.code= tOp->getNdbError().code;
3384     delete tOp;
3385     DBUG_RETURN(NULL);
3386   }
3387   DBUG_RETURN(tOp);
3388 }
3389 
3390 void
dropEventOperation(NdbEventOperation * tOp)3391 NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
3392 {
3393   DBUG_ENTER("NdbEventBuffer::dropEventOperation");
3394   NdbEventOperationImpl* op= getEventOperationImpl(tOp);
3395 
3396   op->stop();
3397   // stop blob event ops
3398   if (op->theMainOp == NULL)
3399   {
3400     Uint64 max_stop_gci = op->m_stop_gci;
3401     NdbEventOperationImpl* tBlobOp = op->theBlobOpList;
3402     while (tBlobOp != NULL)
3403     {
3404       tBlobOp->stop();
3405       Uint64 stop_gci = tBlobOp->m_stop_gci;
3406       if (stop_gci > max_stop_gci)
3407         max_stop_gci = stop_gci;
3408       tBlobOp = tBlobOp->m_next;
3409     }
3410     tBlobOp = op->theBlobOpList;
3411     while (tBlobOp != NULL)
3412     {
3413       tBlobOp->m_stop_gci = max_stop_gci;
3414       tBlobOp = tBlobOp->m_next;
3415     }
3416     op->m_stop_gci = max_stop_gci;
3417   }
3418 
3419   /**
3420    * Needs mutex lock as report_node_XXX accesses list...
3421    */
3422   NdbMutex_Lock(m_mutex);
3423 
3424   // release blob handles now, further access is user error
3425   if (op->theMainOp == NULL)
3426   {
3427     while (op->theBlobList != NULL)
3428     {
3429       NdbBlob* tBlob = op->theBlobList;
3430       op->theBlobList = tBlob->theNext;
3431       m_ndb->releaseNdbBlob(tBlob);
3432     }
3433   }
3434 
3435   if (op->m_next)
3436     op->m_next->m_prev= op->m_prev;
3437   if (op->m_prev)
3438     op->m_prev->m_next= op->m_next;
3439   else
3440     m_ndb->theImpl->m_ev_op= op->m_next;
3441 
3442   assert(m_ndb->theImpl->m_ev_op == 0 || m_ndb->theImpl->m_ev_op->m_prev == 0);
3443 
3444   DBUG_ASSERT(op->m_ref_count > 0);
3445   // remove user reference
3446   // added in createEventOperation
3447   // user error to use reference after this
3448   op->m_ref_count--;
3449   DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
3450   if (op->m_ref_count == 0)
3451   {
3452     NdbMutex_Unlock(m_mutex);
3453     DBUG_PRINT("info", ("deleting op: %p", op));
3454     delete op->m_facade;
3455   }
3456   else
3457   {
3458     op->m_next= m_dropped_ev_op;
3459     op->m_prev= 0;
3460     if (m_dropped_ev_op)
3461       m_dropped_ev_op->m_prev= op;
3462     m_dropped_ev_op= op;
3463 
3464     NdbMutex_Unlock(m_mutex);
3465   }
3466   DBUG_VOID_RETURN;
3467 }
3468 
3469 void
reportStatus()3470 NdbEventBuffer::reportStatus()
3471 {
3472   EventBufData *apply_buf= m_available_data.m_head;
3473   Uint64 apply_gci, latest_gci= m_latestGCI;
3474   if (apply_buf == 0)
3475     apply_buf= m_complete_data.m_data.m_head;
3476   if (apply_buf && apply_buf->sdata)
3477   {
3478     Uint32 gci_hi = apply_buf->sdata->gci_hi;
3479     Uint32 gci_lo = apply_buf->sdata->gci_lo;
3480     apply_gci= gci_lo | (Uint64(gci_hi) << 32);
3481   }
3482   else
3483     apply_gci= latest_gci;
3484 
3485   if (m_free_thresh)
3486   {
3487     if (100*(Uint64)m_free_data_sz < m_min_free_thresh*(Uint64)m_total_alloc &&
3488         m_total_alloc > 1024*1024)
3489     {
3490       /* report less free buffer than m_free_thresh,
3491          next report when more free than 2 * m_free_thresh
3492       */
3493       m_min_free_thresh= 0;
3494       m_max_free_thresh= 2 * m_free_thresh;
3495       goto send_report;
3496     }
3497 
3498     if (100*(Uint64)m_free_data_sz > m_max_free_thresh*(Uint64)m_total_alloc &&
3499         m_total_alloc > 1024*1024)
3500     {
3501       /* report more free than 2 * m_free_thresh
3502          next report when less free than m_free_thresh
3503       */
3504       m_min_free_thresh= m_free_thresh;
3505       m_max_free_thresh= 100;
3506       goto send_report;
3507     }
3508   }
3509   if (m_gci_slip_thresh &&
3510       (latest_gci-apply_gci >= m_gci_slip_thresh))
3511   {
3512     goto send_report;
3513   }
3514   return;
3515 
3516 send_report:
3517   Uint32 data[8];
3518   data[0]= NDB_LE_EventBufferStatus;
3519   data[1]= m_total_alloc-m_free_data_sz;
3520   data[2]= m_total_alloc;
3521   data[3]= 0;
3522   data[4]= (Uint32)(apply_gci);
3523   data[5]= (Uint32)(apply_gci >> 32);
3524   data[6]= (Uint32)(latest_gci);
3525   data[7]= (Uint32)(latest_gci >> 32);
3526   Ndb_internal::send_event_report(true, m_ndb, data,8);
3527 #ifdef VM_TRACE
3528   assert(m_total_alloc >= m_free_data_sz);
3529 #endif
3530 }
3531 
#ifdef VM_TRACE
/*
 * Consistency check for a chain of event buffers (VM_TRACE builds
 * only).  The check itself is compiled out with "#if 0", so this
 * function currently does nothing.  The disabled code walks the chain
 * through m_next, sums up get_full_size() of every buffer and asserts
 * that the totals equal 'count' and 'sz'.
 */
void
NdbEventBuffer::verify_size(const EventBufData* data, Uint32 count, Uint32 sz)
{
#if 0
  Uint32 tmp_count = 0;
  Uint32 tmp_sz = 0;
  while (data != 0) {
    Uint32 full_count, full_sz;
    data->get_full_size(full_count, full_sz);
    tmp_count += full_count;
    tmp_sz += full_sz;
    data = data->m_next;
  }
  assert(tmp_count == count);
  assert(tmp_sz == sz);
#endif
}
/*
 * List variant of the check above; also compiled out with "#if 0".
 * The disabled code forwards the list's head, count and size.
 */
void
NdbEventBuffer::verify_size(const EventBufData_list & list)
{
#if 0
  verify_size(list.m_head, list.m_count, list.m_sz);
#endif
}
#endif
3558 
3559 // hash table routines
3560 
3561 // could optimize the all-fixed case
3562 Uint32
getpkhash(NdbEventOperationImpl * op,LinearSectionPtr ptr[3])3563 EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
3564 {
3565   DBUG_ENTER_EVENT("EventBufData_hash::getpkhash");
3566   DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
3567   DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
3568 
3569   const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
3570 
3571   // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
3572   // for pk update (to equivalent pk) post/pre values give same hash
3573   Uint32 nkey = tab->m_noOfKeys;
3574   assert(nkey != 0 && nkey <= ptr[0].sz);
3575   const Uint32* hptr = ptr[0].p;
3576   const uchar* dptr = (uchar*)ptr[1].p;
3577 
3578   // hash registers
3579   ulong nr1 = 0;
3580   ulong nr2 = 0;
3581   while (nkey-- != 0)
3582   {
3583     AttributeHeader ah(*hptr++);
3584     Uint32 bytesize = ah.getByteSize();
3585     assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz));
3586 
3587     Uint32 i = ah.getAttributeId();
3588     const NdbColumnImpl* col = tab->getColumn(i);
3589     assert(col != 0);
3590 
3591     Uint32 lb, len;
3592     bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len);
3593     assert(ok);
3594 
3595     CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
3596     (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
3597     dptr += ((bytesize + 3) / 4) * 4;
3598   }
3599   DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1));
3600   DBUG_RETURN_EVENT(nr1);
3601 }
3602 
3603 bool
getpkequal(NdbEventOperationImpl * op,LinearSectionPtr ptr1[3],LinearSectionPtr ptr2[3])3604 EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
3605 {
3606   DBUG_ENTER_EVENT("EventBufData_hash::getpkequal");
3607   DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2);
3608   DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2);
3609   DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2);
3610   DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2);
3611 
3612   const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
3613 
3614   Uint32 nkey = tab->m_noOfKeys;
3615   assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
3616   const Uint32* hptr1 = ptr1[0].p;
3617   const Uint32* hptr2 = ptr2[0].p;
3618   const uchar* dptr1 = (uchar*)ptr1[1].p;
3619   const uchar* dptr2 = (uchar*)ptr2[1].p;
3620 
3621   bool equal = true;
3622 
3623   while (nkey-- != 0)
3624   {
3625     AttributeHeader ah1(*hptr1++);
3626     AttributeHeader ah2(*hptr2++);
3627     // sizes can differ on update of varchar endspace
3628     Uint32 bytesize1 = ah1.getByteSize();
3629     Uint32 bytesize2 = ah2.getByteSize();
3630     assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
3631     assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));
3632 
3633     assert(ah1.getAttributeId() == ah2.getAttributeId());
3634     Uint32 i = ah1.getAttributeId();
3635     const NdbColumnImpl* col = tab->getColumn(i);
3636     assert(col != 0);
3637 
3638     Uint32 lb1, len1;
3639     bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1);
3640     Uint32 lb2, len2;
3641     bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2);
3642     assert(ok1 && ok2 && lb1 == lb2);
3643 
3644     CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
3645     int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
3646     if (res != 0)
3647     {
3648       equal = false;
3649       break;
3650     }
3651     dptr1 += ((bytesize1 + 3) / 4) * 4;
3652     dptr2 += ((bytesize2 + 3) / 4) * 4;
3653   }
3654 
3655   DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false"));
3656   DBUG_RETURN_EVENT(equal);
3657 }
3658 
3659 void
search(Pos & hpos,NdbEventOperationImpl * op,LinearSectionPtr ptr[3])3660 EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
3661 {
3662   DBUG_ENTER_EVENT("EventBufData_hash::search");
3663   Uint32 pkhash = getpkhash(op, ptr);
3664   Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
3665   EventBufData* data = m_hash[index];
3666   while (data != 0)
3667   {
3668     if (data->m_event_op == op &&
3669         data->m_pkhash == pkhash &&
3670         getpkequal(op, data->ptr, ptr))
3671       break;
3672     data = data->m_next_hash;
3673   }
3674   hpos.index = index;
3675   hpos.data = data;
3676   hpos.pkhash = pkhash;
3677   DBUG_PRINT_EVENT("info", ("search result=%p", data));
3678   DBUG_VOID_RETURN_EVENT;
3679 }
3680 
// Explicit instantiations of the Vector class template for the two
// element types named below.
template class Vector<Gci_container_pod>;
template class Vector<NdbEventBuffer::EventBufData_chunk*>;
3683