1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include <ndb_global.h>
27 #include <kernel_types.h>
28
29 #include "API.hpp"
30 #include <NdbOut.hpp>
31
32 #include <signaldata/CreateEvnt.hpp>
33 #include <signaldata/SumaImpl.hpp>
34 #include <SimpleProperties.hpp>
35 #include <Bitmask.hpp>
36 #include <AttributeHeader.hpp>
37 #include <AttributeList.hpp>
38 #include <NdbError.hpp>
39 #include <BaseString.hpp>
40 #include <UtilBuffer.hpp>
41 #include <portlib/NdbMem.h>
42 #include <signaldata/AlterTable.hpp>
43 #include "ndb_internal.hpp"
44
45 #include <EventLogger.hpp>
46 extern EventLogger * g_eventLogger;
47
48 #define TOTAL_BUCKETS_INIT (1U << 15)
49 static Gci_container_pod g_empty_gci_container;
50
#if defined(VM_TRACE) && defined(NOT_USED)
/*
 * Debug helper: dump a SUB_TABLE_DATA signal (gci hi/lo and operation
 * type) followed by the raw words of its three linear sections.
 * Compiled out unless both VM_TRACE and NOT_USED are defined.
 */
static void
print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
{
  printf("addr=%p gci{hi/lo}hi=%u/%u op=%d\n", (void*)sdata,
         sdata->gci_hi, sdata->gci_lo,
         SubTableData::getOperation(sdata->requestInfo));
  for (int i = 0; i <= 2; i++) {
    printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
    // Dump each section word-by-word in hex.
    for (int j = 0; (uint) j < ptr[i].sz; j++)
      printf("%08x ", ptr[i].p[j]);
    printf("\n");
  }
}
#endif
66
67 // EventBufData
68
69 void
add_part_size(Uint32 & full_count,Uint32 & full_sz) const70 EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz) const
71 {
72 Uint32 tmp_count = 0;
73 Uint32 tmp_sz = 0;
74 const EventBufData* data2 = m_next_blob;
75 while (data2 != 0) {
76 tmp_count++;
77 tmp_sz += data2->sz;
78 const EventBufData* data3 = data2->m_next;
79 while (data3 != 0) {
80 tmp_count++;
81 tmp_sz += data3->sz;
82 data3 = data3->m_next;
83 }
84 data2 = data2->m_next_blob;
85 }
86 full_count += tmp_count;
87 full_sz += tmp_sz;
88 }
89
90 /*
91 * Class NdbEventOperationImpl
92 *
93 *
94 */
95
96 // todo handle several ndb objects
97 // todo free allocated data when closing NdbEventBuffer
98
/*
 * Construct an event operation by looking up an existing event by name
 * through the dictionary. On lookup failure the object remains in
 * EO_ERROR state with m_error.code copied from the dictionary error;
 * otherwise init() moves it to EO_CREATED.
 */
NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &f,
                                             Ndb *theNdb,
                                             const char* eventName) :
  NdbEventOperation(*this),
  m_facade(&f),
  m_ndb(theNdb),
  m_state(EO_ERROR),          // stays EO_ERROR unless init() succeeds
  m_oid(~(Uint32)0),          // sentinel: not yet registered in object-id map
  m_allow_empty_update(false)
{
  DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl");

  assert(m_ndb != NULL);
  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  assert(myDict != NULL);

  const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
  if (!myEvnt)
  {
    // Event not found (or dictionary error): record the code and bail.
    m_error.code= myDict->getNdbError().code;
    DBUG_VOID_RETURN;
  }

  init(myEvnt->m_impl);
  DBUG_VOID_RETURN;
}
125
/*
 * Construct an event operation from an already-resolved event impl
 * (no facade object supplied by a caller; this object is its own
 * facade). Used internally, e.g. for blob-part event operations.
 */
NdbEventOperationImpl::NdbEventOperationImpl(Ndb *theNdb,
                                             NdbEventImpl& evnt) :
  NdbEventOperation(*this),
  m_facade(this),             // self-faced: no external facade object
  m_ndb(theNdb),
  m_state(EO_ERROR),
  m_oid(~(Uint32)0),
  m_allow_empty_update(false)
{
  DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl [evnt]");
  init(evnt);
  DBUG_VOID_RETURN;
}
139
/*
 * Common constructor tail: reset all members, attach the event impl,
 * register this object in the Ndb object-id map (m_oid) and move the
 * state to EO_CREATED. m_has_error is held at 1 until everything has
 * been set up, then cleared.
 */
void
NdbEventOperationImpl::init(NdbEventImpl& evnt)
{
  DBUG_ENTER("NdbEventOperationImpl::init");

  m_magic_number = 0;          // set to the real magic only on execute()
  mi_type = 0;
  m_change_mask = 0;
#ifdef VM_TRACE
  m_data_done_count = 0;
  m_data_count = 0;
#endif
  m_next = 0;
  m_prev = 0;

  m_eventId = 0;
  // Pre/post (index 1/0) attribute list heads and tails.
  theFirstPkAttrs[0] = NULL;
  theCurrentPkAttrs[0] = NULL;
  theFirstPkAttrs[1] = NULL;
  theCurrentPkAttrs[1] = NULL;
  theFirstDataAttrs[0] = NULL;
  theCurrentDataAttrs[0] = NULL;
  theFirstDataAttrs[1] = NULL;
  theCurrentDataAttrs[1] = NULL;

  theBlobList = NULL;
  theBlobOpList = NULL;
  theMainOp = NULL;            // NULL means this IS a main (non-blob) op
  theBlobVersion = 0;

  m_data_item= NULL;
  m_eventImpl = NULL;

  m_custom_data= 0;
  m_has_error= 1;              // pessimistic until init completes

  // we should lookup id in Dictionary, TODO
  // also make sure we only have one listener on each event

  m_eventImpl = &evnt;

  m_eventId = m_eventImpl->m_eventId;

  // Register in the object-id map so signals can be routed back here.
  m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this);

  m_state= EO_CREATED;

  m_stop_gci = 0;
#ifdef ndb_event_stores_merge_events_flag
  m_mergeEvents = m_eventImpl->m_mergeEvents;
#else
  m_mergeEvents = false;
#endif
  m_ref_count = 0;
  DBUG_PRINT("info", ("m_ref_count = 0 for op: 0x%lx", (long) this));

  m_has_error= 0;

  DBUG_PRINT("exit",("this: 0x%lx oid: %u", (long) this, m_oid));
  DBUG_VOID_RETURN;
}
201
/*
 * Destructor: stop the subscription, delete any blob-part event
 * operations owned by a main op, unregister from the object-id map
 * and release the event facade. An object whose m_oid was never
 * assigned (init() not reached) skips all cleanup.
 */
NdbEventOperationImpl::~NdbEventOperationImpl()
{
  DBUG_ENTER("NdbEventOperationImpl::~NdbEventOperationImpl");
  m_magic_number= 0;

  // Never fully initialized -> nothing was registered; bail out.
  if (m_oid == ~(Uint32)0)
    DBUG_VOID_RETURN;

  stop();

  // A main op owns its blob-part ops (they are hidden under
  // theBlobOpList rather than under m_ndb); delete them here.
  if (theMainOp == NULL)
  {
    NdbEventOperationImpl* tBlobOp = theBlobOpList;
    while (tBlobOp != NULL)
    {
      NdbEventOperationImpl *op = tBlobOp;
      tBlobOp = tBlobOp->m_next;
      delete op;
    }
  }

  m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
  DBUG_PRINT("exit",("this: %p/%p oid: %u main: %p",
                     this, m_facade, m_oid, theMainOp));

  if (m_eventImpl)
  {
    // Deleting the facade also deletes the impl it wraps.
    delete m_eventImpl->m_facade;
    m_eventImpl= 0;
  }

  DBUG_VOID_RETURN;
}
235
// Return the current life-cycle state of this event operation
// (EO_CREATED / EO_EXECUTING / EO_DROPPED / EO_ERROR).
NdbEventOperation::State
NdbEventOperationImpl::getState()
{
  return m_state;
}
241
/*
 * Register interest in a column by name; resolves the column in the
 * event's table and delegates to the NdbColumnImpl overload.
 * Only legal between construction and execute() (state EO_CREATED).
 * Returns NULL on unknown column or wrong state.
 */
NdbRecAttr*
NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getValue");
  if (m_state != EO_CREATED) {
    ndbout_c("NdbEventOperationImpl::getValue may only be called between "
             "instantiation and execute()");
    DBUG_RETURN(NULL);
  }

  NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);

  if (tAttrInfo == NULL) {
    ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
    DBUG_RETURN(NULL);
  }

  DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
}
261
/*
 * Allocate an NdbRecAttr for the given column and insert it, sorted by
 * attribute id, into the pk or data attribute list selected by n
 * (0 = post/after values, 1 = pre/before values).
 * Keeping the lists sorted lets receive_event() merge incoming
 * attribute data in a single pass.
 * Returns the new NdbRecAttr, or NULL on failure.
 *
 * NOTE(review): allocation/setup failure and duplicate registration
 * call exit(-1) rather than returning an error (the commented-out
 * setErrorCodeAbort(4000) lines show the apparent original intent);
 * left as-is since callers may depend on this behavior.
 */
NdbRecAttr*
NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getValue");
  // Insert Attribute Id into ATTRINFO part.

  NdbRecAttr **theFirstAttr;
  NdbRecAttr **theCurrentAttr;

  // Primary-key columns and data columns live in separate lists.
  if (tAttrInfo->getPrimaryKey())
  {
    theFirstAttr = &theFirstPkAttrs[n];
    theCurrentAttr = &theCurrentPkAttrs[n];
  }
  else
  {
    theFirstAttr = &theFirstDataAttrs[n];
    theCurrentAttr = &theCurrentDataAttrs[n];
  }

  /************************************************************************
   *	Get a Receive Attribute object and link it into the operation object.
   ************************************************************************/
  NdbRecAttr *tAttr = m_ndb->getRecAttr();
  if (tAttr == NULL) {
    exit(-1);
    //setErrorCodeAbort(4000);
    DBUG_RETURN(NULL);
  }

  /**********************************************************************
   * Now set the attribute identity and the pointer to the data in
   * the RecAttr object
   * Also set attribute size, array size and attribute type
   ********************************************************************/
  if (tAttr->setup(tAttrInfo, aValue)) {
    //setErrorCodeAbort(4000);
    m_ndb->releaseRecAttr(tAttr);
    exit(-1);
    DBUG_RETURN(NULL);
  }
  //theErrorLine++;

  tAttr->setUNDEFINED();

  // We want to keep the list sorted to make data insertion easier later

  if (*theFirstAttr == NULL) {
    // Empty list: new attr is both head and tail.
    *theFirstAttr = tAttr;
    *theCurrentAttr = tAttr;
    tAttr->next(NULL);
  } else {
    Uint32 tAttrId = tAttrInfo->m_attrId;
    if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
      // Larger than current tail: append.
      (*theCurrentAttr)->next(tAttr);
      tAttr->next(NULL);
      *theCurrentAttr = tAttr;
    } else if ((*theFirstAttr)->next() == NULL ||    // only one in list
               (*theFirstAttr)->attrId() > tAttrId) {// or first
      // Smaller than the head: prepend.
      tAttr->next(*theFirstAttr);
      *theFirstAttr = tAttr;
    } else { // at least 2 in list and not first and not last
      // Linear scan for the insertion point between p and p_next.
      NdbRecAttr *p = *theFirstAttr;
      NdbRecAttr *p_next = p->next();
      while (tAttrId > p_next->attrId()) {
        p = p_next;
        p_next = p->next();
      }
      if (tAttrId == p_next->attrId()) { // Using same attribute twice
        tAttr->release(); // do I need to do this?
        m_ndb->releaseRecAttr(tAttr);
        exit(-1);
        DBUG_RETURN(NULL);
      }
      // this is it, between p and p_next
      p->next(tAttr);
      tAttr->next(p_next);
    }
  }
  DBUG_RETURN(tAttr);
}
343
/*
 * Get (or create) a blob handle for the named column; resolves the
 * column and delegates to the NdbColumnImpl overload.
 * Requires merge-events mode and state EO_CREATED (before execute()).
 * Returns NULL on unknown column or wrong state.
 */
NdbBlob*
NdbEventOperationImpl::getBlobHandle(const char *colName, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)");

  assert(m_mergeEvents);

  if (m_state != EO_CREATED) {
    ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between "
             "instantiation and execute()");
    DBUG_RETURN(NULL);
  }

  NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);

  if (tAttrInfo == NULL) {
    ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",colName);
    DBUG_RETURN(NULL);
  }

  NdbBlob* bh = getBlobHandle(tAttrInfo, n);
  DBUG_RETURN(bh);
}
367
/*
 * Get (or create) the NdbBlob handle for a blob column, post (n=0) or
 * pre (n=1) image. Handles are created once per (column, n) and cached
 * on theBlobList. For non-tiny blobs (part size > 0) a hidden event
 * operation on the blob-parts table is created (or reused) and linked
 * under this main op via theBlobOpList; one blob op serves both post
 * and pre handles.
 * Returns NULL on error with m_error.code set.
 */
NdbBlob*
NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getBlobHandle");
  DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n));

  // as in NdbOperation, create only one instance
  NdbBlob* tBlob = theBlobList;
  NdbBlob* tLastBlob = NULL;
  while (tBlob != NULL) {
    if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
      DBUG_RETURN(tBlob);
    tLastBlob = tBlob;
    tBlob = tBlob->theNext;
  }

  NdbEventOperationImpl* tBlobOp = NULL;

  // Tiny blobs are stored fully inline and have no parts table.
  const bool is_tinyblob = (tAttrInfo->getPartSize() == 0);
  assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));

  if (! is_tinyblob) {
    // blob event name
    char bename[MAX_TAB_NAME_SIZE];
    NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);

    // find blob event op if any (it serves both post and pre handles)
    tBlobOp = theBlobOpList;
    NdbEventOperationImpl* tLastBlopOp = NULL;
    while (tBlobOp != NULL) {
      if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
        break;
      }
      tLastBlopOp = tBlobOp;
      tBlobOp = tBlobOp->m_next;
    }

    DBUG_PRINT("info", ("%s blob event op for %s",
                        tBlobOp ? " reuse" : " create", bename));

    // create blob event op if not found
    if (tBlobOp == NULL) {
      // get blob event
      NdbDictionaryImpl& dict =
        NdbDictionaryImpl::getImpl(*m_ndb->getDictionary());
      NdbEventImpl* blobEvnt =
        dict.getBlobEvent(*this->m_eventImpl, tAttrInfo->m_column_no);
      if (blobEvnt == NULL) {
        m_error.code = dict.m_error.code;
        DBUG_RETURN(NULL);
      }

      // create blob event operation
      tBlobOp =
        m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
      if (tBlobOp == NULL)
        DBUG_RETURN(NULL);

      // pointer to main table op
      tBlobOp->theMainOp = this;
      tBlobOp->m_mergeEvents = m_mergeEvents;
      tBlobOp->theBlobVersion = tAttrInfo->m_blobVersion;

      // to hide blob op it is linked under main op, not under m_ndb
      if (tLastBlopOp == NULL)
        theBlobOpList = tBlobOp;
      else
        tLastBlopOp->m_next = tBlobOp;
      tBlobOp->m_next = NULL;
    }
  }

  tBlob = m_ndb->getNdbBlob();
  if (tBlob == NULL) {
    m_error.code = m_ndb->getNdbError().code;
    DBUG_RETURN(NULL);
  }

  // calls getValue on inline and blob part
  if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) {
    m_error.code = tBlob->getNdbError().code;
    m_ndb->releaseNdbBlob(tBlob);
    DBUG_RETURN(NULL);
  }

  // add to list end
  if (tLastBlob == NULL)
    theBlobList = tBlob;
  else
    tLastBlob->theNext = tBlob;
  tBlob->theNext = NULL;
  DBUG_RETURN(tBlob);
}
461
/*
 * Extract the blob part number directly from the received buffer of a
 * blob-parts event (this op must be a blob op; see readBlobParts).
 * Skips the primary-key words (and the DIST key word when hasDist) in
 * section 0 to find the offset of the PART attribute's value in
 * section 1. Blob v1 events always carry PK + DIST as the first two
 * headers; v2 events carry one header per main-table key column.
 */
Uint32
NdbEventOperationImpl::get_blob_part_no(bool hasDist)
{
  assert(theBlobVersion == 1 || theBlobVersion == 2);
  assert(theMainOp != NULL);
  const NdbTableImpl* mainTable = theMainOp->m_eventImpl->m_tableImpl;
  assert(m_data_item != NULL);
  LinearSectionPtr (&ptr)[3] = m_data_item->ptr;

  uint pos = 0; // PK and possibly DIST to skip

  if (unlikely(theBlobVersion == 1)) {
    // v1 layout: single concatenated PK, then DIST (always present).
    pos += AttributeHeader(ptr[0].p[0]).getDataSize();
    assert(hasDist);
    pos += AttributeHeader(ptr[0].p[1]).getDataSize();
  } else {
    // v2 layout: one attribute per main-table key column, then
    // optionally DIST.
    uint n = mainTable->m_noOfKeys;
    uint i;
    for (i = 0; i < n; i++) {
      pos += AttributeHeader(ptr[0].p[i]).getDataSize();
    }
    if (hasDist)
      pos += AttributeHeader(ptr[0].p[n]).getDataSize();
  }

  assert(pos < ptr[1].sz);
  Uint32 no = ptr[1].p[pos];
  return no;
}
491
/*
 * Read blob parts [part, part+count) for the given blob handle into
 * buf. Locates the blob-parts list belonging to the handle's blob op
 * on this (main) op's current data item, then scans every buffered
 * part, copying those whose part number falls in the requested range.
 * For variable-length parts the 2-byte length prefix is decoded and,
 * when lenLoc is non-NULL (single-part read), returned through it.
 * Returns 0; asserts that exactly `count` parts were found.
 */
int
NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob,
                                     Uint32 part, Uint32 count, Uint16* lenLoc)
{
  DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts");
  DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d",
                            part, count, blob->theEventBlobVersion));

  NdbEventOperationImpl* blob_op = blob->theBlobEventOp;
  const bool hasDist = (blob->theStripeSize != 0);

  DBUG_PRINT_EVENT("info", ("m_data_item=%p", m_data_item));
  assert(m_data_item != NULL);

  // search for blob parts list head
  EventBufData* head;
  assert(m_data_item != NULL);
  head = m_data_item->m_next_blob;
  while (head != NULL)
  {
    if (head->m_event_op == blob_op)
    {
      DBUG_PRINT_EVENT("info", ("found blob parts head %p", head));
      break;
    }
    head = head->m_next_blob;
  }

  Uint32 nparts = 0;
  Uint32 noutside = 0;
  EventBufData* data = head;
  // XXX optimize using part no ordering
  while (data != NULL)
  {
    /*
     * Hack part no directly out of buffer since it is not returned
     * in pre data (PK buglet). For part data use receive_event().
     * This means extra copy. XXX fix
     */
    blob_op->m_data_item = data;
    int r = blob_op->receive_event();
    require(r > 0);
    // XXX should be: no = blob->theBlobEventPartValue
    Uint32 no = blob_op->get_blob_part_no(hasDist);

    DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part", data, no));

    if (part <= no && no < part + count)
    {
      DBUG_PRINT_EVENT("info", ("part within read range"));

      const char* src = blob->theBlobEventDataBuf.data;
      Uint32 sz = 0;
      if (blob->theFixedDataFlag) {
        // Fixed-size parts: length is the declared part size.
        sz = blob->thePartSize;
      } else {
        // Variable-size parts: little-endian 2-byte length prefix.
        const uchar* p = (const uchar*)blob->theBlobEventDataBuf.data;
        sz = p[0] + (p[1] << 8);
        src += 2;
      }
      memcpy(buf + (no - part) * sz, src, sz);
      nparts++;
      if (lenLoc != NULL) {
        assert(count == 1);
        *lenLoc = sz;
      } else {
        assert(sz == blob->thePartSize);
      }
    }
    else
    {
      DBUG_PRINT_EVENT("info", ("part outside read range"));
      noutside++;
    }
    data = data->m_next;
  }
  if (unlikely(nparts != count))
  {
    ndbout_c("nparts: %u count: %u noutside: %u", nparts, count, noutside);
  }
  assert(nparts == count);

  DBUG_RETURN_EVENT(0);
}
576
/*
 * Start the subscription: thin wrapper that takes the event buffer's
 * add/drop lock around execute_nolock(), which does the actual work.
 */
int
NdbEventOperationImpl::execute()
{
  DBUG_ENTER("NdbEventOperationImpl::execute");
  m_ndb->theEventBuffer->add_drop_lock();
  int r = execute_nolock();
  m_ndb->theEventBuffer->add_drop_unlock();
  DBUG_RETURN(r);
}
586
/*
 * Start the subscription (caller holds the add/drop lock).
 * Optionally wraps the first subscription in a dummy schema
 * transaction to block node-group changes, subscribes via the
 * dictionary, then recursively executes any blob-part ops (for a
 * main op). On success the state is EO_EXECUTING; on subscribe
 * failure all the state set optimistically beforehand (magic number,
 * ref count, stop gci) is rolled back and the state becomes EO_ERROR.
 * Returns 0 on success, non-zero with m_error.code set on failure.
 */
int
NdbEventOperationImpl::execute_nolock()
{
  DBUG_ENTER("NdbEventOperationImpl::execute_nolock");
  DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob"));

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    DBUG_RETURN(-1);
  }

  bool schemaTrans = false;
  if (m_ndb->theEventBuffer->m_prevent_nodegroup_change)
  {
    /*
     * Since total count of sub data streams (Suma buckets)
     * are initially set when the first subscription are setup,
     * a dummy schema transaction are used to stop add or drop
     * node to occur for first subscription. Otherwise count may
     * change before we are in a state to detect that correctly.
     * This should not be needed since the handling of
     * SUB_GCP_COMPLETE_REP in recevier thread(s) should handle
     * this, but until sure this behaviour is kept.
     */
    int res = NdbDictionaryImpl::getImpl(* myDict).beginSchemaTrans(false);
    if (res != 0)
    {
      switch(myDict->getNdbError().code){
      case 711:
      case 763:
        // ignore;
        break;
      default:
        m_error.code= myDict->getNdbError().code;
        DBUG_RETURN(-1);
      }
    }
    else
    {
      schemaTrans = true;
    }
  }

  if (theFirstPkAttrs[0] == NULL &&
      theFirstDataAttrs[0] == NULL) { // defaults to get all
  }

  // Optimistically mark the op as live before subscribing; rolled
  // back below on failure.
  m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
  m_state= EO_EXECUTING;
  mi_type= m_eventImpl->mi_type;
  // add kernel reference
  // removed on TE_STOP, TE_CLUSTER_FAILURE, or error below
  m_ref_count++;
  m_stop_gci= ~(Uint64)0;
  DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
  int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
  if (r == 0)
  {
    m_ndb->theEventBuffer->m_prevent_nodegroup_change = false;
    if (schemaTrans)
    {
      // Abort (code 1) the dummy transaction; it made no changes.
      schemaTrans = false;
      myDict->endSchemaTrans(1);
    }

    if (theMainOp == NULL) {
      DBUG_PRINT("info", ("execute blob ops"));
      NdbEventOperationImpl* blob_op = theBlobOpList;
      while (blob_op != NULL) {
        r = blob_op->execute_nolock();
        if (r != 0) {
          // since main op is running and possibly some blob ops as well
          // we can't just reset the main op. Instead return with error,
          // main op (and blob ops) will be cleaned up when user calls
          // dropEventOperation
          m_error.code= myDict->getNdbError().code;
          DBUG_RETURN(r);
        }
        blob_op = blob_op->m_next;
      }
    }
    if (r == 0)
    {
      DBUG_RETURN(0);
    }
  }
  // Error
  // remove kernel reference
  // added above
  m_ref_count--;
  m_stop_gci = 0;
  DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
  m_state= EO_ERROR;
  mi_type= 0;
  m_magic_number= 0;
  m_error.code= myDict->getNdbError().code;

  if (schemaTrans)
  {
    schemaTrans = false;
    myDict->endSchemaTrans(1);
  }

  DBUG_RETURN(r);
}
693
/*
 * Stop the subscription. Always releases all pk/data NdbRecAttr lists
 * first, then (only if the op was EO_EXECUTING) unsubscribes through
 * the dictionary under the add/drop lock. On success m_stop_gci gets a
 * future "safe" gci if the kernel did not supply one, so the op is not
 * dropped from the event buffer too early.
 * Returns 0 on success, -1/non-zero on error with m_error.code set.
 */
int
NdbEventOperationImpl::stop()
{
  DBUG_ENTER("NdbEventOperationImpl::stop");
  int i;

  // Release pre and post (i=0/1) primary-key attribute lists.
  for (i=0 ; i<2; i++) {
    NdbRecAttr *p = theFirstPkAttrs[i];
    while (p) {
      NdbRecAttr *p_next = p->next();
      m_ndb->releaseRecAttr(p);
      p = p_next;
    }
    theFirstPkAttrs[i]= 0;
  }
  // Release pre and post data attribute lists.
  for (i=0 ; i<2; i++) {
    NdbRecAttr *p = theFirstDataAttrs[i];
    while (p) {
      NdbRecAttr *p_next = p->next();
      m_ndb->releaseRecAttr(p);
      p = p_next;
    }
    theFirstDataAttrs[i]= 0;
  }

  if (m_state != EO_EXECUTING)
  {
    DBUG_RETURN(-1);
  }

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    DBUG_RETURN(-1);
  }

  m_ndb->theEventBuffer->add_drop_lock();
  int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*this);
  m_state= EO_DROPPED;
  mi_type= 0;
  if (r == 0) {
    if (m_stop_gci == 0)
    {
      // response from old kernel
      Uint64 gci= m_ndb->theEventBuffer->m_highest_sub_gcp_complete_GCI;
      if (gci)
      {
        // calculate a "safe" gci in the future to remove event op.
        gci += Uint64(3) << 32;
      }
      else
      {
        // set highest value to ensure that operation does not get dropped
        // too early. Note '-1' as ~Uint64(0) indicates active event
        gci = ~Uint64(0)-1;
      }
      m_stop_gci = gci;
    }
    m_ndb->theEventBuffer->add_drop_unlock();
    DBUG_RETURN(0);
  }
  //Error
  m_error.code= NdbDictionaryImpl::getImpl(*myDict).m_error.code;
  m_state= EO_ERROR;
  m_ndb->theEventBuffer->add_drop_unlock();
  DBUG_RETURN(r);
}
761
// True if the last TE_ALTER event changed the table's name
// (from the change mask captured in receive_event()).
bool NdbEventOperationImpl::tableNameChanged() const
{
  return (bool)AlterTableReq::getNameFlag(m_change_mask);
}
766
// True if the last TE_ALTER event changed the table's frm metadata.
bool NdbEventOperationImpl::tableFrmChanged() const
{
  return (bool)AlterTableReq::getFrmFlag(m_change_mask);
}
771
// True if the last TE_ALTER event changed the table's fragmentation.
bool NdbEventOperationImpl::tableFragmentationChanged() const
{
  return (bool)AlterTableReq::getFragDataFlag(m_change_mask);
}
776
// True if the last TE_ALTER event changed the table's range/list
// partition data.
bool NdbEventOperationImpl::tableRangeListChanged() const
{
  return (bool)AlterTableReq::getRangeListFlag(m_change_mask);
}
781
782 Uint64
getGCI()783 NdbEventOperationImpl::getGCI()
784 {
785 Uint32 gci_hi = m_data_item->sdata->gci_hi;
786 Uint32 gci_lo = m_data_item->sdata->gci_lo;
787 return gci_lo | (Uint64(gci_hi) << 32);
788 }
789
790 bool
isErrorEpoch(NdbDictionary::Event::TableEvent * error_type)791 NdbEventOperationImpl::isErrorEpoch(NdbDictionary::Event::TableEvent *error_type)
792 {
793 const NdbDictionary::Event::TableEvent type = getEventType2();
794 // Error types are defined from TE_INCONSISTENT
795 if (type >= NdbDictionary::Event::TE_INCONSISTENT)
796 {
797 if (error_type)
798 *error_type = type;
799 return true;
800 }
801 return false;
802 }
803
804 bool
isEmptyEpoch()805 NdbEventOperationImpl::isEmptyEpoch()
806 {
807 const Uint32 type = getEventType2();
808 if (type == NdbDictionary::Event::TE_EMPTY)
809 return true;
810 return false;
811 }
812
// Return the anyValue word carried in the current data item's
// subscription data.
Uint32
NdbEventOperationImpl::getAnyValue() const
{
  return m_data_item->sdata->anyValue;
}
818
// Return the latest GCI known to the shared event buffer (not
// specific to this operation's current data item).
Uint64
NdbEventOperationImpl::getLatestGCI()
{
  return m_ndb->theEventBuffer->getLatestGCI();
}
824
825 Uint64
getTransId() const826 NdbEventOperationImpl::getTransId() const
827 {
828 /* Return 64 bit composite */
829 Uint32 transId1 = m_data_item->sdata->transId1;
830 Uint32 transId2 = m_data_item->sdata->transId2;
831 return Uint64(transId1) << 32 | transId2;
832 }
833
/*
 * Accumulate a (possibly fragmented) SUB_TABLE_DATA signal carrying
 * DICT_TAB_INFO into m_buffer. The first fragment records the
 * fragment id and pre-grows the buffer to the signalled total length;
 * later fragments must carry the same id (abort() otherwise).
 * Returns TRUE when the last fragment has arrived and the buffer is
 * complete, FALSE while more fragments are expected.
 */
bool
NdbEventOperationImpl::execSUB_TABLE_DATA(const NdbApiSignal * signal,
                                          const LinearSectionPtr ptr[3])
{
  DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA");
  const SubTableData * const sdata=
    CAST_CONSTPTR(SubTableData, signal->getDataPtr());

  if(signal->isFirstFragment()){
    m_fragmentId = signal->getFragmentId();
    // totalLen is in words; buffer size is in bytes.
    m_buffer.grow(4 * sdata->totalLen);
  } else {
    // Interleaved fragments from different signals are not supported.
    if(m_fragmentId != signal->getFragmentId()){
      abort();
    }
  }

  const Uint32 i = SubTableData::DICT_TAB_INFO;
  DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u",
                      4 * ptr[i].sz, m_fragmentId));
  m_buffer.append(ptr[i].p, 4 * ptr[i].sz);

  if(!signal->isLastFragment()){
    DBUG_RETURN(FALSE);
  }

  DBUG_RETURN(TRUE);
}
862
863
/*
 * Unpack the current data item (m_data_item) into this operation's
 * NdbRecAttr lists.
 *
 * Non-data events (operation >= _TE_FIRST_NON_DATA_EVENT) return 1
 * immediately; for _TE_ALTER the accumulated DICT_TAB_INFO in
 * m_buffer is first parsed into a new NdbTableImpl, which replaces
 * the event's table impl, and every NdbRecAttr and NdbBlob is
 * repointed at the new table's column objects.
 *
 * For data events, section 0 holds attribute headers, section 1 the
 * post (after) values and section 2 the pre (before) values. The
 * pk and data attribute lists are assumed sorted by attribute id
 * (see getValue()), so a single merge pass matches headers to
 * RecAttrs; unmatched RecAttrs are set UNDEFINED.
 *
 * Returns 1 if the event carries data the subscriber should see
 * (always for non-updates, or updates with at least one matched
 * attribute, or when m_allow_empty_update), else 0.
 */
int
NdbEventOperationImpl::receive_event()
{
  Uint32 operation=
    SubTableData::getOperation(m_data_item->sdata->requestInfo);
  if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
  {
    DBUG_ENTER("NdbEventOperationImpl::receive_event");
    DBUG_PRINT("info",("sdata->operation %u this: %p", operation, this));
    m_ndb->theImpl->incClientStat(Ndb::NonDataEventsRecvdCount, 1);
    if (operation == NdbDictionary::Event::_TE_ALTER)
    {
      // Parse the new table definition and
      // create a table object
      NdbDictInterface::Tx tx_unused;
      NdbError error;
      int warn;
      NdbDictInterface dif(tx_unused, error, warn);
      NdbTableImpl *at;
      m_change_mask = m_data_item->sdata->changeMask;
      error.code = dif.parseTableInfo(&at,
                                      (Uint32*)m_buffer.get_data(),
                                      m_buffer.length() / 4,
                                      true);
      m_buffer.clear();
      if (unlikely(error.code))
      {
        DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u",
                            error.code));
        ndbout_c("Failed to parse DictTabInfo error %u", error.code);
        DBUG_RETURN(1);
      }
      at->buildColumnHash();

      // Swap in the new table impl; the old one is deleted at the end.
      NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
      m_eventImpl->m_tableImpl = at;

      DBUG_PRINT("info", ("switching table impl 0x%lx -> 0x%lx",
                          (long) tmp_table_impl, (long) at));

      // change the rec attrs to refer to the new table object
      int i;
      for (i = 0; i < 2; i++)
      {
        NdbRecAttr *p = theFirstPkAttrs[i];
        while (p)
        {
          int no = p->getColumn()->getColumnNo();
          NdbColumnImpl *tAttrInfo = at->getColumn(no);
          DBUG_PRINT("info", ("rec_attr: 0x%lx "
                              "switching column impl 0x%lx -> 0x%lx",
                              (long) p, (long) p->m_column, (long) tAttrInfo));
          p->m_column = tAttrInfo;
          p = p->next();
        }
      }
      for (i = 0; i < 2; i++)
      {
        NdbRecAttr *p = theFirstDataAttrs[i];
        while (p)
        {
          int no = p->getColumn()->getColumnNo();
          NdbColumnImpl *tAttrInfo = at->getColumn(no);
          DBUG_PRINT("info", ("rec_attr: 0x%lx "
                              "switching column impl 0x%lx -> 0x%lx",
                              (long) p, (long) p->m_column, (long) tAttrInfo));
          p->m_column = tAttrInfo;
          p = p->next();
        }
      }
      // change the blobHandle's to refer to the new table object.
      NdbBlob *p = theBlobList;
      while (p)
      {
        int no = p->getColumn()->getColumnNo();
        NdbColumnImpl *tAttrInfo = at->getColumn(no);
        DBUG_PRINT("info", ("blob_handle: 0x%lx "
                            "switching column impl 0x%lx -> 0x%lx",
                            (long) p, (long) p->theColumn, (long) tAttrInfo));
        p->theColumn = tAttrInfo;
        p = p->next();
      }
      if (tmp_table_impl)
        delete tmp_table_impl;
    }
    DBUG_RETURN(1);
  }

  DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
  DBUG_PRINT_EVENT("info",("sdata->operation %u this: %p", operation, this));
  // now move the data into the RecAttrs
  m_ndb->theImpl->incClientStat(Ndb::DataEventsRecvdCount, 1);

  int is_insert= operation == NdbDictionary::Event::_TE_INSERT;

  Uint32 *aAttrPtr = m_data_item->ptr[0].p;
  Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
  Uint32 *aDataPtr = m_data_item->ptr[1].p;

  DBUG_DUMP_EVENT("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
  DBUG_DUMP_EVENT("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);

  // copy data into the RecAttr's
  // we assume that the respective attribute lists are sorted

  // first the pk's
  {
    NdbRecAttr *tAttr= theFirstPkAttrs[0];
    NdbRecAttr *tAttr1= theFirstPkAttrs[1];
    while(tAttr)
    {
      assert(aAttrPtr < aAttrEndPtr);
      unsigned tDataSz= AttributeHeader(*aAttrPtr).getByteSize();
      assert(tAttr->attrId() ==
             AttributeHeader(*aAttrPtr).getAttributeId());
      receive_data(tAttr, aDataPtr, tDataSz);
      // Pre image of an insert has no meaningful PK values.
      if (!is_insert)
        receive_data(tAttr1, aDataPtr, tDataSz);
      else
        tAttr1->setUNDEFINED(); // do not leave unspecified
      tAttr1= tAttr1->next();
      // next
      aAttrPtr++;
      aDataPtr+= (tDataSz + 3) >> 2;  // advance by size rounded up to words
      tAttr= tAttr->next();
    }
  }

  // Merge post (after) values in section 1 into the sorted post data
  // attribute list.
  NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
  Uint32 tRecAttrId;
  Uint32 tAttrId;
  Uint32 tDataSz;
  int hasSomeData= (operation != NdbDictionary::Event::_TE_UPDATE) ||
    m_allow_empty_update;
  while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
    tRecAttrId = tWorkingRecAttr->attrId();
    tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
    tDataSz = AttributeHeader(*aAttrPtr).getByteSize();

    // RecAttrs with ids below the incoming attribute were not sent:
    // mark them undefined.
    while (tAttrId > tRecAttrId) {
      DBUG_PRINT_EVENT("info",("undef [%u] %u 0x%x [%u] 0x%x",
                               tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
      tWorkingRecAttr->setUNDEFINED();
      tWorkingRecAttr = tWorkingRecAttr->next();
      if (tWorkingRecAttr == NULL)
        break;
      tRecAttrId = tWorkingRecAttr->attrId();
    }
    if (tWorkingRecAttr == NULL)
      break;

    if (tAttrId == tRecAttrId) {
      hasSomeData=1;

      DBUG_PRINT_EVENT("info",("set [%u] %u 0x%x [%u] 0x%x",
                               tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));

      receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
      tWorkingRecAttr = tWorkingRecAttr->next();
    }
    aAttrPtr++;
    aDataPtr += (tDataSz + 3) >> 2;
  }

  // Remaining post RecAttrs got no data: mark undefined.
  while (tWorkingRecAttr != NULL) {
    tRecAttrId = tWorkingRecAttr->attrId();
    //printf("set undefined [%u] %u %u [%u]\n",
    //       tAttrId, tDataSz, *aDataPtr, tRecAttrId);
    tWorkingRecAttr->setUNDEFINED();
    tWorkingRecAttr = tWorkingRecAttr->next();
  }

  // Merge pre (before) values in section 2 (self-describing: header
  // word followed by data) into the sorted pre data attribute list.
  tWorkingRecAttr = theFirstDataAttrs[1];
  aDataPtr = m_data_item->ptr[2].p;
  Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
  while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
    tRecAttrId = tWorkingRecAttr->attrId();
    tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
    tDataSz = AttributeHeader(*aDataPtr).getByteSize();
    aDataPtr++;   // step past the inline attribute header
    while (tAttrId > tRecAttrId) {
      tWorkingRecAttr->setUNDEFINED();
      tWorkingRecAttr = tWorkingRecAttr->next();
      if (tWorkingRecAttr == NULL)
        break;
      tRecAttrId = tWorkingRecAttr->attrId();
    }
    if (tWorkingRecAttr == NULL)
      break;
    if (tAttrId == tRecAttrId) {
      assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
      hasSomeData=1;

      receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
      tWorkingRecAttr = tWorkingRecAttr->next();
    }
    aDataPtr += (tDataSz + 3) >> 2;
  }
  // Remaining pre RecAttrs got no data: mark undefined.
  while (tWorkingRecAttr != NULL) {
    tWorkingRecAttr->setUNDEFINED();
    tWorkingRecAttr = tWorkingRecAttr->next();
  }

  if (hasSomeData)
  {
    DBUG_RETURN_EVENT(1);
  }

  DBUG_RETURN_EVENT(0);
}
1074
// Map the operation code of the current data item to the
// TableEvent bitmask value (1 << operation).
NdbDictionary::Event::TableEvent
NdbEventOperationImpl::getEventType2()
{
  return (NdbDictionary::Event::TableEvent)
    (1U << SubTableData::getOperation(m_data_item->sdata->requestInfo));
}
1081
1082
1083
1084 void
print()1085 NdbEventOperationImpl::print()
1086 {
1087 int i;
1088 ndbout << "EventId " << m_eventId << "\n";
1089
1090 for (i = 0; i < 2; i++) {
1091 NdbRecAttr *p = theFirstPkAttrs[i];
1092 ndbout << " %u " << i;
1093 while (p) {
1094 ndbout << " : " << p->attrId() << " = " << *p;
1095 p = p->next();
1096 }
1097 ndbout << "\n";
1098 }
1099 for (i = 0; i < 2; i++) {
1100 NdbRecAttr *p = theFirstDataAttrs[i];
1101 ndbout << " %u " << i;
1102 while (p) {
1103 ndbout << " : " << p->attrId() << " = " << *p;
1104 p = p->next();
1105 }
1106 ndbout << "\n";
1107 }
1108 }
1109
void
NdbEventOperationImpl::printAll()
{
  // Walk the attribute-header section (ptr[0]) while advancing a cursor
  // over the attribute-data section (ptr[1]).
  // NOTE(review): despite its name this function produces no output and has
  // no side effects -- the loop only advances local pointers and the data
  // cursor is never dereferenced. Looks like an unfinished debug helper;
  // confirm before relying on it.
  Uint32 *aAttrPtr = m_data_item->ptr[0].p;
  Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
  Uint32 *aDataPtr = m_data_item->ptr[1].p;

  //tRecAttr->setup(tAttrInfo, aValue)) {

  Uint32 tAttrId;
  Uint32 tDataSz;
  for (; aAttrPtr < aAttrEndPtr; ) {
    tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
    tDataSz = AttributeHeader(*aAttrPtr).getDataSize();

    aAttrPtr++;
    aDataPtr += tDataSz;
  }
}
1129
EventBufferManager(const Ndb * const ndb)1130 EventBufferManager::EventBufferManager(const Ndb* const ndb) :
1131 m_ndb(ndb),
1132 m_pre_gap_epoch(0), // equivalent to setting state COMPLETELY_BUFFERING
1133 m_begin_gap_epoch(0),
1134 m_end_gap_epoch(0),
1135 m_max_buffered_epoch(0),
1136 m_max_received_epoch(0),
1137 m_free_percent(20),
1138 m_event_buffer_manager_state(EBM_COMPLETELY_BUFFERING)
1139 {}
1140
// Returns the configured eventbuffer_free_percent threshold: the percentage
// of event-buffer memory that must be free again before buffering resumes
// after a gap.
unsigned
EventBufferManager::get_eventbuffer_free_percent()
{
  return m_free_percent;
}
1146
// Sets the eventbuffer_free_percent threshold used by the gap state machine.
// NOTE(review): no range validation here; presumably callers guarantee
// 'free' is a sensible percentage (0-100) -- confirm at the call sites.
void
EventBufferManager::set_eventbuffer_free_percent(unsigned free)
{
  m_free_percent = free;
}
1152
1153 void
onBufferingEpoch(Uint64 received_epoch)1154 EventBufferManager::onBufferingEpoch(Uint64 received_epoch)
1155 {
1156 if (m_max_buffered_epoch < received_epoch)
1157 m_max_buffered_epoch = received_epoch;
1158 }
1159
1160 bool
onEventDataReceived(Uint32 memory_usage_percent,Uint64 received_epoch)1161 EventBufferManager::onEventDataReceived(Uint32 memory_usage_percent,
1162 Uint64 received_epoch)
1163 {
1164 bool report_status = false;
1165
1166 if (isCompletelyBuffering())
1167 {
1168 if (memory_usage_percent >= 100)
1169 {
1170 // Transition COMPLETELY_BUFFERING -> PARTIALLY_DISCARDING.
1171 m_pre_gap_epoch = m_max_buffered_epoch;
1172 m_event_buffer_manager_state = EBM_PARTIALLY_DISCARDING;
1173 report_status = true;
1174 }
1175 }
1176 else if (isCompletelyDiscarding())
1177 {
1178 if (memory_usage_percent < 100 - m_free_percent)
1179 {
1180 // Transition COMPLETELY_DISCARDING -> PARTIALLY_BUFFERING
1181 m_end_gap_epoch = m_max_received_epoch;
1182 m_event_buffer_manager_state = EBM_PARTIALLY_BUFFERING;
1183 report_status = true;
1184 }
1185 }
1186 else if (isPartiallyBuffering())
1187 {
1188 if (memory_usage_percent >= 100)
1189 {
1190 // New gap is starting before the on-going gap ends.
1191 report_status = true;
1192
1193 g_eventLogger->warning("Ndb 0x%x %s: Event Buffer: Ending gap epoch %u/%u (%llu) lacks event buffer memory. Overbuffering.",
1194 m_ndb->getReference(), m_ndb->getNdbObjectName(),
1195 Uint32(m_begin_gap_epoch >> 32), Uint32(m_begin_gap_epoch),
1196 m_begin_gap_epoch);
1197 g_eventLogger->warning("Check how many epochs the eventbuffer_free_percent memory can accommodate.\n");
1198 g_eventLogger->warning("Increase eventbuffer_free_percent, eventbuffer memory or both accordingly.\n");
1199 }
1200 }
1201 /**
1202 * else: transition from PARTIALLY_DISCARDING to COMPLETELY_DISCARDING
1203 * and PARTIALLY_BUFFERING to COMPLETELY_BUFFERING
1204 * will be handled in execSUB_GCP_COMPLETE()
1205 */
1206
1207 // Any new epoch received after memory becomes available will be buffered
1208 if (m_max_received_epoch < received_epoch)
1209 m_max_received_epoch = received_epoch;
1210
1211 return report_status;
1212 }
1213
1214 bool
isEventDataToBeDiscarded(Uint64 received_epoch)1215 EventBufferManager::isEventDataToBeDiscarded(Uint64 received_epoch)
1216 {
1217 DBUG_ENTER_EVENT("EventBufferManager::isEventDataToBeDiscarded");
1218 /* Discard event data received via SUB_TABLE_DATA during gap period,
1219 * m_pre_gap_epoch > 0 : gap will start at the next epoch
1220 * m_end_gap_epoch == 0 : gap has not ended
1221 * received_epoch <= m_end_gap_epoch : gap has ended at m_end_gap_epoch
1222 */
1223 if (m_pre_gap_epoch > 0 && received_epoch > m_pre_gap_epoch &&
1224 (m_end_gap_epoch == 0 || received_epoch <= m_end_gap_epoch ))
1225 {
1226 assert(isInDiscardingState());
1227 DBUG_PRINT_EVENT("info", ("Discarding SUB_TABLE_DATA for epoch %u/%u (%llu) > begin_gap epoch %u/%u (%llu)",
1228 Uint32(received_epoch >> 32),
1229 Uint32(received_epoch),
1230 received_epoch,
1231 Uint32(m_pre_gap_epoch >> 32),
1232 Uint32(m_pre_gap_epoch),
1233 m_pre_gap_epoch));
1234 if (m_end_gap_epoch > 0)
1235 {
1236 DBUG_PRINT_EVENT("info", (" and <= end_gap epoch %u/%u (%llu)"
1237 Uint32(m_end_gap_epoch >> 32),
1238 Uint32(m_end_gap_epoch),
1239 m_end_gap_epoch));
1240 }
1241 DBUG_RETURN_EVENT(true);
1242 }
1243 DBUG_RETURN_EVENT(false);
1244 }
1245
/**
 * Called when an epoch completes (SUB_GCP_COMPLETE). Finalizes the pending
 * state transitions of the gap state machine:
 *  - PARTIALLY_DISCARDING -> COMPLETELY_DISCARDING (gap officially begins),
 *  - PARTIALLY_BUFFERING  -> COMPLETELY_BUFFERING  (gap officially ends).
 *
 * @param completed_epoch  epoch that has just completed
 * @param gap_begins       [out] set to true when a new gap begins here
 * @return true when the transition should be reported to the application.
 */
bool
EventBufferManager::onEpochCompleted(Uint64 completed_epoch, bool& gap_begins)
{
  bool report_status = false;

  if (isPartiallyDiscarding() && completed_epoch > m_pre_gap_epoch)
  {
    /**
     * No on-going gap. This should be the first completed epoch after
     * a transition to PARTIALLY_DISCARDING (the first completed epoch
     * after m_pre_gap_epoch). Mark this as the beginning of a new gap.
     * Transition PARTIALLY_DISCARDING -> COMPLETELY_DISCARDING:
     */
    m_begin_gap_epoch = completed_epoch;
    m_event_buffer_manager_state = EBM_COMPLETELY_DISCARDING;
    gap_begins = true;
    report_status = true;
    g_eventLogger->warning("Ndb 0x%x %s: Event Buffer: New gap begins at epoch : %u/%u (%llu)",
                           m_ndb->getReference(), m_ndb->getNdbObjectName(),
                           (Uint32)(m_begin_gap_epoch >> 32),
                           (Uint32)m_begin_gap_epoch, m_begin_gap_epoch);
  }
  else if (isPartiallyBuffering() && completed_epoch > m_end_gap_epoch)
  {
    // The completed_epoch marks the first completely buffered post_gap epoch
    // Transition PARTIALLY_BUFFERNG -> COMPLETELY_BUFFERING
    g_eventLogger->warning("Ndb 0x%x %s: Event Buffer : Gap began at epoch : %u/%u (%llu) ends at epoch %u/%u (%llu)",
                           m_ndb->getReference(),
                           m_ndb->getNdbObjectName(),
                           (Uint32)(m_begin_gap_epoch >> 32),
                           (Uint32)m_begin_gap_epoch, m_begin_gap_epoch,
                           (Uint32)(completed_epoch >> 32),
                           (Uint32)completed_epoch, completed_epoch);
    // Reset all gap markers: back to the initial, fully-buffering state.
    m_pre_gap_epoch = 0;
    m_begin_gap_epoch = 0;
    m_end_gap_epoch = 0;
    m_event_buffer_manager_state = EBM_COMPLETELY_BUFFERING;
    report_status = true;
  }
  /**
   * else: transition from COMPLETELY_BUFFERING to PARTIALLY_DISCARDING
   * and COMPLETELY_DISCARDING to PARTIALLY_BUFFERING
   * are handled in insertDataL
   */
  return report_status;
}
1292
1293 bool
isGcpCompleteToBeDiscarded(Uint64 completed_epoch)1294 EventBufferManager::isGcpCompleteToBeDiscarded(Uint64 completed_epoch)
1295 {
1296 DBUG_ENTER_EVENT("EventBufferManager::isGcpCompleteToBeDiscarded");
1297 /* Discard SUB_GCP_COMPLETE during gap period,
1298 * m_begin_gap_epoch > 0 : gap has started at m_begin_gap_epoch
1299 * m_end_gap_epoch == 0 : gap has not ended
1300 * received_epoch <= m_end_gap_epoch : gap has ended at m_end_gap_epoch
1301 */
1302
1303 // for m_begin_gap_epoch < completed_epoch <= m_end_gap_epoch
1304
1305 if (m_begin_gap_epoch > 0 && completed_epoch > m_begin_gap_epoch &&
1306 (m_end_gap_epoch == 0 || completed_epoch <= m_end_gap_epoch ))
1307 {
1308 assert(isInDiscardingState());
1309 DBUG_PRINT_EVENT("info", ("Discarding SUB_GCP_COMPLETE_REP for epoch %u/%u (%llu) > begin_gap epoch %u/%u (%llu)",
1310 Uint32(completed_epoch >> 32),
1311 Uint32(completed_epoch),
1312 completed_epoch,
1313 Uint32(m_begin_gap_epoch >> 32),
1314 Uint32(m_begin_gap_epoch),
1315 m_begin_gap_epoch));
1316 if (m_end_gap_epoch > 0)
1317 {
1318 DBUG_PRINT_EVENT("info", (" and <= end_gap epoch %u/%u (%llu)"
1319 Uint32(m_end_gap_epoch >> 32),
1320 Uint32(m_end_gap_epoch),
1321 m_end_gap_epoch));
1322 }
1323 DBUG_RETURN_EVENT(true);
1324 }
1325 DBUG_RETURN_EVENT(false);
1326 }
1327
1328 /*
1329 * Class NdbEventBuffer
1330 * Each Ndb object has a Object.
1331 */
NdbEventBuffer(Ndb * ndb)1332 NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
1333 m_total_buckets(TOTAL_BUCKETS_INIT),
1334 m_min_gci_index(0),
1335 m_max_gci_index(0),
1336 m_ndb(ndb),
1337 m_latestGCI(0), m_latest_complete_GCI(0),
1338 m_highest_sub_gcp_complete_GCI(0),
1339 m_latest_poll_GCI(0),
1340 m_failure_detected(false),
1341 m_prevent_nodegroup_change(true),
1342 m_total_alloc(0),
1343 m_max_alloc(0),
1344 m_event_buffer_manager(ndb),
1345 m_free_thresh(0),
1346 m_min_free_thresh(0),
1347 m_max_free_thresh(0),
1348 m_gci_slip_thresh(0),
1349 m_dropped_ev_op(0),
1350 m_active_op_count(0)
1351 {
1352 #ifdef VM_TRACE
1353 m_latest_command= "NdbEventBuffer::NdbEventBuffer";
1354 m_flush_gci = 0;
1355 #endif
1356
1357 if ((p_cond = NdbCondition_Create()) == NULL) {
1358 ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
1359 exit(-1);
1360 }
1361 m_mutex = 0; // Set in Ndb::init()
1362
1363 // ToDo set event buffer size
1364 // pre allocate event data array
1365 m_sz= 0;
1366 #ifdef VM_TRACE
1367 m_free_data_count= 0;
1368 #endif
1369 m_free_data= 0;
1370 m_free_data_sz= 0;
1371
1372 // get reference to mutex managed by current connection
1373 m_add_drop_mutex=
1374 m_ndb->theImpl->m_ndb_cluster_connection.m_event_add_drop_mutex;
1375
1376 // initialize lists
1377 bzero(&g_empty_gci_container, sizeof(Gci_container));
1378 init_gci_containers();
1379
1380 m_alive_node_bit_mask.clear();
1381
1382 bzero(&m_sub_data_streams, sizeof(m_sub_data_streams));
1383 }
1384
~NdbEventBuffer()1385 NdbEventBuffer::~NdbEventBuffer()
1386 {
1387 // todo lock? what if receive thread writes here?
1388 NdbEventOperationImpl* op= m_dropped_ev_op;
1389 while ((op = m_dropped_ev_op))
1390 {
1391 m_dropped_ev_op = m_dropped_ev_op->m_next;
1392 delete op->m_facade;
1393 }
1394
1395 unsigned j;
1396 Uint32 sz= m_active_gci.size();
1397 Gci_container* array = (Gci_container*)m_active_gci.getBase();
1398 for(j = 0; j < sz; j++)
1399 {
1400 array[j].~Gci_container();
1401 }
1402
1403 // Return EventBufData lists to free list in a nice way
1404 // before actual deallocation using m_allocated_data.
1405 if(!m_complete_data.m_data.is_empty())
1406 free_list(m_complete_data.m_data);
1407 if(!m_available_data.is_empty())
1408 free_list(m_available_data);
1409 if(!m_used_data.is_empty())
1410 free_list(m_used_data);
1411 // Return event queue leftovers to free list
1412 clear_event_queue();
1413
1414
1415 for (j= 0; j < m_allocated_data.size(); j++)
1416 {
1417 unsigned sz= m_allocated_data[j]->sz;
1418 EventBufData *data= m_allocated_data[j]->data;
1419 EventBufData *end_data= data+sz;
1420 for (; data < end_data; data++)
1421 {
1422 if (data->sdata)
1423 NdbMem_Free(data->sdata);
1424 }
1425 NdbMem_Free((char*)m_allocated_data[j]);
1426 }
1427
1428 NdbCondition_Destroy(p_cond);
1429 }
1430
// Thin forwarder to the embedded EventBufferManager.
unsigned
NdbEventBuffer::get_eventbuffer_free_percent()
{
  return m_event_buffer_manager.get_eventbuffer_free_percent();
}
1436
// Thin forwarder to the embedded EventBufferManager.
void
NdbEventBuffer::set_eventbuffer_free_percent(unsigned free)
{
  m_event_buffer_manager.set_eventbuffer_free_percent(free);
}
1442
// Register one more active event operation.
void
NdbEventBuffer::add_op()
{
  /*
   * When m_active_op_count is zero, SUB_GCP_COMPLETE_REP is
   * ignored and no event data will reach application.
   * Positive values will enable event data to reach application.
   */
  m_active_op_count++;
}
1453
// Unregister an active event operation. When the last one goes away, all
// buffered event data is returned to the free list and the GCI container
// bookkeeping is reset to its initial state.
void
NdbEventBuffer::remove_op()
{
  m_active_op_count--;
  if(m_active_op_count == 0)
  {
    // Return EventBufData to free list and release GCI ops before clearing.
    for(unsigned i = 0; i < m_active_gci.size(); i++)
    {
      Gci_container& bucket = (Gci_container&)m_active_gci[i];
      if (!bucket.m_data.is_empty())
      {
        free_list(bucket.m_data);
      }
    }
    if (!m_complete_data.m_data.is_empty())
    {
      free_list(m_complete_data.m_data);
    }
    init_gci_containers();
  }
}
1476
// Reset the GCI bookkeeping to its initial state: empty completed-data
// list, a small pre-filled m_active_gci directory, an (empty) ring buffer
// of known GCIs, and a cleared cluster-failure marker.
void
NdbEventBuffer::init_gci_containers()
{
  m_startup_hack = true;
  bzero(&m_complete_data, sizeof(m_complete_data));
  m_latest_complete_GCI = m_latestGCI = 0;
  m_active_gci.clear();
  m_active_gci.fill(3, g_empty_gci_container);  // 4 empty directory slots
  // Ring is empty when min == max; index 0 is never used as a start.
  m_min_gci_index = m_max_gci_index = 1;
  Uint64 gci = 0;
  m_known_gci.clear();
  m_known_gci.fill(7, gci);  // 8-slot ring, power of two for mask arithmetic
  // Reset cluster failure marker
  m_failure_detected= false;
}
1492
expand(unsigned sz)1493 int NdbEventBuffer::expand(unsigned sz)
1494 {
1495 unsigned alloc_size=
1496 sizeof(EventBufData_chunk) +(sz-1)*sizeof(EventBufData);
1497 EventBufData_chunk *chunk_data=
1498 (EventBufData_chunk *)NdbMem_Allocate(alloc_size);
1499
1500 chunk_data->sz= sz;
1501 m_allocated_data.push_back(chunk_data);
1502
1503 EventBufData *data= chunk_data->data;
1504 EventBufData *end_data= data+sz;
1505 EventBufData *last_data= m_free_data;
1506
1507 bzero((void*)data, sz*sizeof(EventBufData));
1508 for (; data < end_data; data++)
1509 {
1510 data->m_next= last_data;
1511 last_data= data;
1512 }
1513 m_free_data= last_data;
1514
1515 m_sz+= sz;
1516 #ifdef VM_TRACE
1517 m_free_data_count+= sz;
1518 #endif
1519 return 0;
1520 }
1521
1522 int
pollEvents(int aMillisecondNumber,Uint64 * highestQueuedEpoch)1523 NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *highestQueuedEpoch)
1524 {
1525 if (aMillisecondNumber < 0)
1526 {
1527 g_eventLogger->error("NdbEventBuffer::pollEvents: negative aMillisecondNumber %d 0x%x %s",
1528 aMillisecondNumber,
1529 m_ndb->getReference(),
1530 m_ndb->getNdbObjectName());
1531 return -1;
1532 }
1533 int ret= 1;
1534 #ifdef VM_TRACE
1535 const char *m_latest_command_save= m_latest_command;
1536 m_latest_command= "NdbEventBuffer::pollEvents";
1537 #endif
1538
1539 NdbMutex_Lock(m_mutex);
1540 NdbEventOperationImpl *ev_op= move_data();
1541 if (unlikely(ev_op == 0 && aMillisecondNumber))
1542 {
1543 NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
1544 ev_op= move_data();
1545 }
1546 m_latest_poll_GCI= m_latestGCI;
1547 #ifdef VM_TRACE
1548 if (ev_op)
1549 {
1550 // m_mutex is locked
1551 // update event ops data counters
1552 ev_op->m_data_count-= ev_op->m_data_done_count;
1553 ev_op->m_data_done_count= 0;
1554 }
1555 m_latest_command= m_latest_command_save;
1556 #endif
1557 if (unlikely(ev_op == 0))
1558 {
1559 ret= 0; // applicable for both aMillisecondNumber >= 0
1560 /*
1561 gci's consumed up until m_latest_poll_GCI, so we can free all
1562 dropped event operations stopped up until that gci
1563 */
1564 deleteUsedEventOperations(m_latest_poll_GCI);
1565 }
1566 NdbMutex_Unlock(m_mutex); // we have moved the data
1567
1568 if (highestQueuedEpoch)
1569 *highestQueuedEpoch= m_latest_poll_GCI;
1570
1571 return ret;
1572 }
1573
/**
 * Drop all buffered data for incomplete epochs older than 'gci': their
 * Gci_containers are emptied, destroyed and zeroed, and the known-GCI ring
 * start index is advanced past them. Always returns 0.
 */
int
NdbEventBuffer::flushIncompleteEvents(Uint64 gci)
{
  /**
   * Find min complete gci
   */
  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;

  g_eventLogger->info("Flushing incomplete GCI:s < %u/%u",
                      Uint32(gci >> 32), Uint32(gci));
  while (minpos != maxpos && array[minpos] < gci)
  {
    Gci_container* tmp = find_bucket(array[minpos]);
    assert(tmp);
    assert(maxpos == m_max_gci_index);

    if(!tmp->m_data.is_empty())
    {
      free_list(tmp->m_data);
    }
    // Destroy and zero the bucket so the slot reads as empty afterwards.
    tmp->~Gci_container();
    bzero(tmp, sizeof(Gci_container));
    minpos = (minpos + 1) & mask;
  }

  m_min_gci_index = minpos;

#ifdef VM_TRACE
  m_flush_gci = gci;
#endif

  return 0;
}
1610
// Return consumed event data (m_used_data) to the free list, but only once
// enough has accumulated (> 1024 items) to amortize the mutex acquisition.
void
NdbEventBuffer::free_consumed_event_data()
{
  if (m_used_data.m_count > 1024)
  {
#ifdef VM_TRACE
    m_latest_command= "NdbEventBuffer::free_consumed_event_data (lock)";
#endif
    NdbMutex_Lock(m_mutex);
    // return m_used_data to m_free_data
    free_list(m_used_data);

    NdbMutex_Unlock(m_mutex);
  }
}
1626
// Detach the head item of the available queue and append it to the used
// queue (caller must pass exactly that head item), crediting the consumed
// bytes to the EventBytesRecvdCount client statistic.
void
NdbEventBuffer::move_head_event_data_item_to_used_data_queue(EventBufData *data)
{
  // Move first available item to used queue prior to processing
  assert(data == m_available_data.m_head);
  Uint32 full_count, full_sz;
  m_available_data.remove_first(full_count, full_sz);

  m_used_data.append_used_data(data, full_count, full_sz);

  m_ndb->theImpl->incClientStat(Ndb::EventBytesRecvdCount, full_sz);
}
1639
1640 EventBufData_list::Gci_ops*
remove_consumed_gci_ops(Uint64 firstKeepGci)1641 NdbEventBuffer::remove_consumed_gci_ops(Uint64 firstKeepGci)
1642 {
1643 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1644 while (gci_ops && gci_ops->m_gci < firstKeepGci)
1645 {
1646 gci_ops = m_available_data.delete_next_gci_ops();
1647 }
1648 return gci_ops;
1649 }
1650
1651 bool
is_exceptional_epoch(EventBufData * data)1652 NdbEventBuffer::is_exceptional_epoch(EventBufData *data)
1653 {
1654 Uint32 type = SubTableData::getOperation(data->sdata->requestInfo);
1655
1656 if (type == NdbDictionary::Event::_TE_EMPTY ||
1657 type >= NdbDictionary::Event::_TE_INCONSISTENT)
1658 {
1659 if (type != NdbDictionary::Event::_TE_EMPTY)
1660 {
1661 DBUG_PRINT_EVENT("info",
1662 ("detected excep. gci %u/%u (%u) 0x%x 0x%x %s",
1663 Uint32(gci >> 32), Uint32(gci),
1664 data->sdata->gci_lo|(Uint64(data->sdata->gci_hi) << 32),
1665 type,
1666 m_ndb->getReference(), m_ndb->getNdbObjectName()));
1667 }
1668 DBUG_RETURN_EVENT(true);
1669 }
1670 DBUG_RETURN_EVENT(false);
1671 }
1672
/**
 * Deliver the next available event to the application.
 * Pops items off the available queue (moving them to the used queue),
 * returning the facade of the first item that is either an exceptional
 * epoch marker or a real data event on an executing operation. Items
 * belonging to stopped operations and _TE_NUL events are skipped.
 * Returns 0 when the queue is exhausted.
 */
NdbEventOperation *
NdbEventBuffer::nextEvent2()
{
  DBUG_ENTER_EVENT("NdbEventBuffer::nextEvent2");
#ifdef VM_TRACE
  const char *m_latest_command_save= m_latest_command;
#endif

  free_consumed_event_data();

#ifdef VM_TRACE
  m_latest_command= "NdbEventBuffer::nextEvent2";
#endif

  EventBufData *data;
  while ((data= m_available_data.m_head))
  {
    move_head_event_data_item_to_used_data_queue(data);

    NdbEventOperationImpl *op= data->m_event_op;

    // all including exceptional event data must have an associated impl
    assert(op);

    // set NdbEventOperation data
    op->m_data_item= data;
    Uint64 gci = op->getGCI();

    if (is_exceptional_epoch(data))
    {
      // NOTE(review): gci is a Uint64 printed with %u here -- debug-only
      // format mismatch, inherited as-is.
      DBUG_PRINT_EVENT("info", ("detected inconsistent gci %u 0x%x %s",
                                gci, m_ndb->getReference(),
                                m_ndb->getNdbObjectName()));

      // Remove gci_ops belonging to the epochs consumed already
      // to conform with non-exceptional event data handling
      remove_consumed_gci_ops(gci);
      DBUG_RETURN_EVENT(op->m_facade);
    }

    DBUG_PRINT_EVENT("info", ("available data=%p op=%p 0x%x %s",
                              data, op, m_ndb->getReference(),
                              m_ndb->getNdbObjectName()));

    /*
     * If merge is on, blob part sub-events must not be seen on this level.
     * If merge is not on, there are no blob part sub-events.
     */
    assert(op->theMainOp == NULL);

#ifdef VM_TRACE
    op->m_data_done_count++;
#endif

    if (op->m_state == NdbEventOperation::EO_EXECUTING)
    {
      int r= op->receive_event();
      if (r > 0)
      {
#ifdef VM_TRACE
        m_latest_command= m_latest_command_save;
#endif
        // Let all blob handles attached to this operation advance first.
        NdbBlob* tBlob = op->theBlobList;
        while (tBlob != NULL)
        {
          (void)tBlob->atNextEvent();
          tBlob = tBlob->theNext;
        }

        EventBufData_list::Gci_ops *gci_ops =
          remove_consumed_gci_ops(gci);

        // Diagnostic dump when the remaining gci_ops head does not match
        // the epoch of the event being delivered (should not happen).
        if (gci_ops && (gci != gci_ops->m_gci))
        {
          ndbout << "nextEvent2: gci_ops->m_gci " << gci_ops->m_gci
                 << " (" << Uint32(gci_ops->m_gci >> 32)
                 << "/" << Uint32(gci_ops->m_gci) << ") "
                 << " gci " << gci
                 << " (" << Uint32(gci >> 32)
                 << "/" << Uint32(gci) << ") type "
                 << hex << op->getEventType2()
                 << " data's operation " << hex
                 << SubTableData::getOperation(data->sdata->requestInfo)
                 << " " << m_ndb->getNdbObjectName() << endl;
        }

        assert(gci_ops && (gci == gci_ops->m_gci));
        (void) gci_ops; // To avoid compiler warning 'unused variable'

        // to return TE_NUL it should be made into data event
        if (SubTableData::getOperation(data->sdata->requestInfo) ==
            NdbDictionary::Event::_TE_NUL)
        {
          DBUG_PRINT_EVENT("info", ("skip _TE_NUL 0x%x %s",
                                    m_ndb->getReference(),
                                    m_ndb->getNdbObjectName()));
          continue;
        }
        DBUG_RETURN_EVENT(op->m_facade);
      }
      // the next event belonged to an event op that is no
      // longer valid, skip to next
      continue;
    }
#ifdef VM_TRACE
    m_latest_command= m_latest_command_save;
#endif
  }
  m_error.code= 0;
#ifdef VM_TRACE
  m_latest_command= m_latest_command_save;
#endif

  /*
   * gci's consumed up until m_latest_poll_GCI, so
   * - remove remaining gci_ops from the gci_ops list,
   * - free all dropped event operations stopped up until that gci
   */
  remove_consumed_gci_ops(UINT_MAX64);
  if (m_dropped_ev_op)
  {
    NdbMutex_Lock(m_mutex);
    deleteUsedEventOperations(m_latest_poll_GCI);
    NdbMutex_Unlock(m_mutex);
  }
  DBUG_RETURN_EVENT(0);
}
1800
1801 bool
isConsistent(Uint64 & gci)1802 NdbEventBuffer::isConsistent(Uint64& gci)
1803 {
1804 DBUG_ENTER("NdbEventBuffer::isConsistent");
1805 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1806 while (gci_ops)
1807 {
1808 if (gci_ops->m_error == NdbDictionary::Event::_TE_INCONSISTENT)
1809 {
1810 gci = gci_ops->m_gci;
1811 DBUG_RETURN(false);
1812 }
1813 gci_ops = gci_ops->m_next;
1814 }
1815
1816 DBUG_RETURN(true);
1817 }
1818
1819 bool
isConsistentGCI(Uint64 gci)1820 NdbEventBuffer::isConsistentGCI(Uint64 gci)
1821 {
1822 DBUG_ENTER("NdbEventBuffer::isConsistentGCI");
1823 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1824 while (gci_ops)
1825 {
1826 if (gci_ops->m_gci == gci &&
1827 gci_ops->m_error == NdbDictionary::Event::_TE_INCONSISTENT)
1828 DBUG_RETURN(false);
1829 gci_ops = gci_ops->m_next;
1830 }
1831
1832 DBUG_RETURN(true);
1833 }
1834
1835
/**
 * Iterate the event operations of the first queued epoch.
 * '*iter' is the caller-maintained cursor (start at 0); each call returns
 * the next operation and optionally its event-type bitmask, or NULL when
 * the list is exhausted.
 * NOTE(review): first_gci_ops() is dereferenced without a NULL check;
 * presumably callers only invoke this while an epoch is queued -- confirm.
 */
NdbEventOperationImpl*
NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
  DBUG_ENTER("NdbEventBuffer::getGCIEventOperations");
  EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
  if (*iter < gci_ops->m_gci_op_count)
  {
    EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
    if (event_types != NULL)
      *event_types = g.event_types;
    DBUG_PRINT("info", ("gci: %u g.op: 0x%lx g.event_types: 0x%lx 0x%x %s",
                        (unsigned)gci_ops->m_gci, (long) g.op,
                        (long) g.event_types, m_ndb->getReference(),
                        m_ndb->getNdbObjectName()));
    DBUG_RETURN(g.op);
  }
  DBUG_RETURN(NULL);
}
1854
/**
 * Release dropped event operations whose stop GCI has been fully consumed.
 * The m_dropped_ev_op list is ordered so that once one operation with
 * m_stop_gci < last_consumed_gci is found, it and everything after it can
 * be released: the stop marker is cleared, the reference count dropped,
 * and the facade deleted (unlinking the node) when no references remain.
 */
void
NdbEventBuffer::deleteUsedEventOperations(Uint64 last_consumed_gci)
{
  NdbEventOperationImpl *op= m_dropped_ev_op;
  while (op && op->m_stop_gci)
  {
    if (last_consumed_gci > op->m_stop_gci)
    {
      // From here to the end of the list every op is releasable.
      while (op)
      {
        NdbEventOperationImpl *next_op= op->m_next;
        op->m_stop_gci= 0;
        op->m_ref_count--;
        if (op->m_ref_count == 0)
        {
          // Unlink from the doubly-linked dropped list before deleting.
          if (op->m_next)
            op->m_next->m_prev = op->m_prev;
          if (op->m_prev)
            op->m_prev->m_next = op->m_next;
          else
            m_dropped_ev_op = op->m_next;
          delete op->m_facade;
        }
        op = next_op;
      }
      break;
    }
    op = op->m_next;
  }
}
1885
1886 #ifdef VM_TRACE
// Debug printer (VM_TRACE builds only): one-line summary of a
// Gci_container -- epoch, state flags, data list head/tail (and count when
// available) and the outstanding gcp_complete_rep count.
static
NdbOut&
operator<<(NdbOut& out, const Gci_container& gci)
{
  out << "[ GCI: " << (gci.m_gci >> 32) << "/" << (gci.m_gci & 0xFFFFFFFF)
      << " state: " << hex << gci.m_state
      << " head: " << hex << gci.m_data.m_head
      << " tail: " << hex << gci.m_data.m_tail
#ifdef VM_TRACE
      << " cnt: " << dec << gci.m_data.m_count
#endif
      << " gcp: " << dec << gci.m_gcp_complete_rep_count
      << "]";
  return out;
}
1902
1903 static
1904 NdbOut&
operator <<(NdbOut & out,const Gci_container_pod & gci)1905 operator<<(NdbOut& out, const Gci_container_pod& gci)
1906 {
1907 Gci_container* ptr = (Gci_container*)&gci;
1908 out << *ptr;
1909 return out;
1910 }
1911 #endif
1912
/**
 * Double the capacity of the m_known_gci ring buffer.
 * The live entries (from m_min_gci_index to m_max_gci_index, wrapping at
 * the old mask) are swapped into the freshly appended upper half so they
 * become a single contiguous run, then min/max indices are rebased onto it.
 */
void
NdbEventBuffer::resize_known_gci()
{
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  Uint32 mask = m_known_gci.size() - 1;

  Uint64 fill = 0;
  Uint32 newsize = 2 * (mask + 1);
  m_known_gci.fill(newsize - 1, fill);
  Uint64 * array = m_known_gci.getBase();

  // if (0) blocks below are dormant debug traces, kept as-is.
  if (0)
  {
    printf("before (%u): ", minpos);
    for (Uint32 i = minpos; i != maxpos; i = (i + 1) & mask)
      printf("%u/%u ",
             Uint32(array[i] >> 32),
             Uint32(array[i]));
    printf("\n");
  }

  Uint32 idx = mask + 1; // Store eveything in "new" part of buffer
  if (0) printf("swapping ");
  while (minpos != maxpos)
  {
    if (0) printf("%u-%u ", minpos, idx);
    Uint64 tmp = array[idx];
    array[idx] = array[minpos];
    array[minpos] = tmp;

    idx++;
    minpos = (minpos + 1) & mask; // NOTE old mask
  }
  if (0) printf("\n");

  // Rebase the ring onto the contiguous run just built in the upper half.
  minpos = m_min_gci_index = mask + 1;
  maxpos = m_max_gci_index = idx;
  assert(minpos < maxpos);

  if (0)
  {
    ndbout_c("resize_known_gci from %u to %u", (mask + 1), newsize);
    printf("after: ");
    for (Uint32 i = minpos; i < maxpos; i++)
    {
      printf("%u/%u ",
             Uint32(array[i] >> 32),
             Uint32(array[i]));
    }
    printf("\n");
  }

#ifdef VM_TRACE
  // Sanity check: entries must remain strictly increasing.
  Uint64 gci = array[minpos];
  for (Uint32 i = minpos + 1; i<maxpos; i++)
  {
    assert(array[i] > gci);
    gci = array[i];
  }
#endif
}
1975
1976 #ifdef VM_TRACE
/**
 * Debug-only invariant checker (VM_TRACE): verifies that the m_known_gci
 * ring is strictly increasing, newer than m_latestGCI, that every known
 * GCI has a bucket, and that every non-empty active bucket's GCI appears
 * in the ring. On violation, dumps state and aborts.
 *
 * @param allowempty  when false, an empty ring is itself a violation
 */
void
NdbEventBuffer::verify_known_gci(bool allowempty)
{
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  Uint32 mask = m_known_gci.size() - 1;

  Uint32 line;
#define MMASSERT(x) { if (!(x)) { line = __LINE__; goto fail; }}
  if (m_min_gci_index == m_max_gci_index)
  {
    // Empty ring: only legal when allowed, and then all buckets must be idle.
    MMASSERT(allowempty);
    for (Uint32 i = 0; i<m_active_gci.size(); i++)
      MMASSERT(((Gci_container*)(m_active_gci.getBase()+i))->m_gci == 0);
    return;
  }

  {
    // Every ring entry must be > m_latestGCI, strictly increasing, and
    // backed by a bucket.
    Uint64 last = m_known_gci[minpos];
    MMASSERT(last > m_latestGCI);
    MMASSERT(find_bucket(last) != 0);
    MMASSERT(maxpos == m_max_gci_index);

    minpos = (minpos + 1) & mask;
    while (minpos != maxpos)
    {
      MMASSERT(m_known_gci[minpos] > last);
      last = m_known_gci[minpos];
      MMASSERT(find_bucket(last) != 0);
      MMASSERT(maxpos == m_max_gci_index);
      minpos = (minpos + 1) & mask;
    }
  }

  {
    // Every active bucket's GCI must be present in the ring.
    Gci_container* bucktets = (Gci_container*)(m_active_gci.getBase());
    for (Uint32 i = 0; i<m_active_gci.size(); i++)
    {
      if (bucktets[i].m_gci)
      {
        bool found = false;
        for (Uint32 j = m_min_gci_index; j != m_max_gci_index;
             j = (j + 1) & mask)
        {
          if (m_known_gci[j] == bucktets[i].m_gci)
          {
            found = true;
            break;
          }
        }
        if (!found)
          ndbout_c("%u/%u not found",
                   Uint32(bucktets[i].m_gci >> 32),
                   Uint32(bucktets[i].m_gci));
        MMASSERT(found == true);
      }
    }
  }

  return;
fail:
  // Dump the failing line, the ring contents and all buckets, then abort.
  ndbout_c("assertion at %d", line);
  printf("known gci: ");
  for (Uint32 i = m_min_gci_index; i != m_max_gci_index; i = (i + 1) & mask)
  {
    printf("%u/%u ", Uint32(m_known_gci[i] >> 32), Uint32(m_known_gci[i]));
  }

  printf("\nContainers");
  for (Uint32 i = 0; i<m_active_gci.size(); i++)
    ndbout << m_active_gci[i] << endl;
  abort();
}
2050 #endif
2051
/**
 * Slow-path bucket lookup/creation for an epoch.
 * Probes the m_active_gci open-addressed directory (stride
 * ACTIVE_GCI_DIRECTORY_SIZE starting at gci & ACTIVE_GCI_MASK) for a bucket
 * holding 'gci'; relocates a bucket found further down the chain into the
 * first empty slot. When absent, initializes a new bucket and inserts 'gci'
 * into the sorted m_known_gci ring (growing the ring when full).
 * Returns 0 when the epoch is already complete, belongs to a discarding
 * gap, or no buckets are configured.
 */
Gci_container*
NdbEventBuffer::find_bucket_chained(Uint64 gci)
{
  // if (0) statements are dormant debug traces, kept as-is.
  if (0)
    printf("find_bucket_chained(%u/%u) ", Uint32(gci >> 32), Uint32(gci));
  if (unlikely(gci <= m_latestGCI))
  {
    /**
     * an already complete GCI
     */
    if (0)
      ndbout_c("already complete (%u/%u)",
               Uint32(m_latestGCI >> 32),
               Uint32(m_latestGCI));
    return 0;
  }

  if (m_event_buffer_manager.isGcpCompleteToBeDiscarded(gci))
  {
    return 0; // gci belongs to a gap
  }

  if (unlikely(m_total_buckets == 0))
  {
    return 0;
  }

  // Probe the directory chain for this gci's home position.
  Uint32 pos = Uint32(gci & ACTIVE_GCI_MASK);
  Uint32 size = m_active_gci.size();
  Gci_container *buckets = (Gci_container*)(m_active_gci.getBase());
  while (pos < size)
  {
    Uint64 cmp = (buckets + pos)->m_gci;
    if (cmp == gci)
    {
      if (0)
        ndbout_c("found pos: %u", pos);
      return buckets + pos;
    }

    if (cmp == 0)
    {
      // Empty slot: the bucket may still exist further down the chain;
      // if found there, compact it into this slot.
      if (0)
        ndbout_c("empty(%u) ", pos);
      Uint32 search = pos + ACTIVE_GCI_DIRECTORY_SIZE;
      while (search < size)
      {
        if ((buckets + search)->m_gci == gci)
        {
          memcpy(buckets + pos, buckets + search, sizeof(Gci_container));
          bzero(buckets + search, sizeof(Gci_container));
          if (0)
            printf("moved from %u to %u", search, pos);
          if (search == size - 1)
          {
            m_active_gci.erase(search);
            if (0)
              ndbout_c(" shrink");
          }
          else
          {
            if (0)
              printf("\n");
          }
          return buckets + pos;
        }
        search += ACTIVE_GCI_DIRECTORY_SIZE;
      }
      goto newbucket;
    }
    pos += ACTIVE_GCI_DIRECTORY_SIZE;
  }

  /**
   * This is a new bucket...likely close to start
   */
  if (0)
    ndbout_c("new (with expand) ");
  m_active_gci.fill(pos, g_empty_gci_container);
  buckets = (Gci_container*)(m_active_gci.getBase());
newbucket:
  Gci_container* bucket = buckets + pos;
  bucket->m_gci = gci;
  bucket->m_gcp_complete_rep_count = m_total_buckets;

  // Insert gci into the sorted m_known_gci ring buffer.
  Uint32 mask = m_known_gci.size() - 1;
  Uint64 * array = m_known_gci.getBase();

  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  bool full = ((maxpos + 1) & mask) == minpos;
  if (unlikely(full))
  {
    resize_known_gci();
    minpos = m_min_gci_index;
    maxpos = m_max_gci_index;
    mask = m_known_gci.size() - 1;
    array = m_known_gci.getBase();
  }

  Uint32 maxindex = (maxpos - 1) & mask;
  Uint32 newmaxpos = (maxpos + 1) & mask;
  m_max_gci_index = newmaxpos;
  // Common case: gci is newer than everything known -- append at the end.
  if (likely(minpos == maxpos || gci > array[maxindex]))
  {
    array[maxpos] = gci;
#ifdef VM_TRACE
    verify_known_gci(false);
#endif
    return bucket;
  }

  // Out-of-order arrival: find the insertion point...
  for (pos = minpos; pos != maxpos; pos = (pos + 1) & mask)
  {
    if (array[pos] > gci)
      break;
  }

  if (0)
    ndbout_c("insert %u/%u (max %u/%u) at pos %u (min: %u max: %u)",
             Uint32(gci >> 32),
             Uint32(gci),
             Uint32(array[maxindex] >> 32),
             Uint32(array[maxindex]),
             pos,
             m_min_gci_index, m_max_gci_index);

  // ...then shift the tail up one slot to make room.
  assert(pos != maxpos);
  Uint64 oldgci;
  do {
    oldgci = array[pos];
    array[pos] = gci;
    gci = oldgci;
    pos = (pos + 1) & mask;
  } while (pos != maxpos);
  array[pos] = gci;

#ifdef VM_TRACE
  verify_known_gci(false);
#endif
  return bucket;
}
2194
void
NdbEventBuffer::crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
                                                      const SubGcpCompleteRep * const rep,
                                                      Uint32 replen,
                                                      Uint32 remcnt,
                                                      Uint32 repcnt) const
{
  /**
   * Fatal-error diagnostic: dump the offending SUB_GCP_COMPLETE_REP signal
   * and the state of the affected Gci_container, then abort() the process.
   * Called when a completion report would drive the bucket's outstanding
   * report count negative (see execSUB_GCP_COMPLETE_REP).
   *
   * @param bucket  epoch bucket whose count was about to go negative
   * @param rep     the received (invalid) signal
   * @param replen  received signal length in words
   * @param remcnt  remaining (expected) completion count in the bucket
   * @param repcnt  count carried by this report, duplicates removed
   */
  ndbout_c("INVALID SUB_GCP_COMPLETE_REP");
  // SubGcpCompleteRep
  ndbout_c("signal length: %u", replen);
  ndbout_c("gci: %u/%u", rep->gci_hi, rep->gci_lo);
  ndbout_c("senderRef: x%x", rep->senderRef);
  ndbout_c("count: %u", rep->gcp_complete_rep_count);
  // Decode each known flag bit; ADD_CNT/SUB_CNT carry a count in the
  // upper 16 bits of flags.
  ndbout_c("flags: x%x", rep->flags);
  if (rep->flags & rep->ON_DISK) ndbout_c("\tON_DISK");
  if (rep->flags & rep->IN_MEMORY) ndbout_c("\tIN_MEMORY");
  if (rep->flags & rep->MISSING_DATA) ndbout_c("\tMISSING_DATA");
  if (rep->flags & rep->ADD_CNT) ndbout_c("\tADD_CNT %u", rep->flags>>16);
  if (rep->flags & rep->SUB_CNT) ndbout_c("\tSUB_CNT %u", rep->flags>>16);
  if (rep->flags & rep->SUB_DATA_STREAMS_IN_SIGNAL)
  {
    ndbout_c("\tSUB_DATA_STREAMS_IN_SIGNAL");
    // Expected signal size with two stream id per word
    const Uint32 explen = rep->SignalLength + (rep->gcp_complete_rep_count + 1)/2;
    if (replen != explen)
    {
      ndbout_c("ERROR: Signal length %d words does not match expected %d! Corrupt signal?", replen, explen);
    }
    // Protect against corrupt signal length, max signal size is 25 words
    if (replen > 25) replen = 25;
    if (replen > rep->SignalLength)
    {
      // Dump the trailing stream-id words (two 16-bit ids per word).
      const int words = replen - rep->SignalLength;
      for (int i=0; i < words; i++)
      {
        ndbout_c("\t\t%04x\t%04x", Uint32(rep->sub_data_streams[i]), Uint32(rep->sub_data_streams[i]>>16));
      }
    }
  }
  ndbout_c("remaining count: %u", remcnt);
  ndbout_c("report count (without duplicates): %u", repcnt);
  // Gci_container
  ndbout_c("bucket gci: %u/%u", Uint32(bucket->m_gci>>32), Uint32(bucket->m_gci));
  ndbout_c("bucket state: x%x", bucket->m_state);
  if (bucket->m_state & bucket->GC_COMPLETE) ndbout_c("\tGC_COMPLETE");
  if (bucket->m_state & bucket->GC_INCONSISTENT) ndbout_c("\tGC_INCONSISTENT");
  if (bucket->m_state & bucket->GC_CHANGE_CNT) ndbout_c("\tGC_CHANGE_CNT");
  if (bucket->m_state & bucket->GC_OUT_OF_MEMORY) ndbout_c("\tGC_OUT_OF_MEMORY");
  ndbout_c("bucket remain count: %u", bucket->m_gcp_complete_rep_count);
  ndbout_c("total buckets: %u", m_total_buckets);
  ndbout_c("startup hack: %u", m_startup_hack);
  // Dump which sub data streams have already been counted for this bucket.
  for (int i=0; i < MAX_SUB_DATA_STREAMS; i++)
  {
    Uint16 id = m_sub_data_streams[i];
    if (id == 0) continue;
    ndbout_c("stream: idx %u, id %04x, counted %d", i, id, bucket->m_gcp_complete_rep_sub_data_streams.get(i));
  }
  abort();
}
2254
2255 void
complete_empty_bucket_using_exceptional_event(Uint64 gci,Uint32 type)2256 NdbEventBuffer::complete_empty_bucket_using_exceptional_event(Uint64 gci,
2257 Uint32 type)
2258 {
2259 EventBufData *dummy_data= alloc_data();
2260 dummy_data->m_event_op = 0;
2261
2262 /** Add gci and event type to the inconsistent epoch event data,
2263 * such that nextEvent handles it correctly and makes it visible
2264 * to the consumer, such that consumer will be able to handle it.
2265 */
2266 LinearSectionPtr ptr[3];
2267 for (int i = 0; i < 3; i++)
2268 {
2269 ptr[i].p = NULL;
2270 ptr[i].sz = 0;
2271 }
2272 alloc_mem(dummy_data, ptr, 0);
2273 // dummy_data = ((Uint32*)NdbMem_Allocate(sizeof(SubTableData) + 3) >> 2);
2274
2275 SubTableData *sdata = (SubTableData*) dummy_data->memory;
2276 assert(dummy_data->memory);
2277 sdata->tableId = ~0;
2278 sdata->requestInfo = 0;
2279 sdata->gci_hi = Uint32(gci >> 32);
2280 sdata->gci_lo = Uint32(gci);
2281 SubTableData::setOperation(sdata->requestInfo, type);
2282
2283 // Add the first EventOperationImpl such that
2284 // the exceptional data can be interpreted by consumers
2285 NdbEventOperation* op = m_ndb->getEventOperation(0);
2286 dummy_data->m_event_op = &op->m_impl;
2287 assert(op != NULL);
2288
2289 // Add gci_ops for error epoch events to make the search for
2290 // inconsistent(Uint64& gci) to be effective (backward compatibility)
2291 {
2292 EventBufData_list dummy_event_list;
2293 dummy_event_list.append_used_data(dummy_data);
2294 dummy_event_list.m_is_not_multi_list = true;
2295 m_complete_data.m_data.append_list(&dummy_event_list, gci);
2296 assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
2297 if (type >= NdbDictionary::Event::_TE_INCONSISTENT)
2298 {
2299 m_complete_data.m_data.m_gci_ops_list_tail->m_error = type;
2300 }
2301 }
2302 }
2303
2304 void
discard_events_from_bucket(Gci_container * bucket)2305 NdbEventBuffer::discard_events_from_bucket(Gci_container* bucket)
2306 {
2307 // Empty the gci_op(s) list of the epoch from the bucket
2308 DBUG_PRINT_EVENT("info", ("discard_events_from_bucket: deleting m_gci_op_list: %p",
2309 bucket->m_data.m_gci_op_list));
2310 assert (bucket->m_data.m_is_not_multi_list); // bucket contains only one epoch
2311 delete [] bucket->m_data.m_gci_op_list;
2312
2313 bucket->m_data.m_gci_op_count = 0;
2314 bucket->m_data.m_gci_op_list = 0;
2315 bucket->m_data.m_gci_op_alloc = 0;
2316
2317 // Empty the event data from the bucket and return it to m_free_data
2318 free_list(bucket->m_data);
2319 }
2320
2321 void
complete_bucket(Gci_container * bucket)2322 NdbEventBuffer::complete_bucket(Gci_container* bucket)
2323 {
2324 const Uint64 gci = bucket->m_gci;
2325 Gci_container* buckets = (Gci_container*)m_active_gci.getBase();
2326
2327 if (0)
2328 ndbout_c("complete %u/%u pos: %u", Uint32(gci >> 32), Uint32(gci),
2329 Uint32(bucket - buckets));
2330
2331 #ifdef VM_TRACE
2332 verify_known_gci(false);
2333 #endif
2334
2335 bool bucket_empty = bucket->m_data.is_empty();
2336 Uint32 type = 0;
2337 if (bucket->m_state & Gci_container::GC_INCONSISTENT)
2338 {
2339 type = NdbDictionary::Event::_TE_INCONSISTENT;
2340 }
2341 else if (bucket->m_state & Gci_container::GC_OUT_OF_MEMORY)
2342 {
2343 type = NdbDictionary::Event::_TE_OUT_OF_MEMORY;
2344 }
2345 else if (bucket_empty)
2346 {
2347 assert(!bucket->m_data.m_is_not_multi_list);
2348 assert(bucket->m_data.first_gci_ops() == 0);
2349 type = NdbDictionary::Event::_TE_EMPTY;
2350 }
2351
2352 if(!bucket_empty)
2353 {
2354 #ifdef VM_TRACE
2355 assert(bucket->m_data.m_count);
2356 #endif
2357
2358 if (bucket->hasError())
2359 {
2360 /*
2361 * Bucket marked as possibly missing data, probably due to
2362 * kernel running out of event_buffer during node failure.
2363 * Discard the partially-received event data.
2364 */
2365 discard_events_from_bucket(bucket);
2366 bucket_empty = true;
2367 }
2368 }
2369
2370 if (bucket_empty)
2371 {
2372 assert(type > 0);
2373 complete_empty_bucket_using_exceptional_event(gci, type);
2374 }
2375 else
2376 {
2377 // bucket is complete and consistent: add it to complete_data list
2378 m_complete_data.m_data.append_list(&bucket->m_data, gci);
2379 }
2380
2381 Uint32 minpos = m_min_gci_index;
2382 Uint32 mask = m_known_gci.size() - 1;
2383 assert((mask & (mask + 1)) == 0);
2384
2385 bzero(bucket, sizeof(Gci_container));
2386
2387 m_min_gci_index = (minpos + 1) & mask;
2388
2389 #ifdef VM_TRACE
2390 verify_known_gci(true);
2391 #endif
2392 }
2393
2394 void
execSUB_START_CONF(const SubStartConf * const rep,Uint32 len)2395 NdbEventBuffer::execSUB_START_CONF(const SubStartConf * const rep,
2396 Uint32 len)
2397 {
2398 Uint32 buckets;
2399 if (len >= SubStartConf::SignalLength)
2400 {
2401 buckets = rep->bucketCount;
2402 }
2403 else
2404 {
2405 /*
2406 * Pre-7.0 kernel nodes do not return the number of buckets
2407 * Assume it's == theNoOfDBnodes as was the case in 6.3
2408 */
2409 buckets = m_ndb->theImpl->theNoOfDBnodes;
2410 }
2411
2412 set_total_buckets(buckets);
2413
2414 add_op();
2415 }
2416
void
NdbEventBuffer::execSUB_STOP_CONF(const SubStopConf * const rep,
                                  Uint32 len)
{
  // SUB_STOP_CONF: the kernel confirmed a subscription stop.
  // Decrement the active event-operation count; rep/len are unused here.
  remove_op();
}
2423
void
NdbEventBuffer::execSUB_STOP_REF(const SubStopRef * const rep,
                                 Uint32 len)
{
  // SUB_STOP_REF: the subscription stop request was refused.
  // Handled identically to SUB_STOP_CONF: the operation count is
  // decremented either way; rep/len are unused here.
  remove_op();
}
2430
void
NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep,
                                         Uint32 len, int complete_cluster_failure)
{
  /**
   * Handle SUB_GCP_COMPLETE_REP: a data node reports that it has sent all
   * event data it has for one epoch (gci).  Each epoch's Gci_container
   * counts outstanding reports; when the count reaches zero and the epoch
   * is the oldest known one, the bucket is completed and handed to the
   * consumer.  Epochs completing out of order are only marked complete
   * and finished later by complete_outof_order_gcis().
   *
   * @param rep  the completion report
   * @param len  signal length in words (short signals lack gci_lo)
   * @param complete_cluster_failure  non-zero when this is a synthesized
   *        report used to force-complete an epoch during cluster failure
   *        handling (see report_node_failure_completed)
   */
  Uint32 gci_hi = rep->gci_hi;
  Uint32 gci_lo = rep->gci_lo;

  // Signal too short to carry gci_lo (older sender) -- treat low word as 0.
  if (unlikely(len < SubGcpCompleteRep::SignalLength))
  {
    gci_lo = 0;
  }

  const Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
  // Track the highest epoch any node has ever reported complete.
  if (gci > m_highest_sub_gcp_complete_GCI)
    m_highest_sub_gcp_complete_GCI = gci;

  if (!complete_cluster_failure)
  {
    // A real report proves the sender node is alive.
    m_alive_node_bit_mask.set(refToNode(rep->senderRef));
    // Reset cluster failure marker
    m_failure_detected= false;

    // No active event operations: nothing to buffer, drop the report.
    if (unlikely(m_active_op_count == 0))
    {
      return;
    }
  }

  DBUG_ENTER_EVENT("NdbEventBuffer::execSUB_GCP_COMPLETE_REP");

  Uint32 cnt= rep->gcp_complete_rep_count;

  Gci_container *bucket = find_bucket(gci);

  if (0)
    ndbout_c("execSUB_GCP_COMPLETE_REP(%u/%u) cnt: %u from %x flags: 0x%x",
             Uint32(gci >> 32), Uint32(gci), cnt, rep->senderRef,
             rep->flags);

  // Node group added/removed: adjust expected report counts of the
  // affected epochs before processing this report.
  if (unlikely(rep->flags & (SubGcpCompleteRep::ADD_CNT |
                             SubGcpCompleteRep::SUB_CNT)))
  {
    handle_change_nodegroup(rep);
  }

  if (unlikely(bucket == 0))
  {
    if (unlikely(gci <= m_latestGCI))
    {
      /**
       * Already completed GCI...
       * Possible in case of resend during NF handling
       */
#ifdef VM_TRACE
      Uint64 minGCI = m_known_gci[m_min_gci_index];
      ndbout_c("bucket == 0, gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
               Uint32(gci >> 32), Uint32(gci),
               Uint32(minGCI >> 32), Uint32(minGCI),
               Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
      ndbout << " complete: " << m_complete_data << endl;
      for(Uint32 i = 0; i<m_active_gci.size(); i++)
      {
        if (((Gci_container*)(&m_active_gci[i]))->m_gci)
          ndbout << i << " - " << m_active_gci[i] << endl;
      }
#endif
    }
    else
    {
      DBUG_PRINT_EVENT("info", ("bucket == 0 due to an ongoing gap, completed epoch: %u/%u (%llu)",
                                Uint32(gci >> 32), Uint32(gci), gci));
    }
    DBUG_VOID_RETURN_EVENT;
  }

  if (rep->flags & SubGcpCompleteRep::SUB_DATA_STREAMS_IN_SIGNAL)
  {
    // The signal carries the ids of the sub data streams it completes,
    // packed two 16-bit ids per word.  During node-failure takeover the
    // same stream may be reported twice (by the failed node and by the
    // takeover node); count each stream only once per epoch.
    Uint32 already_counted = 0;
    for(Uint32 i = 0; i < cnt; i ++)
    {
      Uint16 sub_data_stream;
      if ((i & 1) == 0)
      {
        sub_data_stream = rep->sub_data_streams[i / 2] & 0xFFFF;
      }
      else
      {
        sub_data_stream = (rep->sub_data_streams[i / 2] >> 16);
      }
      Uint32 sub_data_stream_number = find_sub_data_stream_number(sub_data_stream);
      if (bucket->m_gcp_complete_rep_sub_data_streams.get(sub_data_stream_number))
      {
        // Received earlier. This must be a duplicate from the takeover node.
        already_counted ++;
      }
      else
      {
        bucket->m_gcp_complete_rep_sub_data_streams.set(sub_data_stream_number);
      }
    }
    assert(already_counted <= cnt);
    if (already_counted <= cnt)
    {
      cnt -= already_counted;
      if (cnt == 0)
      {
        // All sub data streams are already reported as completed for epoch
        // So data for all streams reported in this signal have been sent
        // twice but from two different nodes. Ignore this duplicate report.
        DBUG_VOID_RETURN_EVENT;
      }
    }
  }

  // Kernel ran out of event buffer: the epoch may be missing data.
  if (rep->flags & SubGcpCompleteRep::MISSING_DATA)
  {
    bucket->m_state = Gci_container::GC_INCONSISTENT;
  }

  Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
  // ~0 marks a bucket initialized before the real bucket count was known;
  // substitute the current total.
  if(unlikely(old_cnt == ~(Uint32)0))
  {
    old_cnt = m_total_buckets;
  }

  //assert(old_cnt >= cnt);
  // More completions reported than expected -- dump state and abort.
  if (unlikely(! (old_cnt >= cnt)))
  {
    crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, len, old_cnt, cnt);
  }
  bucket->m_gcp_complete_rep_count = old_cnt - cnt;

  if(old_cnt == cnt)  // this report completes the epoch
  {
    Uint64 minGCI = m_known_gci[m_min_gci_index];
    if(likely(minGCI == 0 || gci == minGCI))
    {
      // Epoch completed in order (it is the oldest known one).
  do_complete:
      m_startup_hack = false;
      bool gapBegins = false;

      // if there is a gap, mark the gap boundary
      if (m_event_buffer_manager.onEpochCompleted(gci, gapBegins))
        reportStatus();

      // if a new gap begins, mark the bucket.
      if (gapBegins)
        bucket->m_state |= Gci_container::GC_OUT_OF_MEMORY;

      complete_bucket(bucket);
      m_latestGCI = m_complete_data.m_gci = gci; // before reportStatus
      reportStatus();

      // Later epochs may already be complete -- finish them too.
      if(unlikely(m_latest_complete_GCI > gci))
      {
        complete_outof_order_gcis();
      }

      // signal that somethings happened

      NdbCondition_Signal(p_cond);
    }
    else
    {
      if (unlikely(m_startup_hack))
      {
        // First completion after startup: drop any older incomplete
        // epochs and complete this one as if it were the oldest.
        flushIncompleteEvents(gci);
        bucket = find_bucket(gci);
        assert(bucket);
        assert(bucket->m_gci == gci);
        goto do_complete;
      }
      /** out of order something */
      g_eventLogger->info("out of order bucket: %d gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
                          (int)(bucket-(Gci_container*)m_active_gci.getBase()),
                          Uint32(gci >> 32), Uint32(gci),
                          Uint32(minGCI >> 32), Uint32(minGCI),
                          Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
      bucket->m_state = Gci_container::GC_COMPLETE;
      bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
      m_latest_complete_GCI = gci;
    }
  }

  DBUG_VOID_RETURN_EVENT;
}
2617
void
NdbEventBuffer::complete_outof_order_gcis()
{
  /**
   * Finish epochs that completed out of order.  Walks the known-gci ring
   * from the oldest epoch towards m_latest_complete_GCI, completing each
   * bucket already marked GC_COMPLETE; stops at the first bucket that is
   * not yet complete.  Called from execSUB_GCP_COMPLETE_REP after the
   * oldest epoch has been completed.
   */
#ifdef VM_TRACE
  verify_known_gci(false);
#endif

  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  Uint64 stop_gci = m_latest_complete_GCI;

  Uint64 start_gci = array[minpos];
  g_eventLogger->info("complete_outof_order_gcis from: %u/%u(%u) to: %u/%u(%u)",
                      Uint32(start_gci >> 32), Uint32(start_gci), minpos,
                      Uint32(stop_gci >> 32), Uint32(stop_gci), maxpos);

  assert(start_gci <= stop_gci);
  do
  {
    // NOTE: complete_bucket() below advances m_min_gci_index, so re-read
    // the oldest epoch via the local minpos each iteration.
    start_gci = array[minpos];
    Gci_container* bucket = find_bucket(start_gci);
    assert(bucket);
    assert(maxpos == m_max_gci_index);
    if (!(bucket->m_state & Gci_container::GC_COMPLETE)) // Not complete
    {
#ifdef VM_TRACE
      verify_known_gci(false);
#endif
      return;
    }

#ifdef VM_TRACE
    ndbout_c("complete_outof_order_gcis - completing %u/%u rows: %u",
             Uint32(start_gci >> 32), Uint32(start_gci), bucket->m_data.m_count);
#else
    ndbout_c("complete_outof_order_gcis - completing %u/%u",
             Uint32(start_gci >> 32), Uint32(start_gci));
#endif

    complete_bucket(bucket);
    m_latestGCI = m_complete_data.m_gci = start_gci;

#ifdef VM_TRACE
    verify_known_gci(true);
#endif
    minpos = (minpos + 1) & mask;
  } while (start_gci != stop_gci);
}
2668
2669 void
insert_event(NdbEventOperationImpl * impl,SubTableData & data,LinearSectionPtr * ptr,Uint32 & oid_ref)2670 NdbEventBuffer::insert_event(NdbEventOperationImpl* impl,
2671 SubTableData &data,
2672 LinearSectionPtr *ptr,
2673 Uint32 &oid_ref)
2674 {
2675 DBUG_PRINT("info", ("gci{hi/lo}: %u/%u 0x%x %s",
2676 data.gci_hi, data.gci_lo, m_ndb->getReference(),
2677 m_ndb->getNdbObjectName()));
2678 do
2679 {
2680 if (impl->m_stop_gci == ~Uint64(0))
2681 {
2682 oid_ref = impl->m_oid;
2683 insertDataL(impl, &data, SubTableData::SignalLength, ptr);
2684 }
2685 NdbEventOperationImpl* blob_op = impl->theBlobOpList;
2686 while (blob_op != NULL)
2687 {
2688 if (blob_op->m_stop_gci == ~Uint64(0))
2689 {
2690 oid_ref = blob_op->m_oid;
2691 insertDataL(blob_op, &data, SubTableData::SignalLength, ptr);
2692 }
2693 blob_op = blob_op->m_next;
2694 }
2695 } while((impl = impl->m_next));
2696 }
2697
2698 bool
find_max_known_gci(Uint64 * res) const2699 NdbEventBuffer::find_max_known_gci(Uint64 * res) const
2700 {
2701 const Uint64 * array = m_known_gci.getBase();
2702 Uint32 mask = m_known_gci.size() - 1;
2703 Uint32 minpos = m_min_gci_index;
2704 Uint32 maxpos = m_max_gci_index;
2705
2706 if (minpos == maxpos)
2707 return false;
2708
2709 if (res)
2710 {
2711 * res = array[(maxpos - 1) & mask];
2712 }
2713
2714 return true;
2715 }
2716
void
NdbEventBuffer::handle_change_nodegroup(const SubGcpCompleteRep* rep)
{
  /**
   * A node group was added (ADD_CNT) or dropped (SUB_CNT), changing the
   * number of SUMA buckets and hence the number of completion reports each
   * epoch must wait for.  The delta (cnt) is carried in the upper 16 bits
   * of rep->flags, and the change takes effect from the epoch in rep.
   *
   * Finds that epoch in the known-gci ring, marks its bucket GC_CHANGE_CNT
   * (so a resent signal is not applied twice) and adjusts the expected
   * report count of that epoch and every later known epoch.
   */
  Uint64 gci = (Uint64(rep->gci_hi) << 32) | rep->gci_lo;
  Uint32 cnt = (rep->flags >> 16);  // bucket-count delta
  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;

  if (rep->flags & SubGcpCompleteRep::ADD_CNT)
  {
    ndbout_c("handle_change_nodegroup(add, cnt=%u,gci=%u/%u)",
             cnt, Uint32(gci >> 32), Uint32(gci));

    // found: 0 = epoch not known, 1 = already applied, 2 = applied now
    Uint32 found = 0;
    Uint32 pos = minpos;
    for (; pos != maxpos; pos = (pos + 1) & mask)
    {
      if (array[pos] == gci)
      {
        Gci_container* tmp = find_bucket(array[pos]);
        if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
        {
          // Duplicate signal (e.g. resend) -- already handled.
          found = 1;
          ndbout_c(" - gci %u/%u already marked complete",
                   Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
          break;
        }
        else
        {
          found = 2;
          ndbout_c(" - gci %u/%u marking (and increasing)",
                   Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
          tmp->m_state |= Gci_container::GC_CHANGE_CNT;
          tmp->m_gcp_complete_rep_count += cnt;
          break;
        }
      }
      else
      {
        // Epochs older than the change are unaffected.
        ndbout_c(" - ignore %u/%u",
                 Uint32(array[pos] >> 32), Uint32(array[pos]));
      }
    }

    if (found == 0)
    {
      ndbout_c(" - NOT FOUND (total: %u cnt: %u)", m_total_buckets, cnt);
      return;
    }

    if (found == 1)
    {
      return; // Nothing todo
    }

    m_total_buckets += cnt;

    // Every later known epoch also expects cnt more reports.
    pos = (pos + 1) & mask;
    for (; pos != maxpos; pos = (pos + 1) & mask)
    {
      assert(array[pos] > gci);
      Gci_container* tmp = find_bucket(array[pos]);
      assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
      tmp->m_gcp_complete_rep_count += cnt;
      ndbout_c(" - increasing cnt on %u/%u by %u",
               Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci), cnt);
    }
  }
  else if (rep->flags & SubGcpCompleteRep::SUB_CNT)
  {
    // Mirror of the ADD_CNT case: decrease expected report counts.
    ndbout_c("handle_change_nodegroup(sub, cnt=%u,gci=%u/%u)",
             cnt, Uint32(gci >> 32), Uint32(gci));

    Uint32 found = 0;
    Uint32 pos = minpos;
    for (; pos != maxpos; pos = (pos + 1) & mask)
    {
      if (array[pos] == gci)
      {
        Gci_container* tmp = find_bucket(array[pos]);
        if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
        {
          found = 1;
          ndbout_c(" - gci %u/%u already marked complete",
                   Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
          break;
        }
        else
        {
          found = 2;
          ndbout_c(" - gci %u/%u marking",
                   Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
          tmp->m_state |= Gci_container::GC_CHANGE_CNT;
          break;
        }
      }
      else
      {
        ndbout_c(" - ignore %u/%u",
                 Uint32(array[pos] >> 32), Uint32(array[pos]));
      }
    }

    if (found == 0)
    {
      ndbout_c(" - NOT FOUND");
      return;
    }

    if (found == 1)
    {
      return; // Nothing todo
    }

    m_total_buckets -= cnt;

    // Every later known epoch expects cnt fewer reports.
    pos = (pos + 1) & mask;
    for (; pos != maxpos; pos = (pos + 1) & mask)
    {
      assert(array[pos] > gci);
      Gci_container* tmp = find_bucket(array[pos]);
      assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
      tmp->m_gcp_complete_rep_count -= cnt;
      ndbout_c(" - decreasing cnt on %u/%u by %u to: %u",
               Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci),
               cnt,
               tmp->m_gcp_complete_rep_count);
    }
  }
}
2849
2850 Uint16
find_sub_data_stream_number(Uint16 sub_data_stream)2851 NdbEventBuffer::find_sub_data_stream_number(Uint16 sub_data_stream)
2852 {
2853 /*
2854 * The stream_index calculated will be the one returned unless
2855 * Suma have been changed to calculate stream identifiers in a
2856 * non compatible way. In that case a linear search in the
2857 * fixed size hash table will resolve the correct index.
2858 */
2859 const Uint16 stream_index = (sub_data_stream % 256) + MAX_SUB_DATA_STREAMS_PER_GROUP * (sub_data_stream / 256 - 1);
2860 const Uint16 num0 = stream_index % NDB_ARRAY_SIZE(m_sub_data_streams);
2861 Uint32 num = num0;
2862 while (m_sub_data_streams[num] != sub_data_stream)
2863 {
2864 if (m_sub_data_streams[num] == 0)
2865 {
2866 m_sub_data_streams[num] = sub_data_stream;
2867 break;
2868 }
2869 num = (num + 1) % NDB_ARRAY_SIZE(m_sub_data_streams);
2870 require(num != num0);
2871 }
2872 return num;
2873 }
2874
2875 void
set_total_buckets(Uint32 cnt)2876 NdbEventBuffer::set_total_buckets(Uint32 cnt)
2877 {
2878 if (m_total_buckets == cnt)
2879 return;
2880
2881 assert(m_total_buckets == TOTAL_BUCKETS_INIT);
2882 m_total_buckets = cnt;
2883
2884 Uint64 * array = m_known_gci.getBase();
2885 Uint32 mask = m_known_gci.size() - 1;
2886 Uint32 minpos = m_min_gci_index;
2887 Uint32 maxpos = m_max_gci_index;
2888
2889 bool found = false;
2890 Uint32 pos = minpos;
2891 for (; pos != maxpos; pos = (pos + 1) & mask)
2892 {
2893 Gci_container* tmp = find_bucket(array[pos]);
2894 if (TOTAL_BUCKETS_INIT >= tmp->m_gcp_complete_rep_count)
2895 {
2896 found = true;
2897 if (0)
2898 ndbout_c("set_total_buckets(%u) complete %u/%u",
2899 cnt, Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2900 tmp->m_gcp_complete_rep_count = 0;
2901 complete_bucket(tmp);
2902 }
2903 else
2904 {
2905 assert(tmp->m_gcp_complete_rep_count > TOTAL_BUCKETS_INIT);
2906 tmp->m_gcp_complete_rep_count -= TOTAL_BUCKETS_INIT;
2907 }
2908 }
2909 if (found)
2910 {
2911 NdbCondition_Signal(p_cond);
2912 }
2913 }
2914
void
NdbEventBuffer::report_node_failure_completed(Uint32 node_id)
{
  /**
   * Handle completed failure handling of one data node: inject a
   * _TE_NODE_FAILURE event for every event operation.  If this was the
   * last alive data node, additionally perform cluster-failure handling:
   * drop all incomplete epochs, inject _TE_CLUSTER_FAILURE events and
   * force-complete the final epoch.
   *
   * Called from the receive thread, so no locking is needed here.
   */
  assert(node_id < 32 * m_alive_node_bit_mask.Size); // only data-nodes
  if (! (node_id < 32 * m_alive_node_bit_mask.Size))
    return;

  m_alive_node_bit_mask.clear(node_id);

  // No event operations -- nothing to report to.
  NdbEventOperation* op= m_ndb->getEventOperation(0);
  if (op == 0)
    return;

  DBUG_ENTER("NdbEventBuffer::report_node_failure_completed");
  SubTableData data;
  LinearSectionPtr ptr[3];
  bzero(&data, sizeof(data));
  bzero(ptr, sizeof(ptr));

  // Build a synthetic _TE_NODE_FAILURE event for the failed node.
  data.tableId = ~0;
  data.requestInfo = 0;
  SubTableData::setOperation(data.requestInfo,
                             NdbDictionary::Event::_TE_NODE_FAILURE);
  SubTableData::setReqNodeId(data.requestInfo, node_id);
  SubTableData::setNdbdNodeId(data.requestInfo, node_id);
  data.flags = SubTableData::LOG;

  // Target the newest known epoch; fall back to the epoch after
  // m_latestGCI (next gci_hi, gci_lo 0) when none is known.
  Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
  find_max_known_gci(&gci);

  data.gci_hi = Uint32(gci >> 32);
  data.gci_lo = Uint32(gci);

  /**
   * Insert this event for each operation
   */
  // no need to lock()/unlock(), receive thread calls this
  insert_event(&op->m_impl, data, ptr, data.senderData);

  // Some data node is still alive -- done.
  if (!m_alive_node_bit_mask.isclear())
    DBUG_VOID_RETURN;

  /*
   * Cluster failure
   */

  DBUG_PRINT("info", ("Cluster failure 0x%x %s", m_ndb->getReference(),
                      m_ndb->getNdbObjectName()));

  gci = Uint64((m_latestGCI >> 32) + 1) << 32;
  bool found = find_max_known_gci(&gci);

  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;

  // Discard every incomplete epoch older than the target gci:
  // free its event data and reset the bucket slot.
  while (minpos != maxpos && array[minpos] != gci)
  {
    Gci_container* tmp = find_bucket(array[minpos]);
    assert(tmp);
    assert(maxpos == m_max_gci_index);

    if(!tmp->m_data.is_empty())
    {
      free_list(tmp->m_data);
    }
    tmp->~Gci_container();
    bzero(tmp, sizeof(Gci_container));

    minpos = (minpos + 1) & mask;
  }
  m_min_gci_index = minpos;
  if (found)
  {
    assert(((minpos + 1) & mask) == maxpos);
  }
  else
  {
    assert(minpos == maxpos);
  }

  /**
   * Inject new event
   */
  // Reuse the same signal, now as a _TE_CLUSTER_FAILURE event.
  data.tableId = ~0;
  data.requestInfo = 0;
  SubTableData::setOperation(data.requestInfo,
                             NdbDictionary::Event::_TE_CLUSTER_FAILURE);

  /**
   * Insert this event for each operation
   */
  // no need to lock()/unlock(), receive thread calls this
  insert_event(&op->m_impl, data, ptr, data.senderData);

  /**
   * Mark that event buffer is containing a failure event
   */
  m_failure_detected= true;

#ifdef VM_TRACE
  m_flush_gci = 0;
#endif

  /**
   * And finally complete this GCI
   */
  Gci_container* tmp = find_bucket(gci);
  assert(tmp);
  if (found)
  {
    assert(m_max_gci_index == maxpos); // shouldnt have changed...
  }
  else
  {
    assert(m_max_gci_index == ((maxpos + 1) & mask));
  }
  Uint32 cnt = tmp->m_gcp_complete_rep_count;

  // Synthesize a completion report covering all outstanding counts so
  // that execSUB_GCP_COMPLETE_REP completes the epoch immediately.
  SubGcpCompleteRep rep;
  rep.gci_hi= (Uint32)(gci >> 32);
  rep.gci_lo= (Uint32)(gci & 0xFFFFFFFF);
  rep.gcp_complete_rep_count= cnt;
  rep.flags = 0;
  execSUB_GCP_COMPLETE_REP(&rep, SubGcpCompleteRep::SignalLength, 1);

  DBUG_VOID_RETURN;
}
3044
Uint64
NdbEventBuffer::getLatestGCI()
{
  /*
   * Return the newest epoch that has been completed by the receiver.
   *
   * TODO: Fix data race with m_latestGCI.
   * m_latestGCI is changed by receiver thread, and getLatestGCI
   * is called from application thread.
   */
  return m_latestGCI;
}
3055
Uint64
NdbEventBuffer::getHighestQueuedEpoch()
{
  // Return the newest epoch made visible to the polling application
  // (m_latest_poll_GCI is set when the application polls for events).
  // NOTE(review): likely subject to the same cross-thread access pattern
  // as m_latestGCI above -- confirm intended synchronization.
  return m_latest_poll_GCI;
}
3061
int
NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
                            const SubTableData * const sdata,
                            Uint32 len,
                            LinearSectionPtr ptr[3])
{
  /**
   * Buffer one received event for one event operation: find the epoch's
   * bucket and either append a new EventBufData, or -- when event merging
   * is enabled and an event with the same primary key already exists --
   * merge into the existing buffer.  Blob-part events are linked under a
   * (possibly newly created) main event.  Non-data events update operation
   * state and may be filtered out.
   *
   * @param op     receiving event operation
   * @param sdata  event header (SUB_TABLE_DATA)
   * @param len    header length in words (short signals lack gci_lo)
   * @param ptr    the three event data sections
   * @return 0 (always; allocation failures crash via crashMemAllocError)
   */
  DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
  const Uint32 ri = sdata->requestInfo;
  const Uint32 operation = SubTableData::getOperation(ri);
  Uint32 gci_hi = sdata->gci_hi;
  Uint32 gci_lo = sdata->gci_lo;

  // Signal too short to carry gci_lo (older sender) -- treat low word as 0.
  if (unlikely(len < SubTableData::SignalLength))
  {
    gci_lo = 0;
  }

  Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
  const bool is_data_event =
    operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;

  if (!is_data_event)
  {
    if (operation == NdbDictionary::Event::_TE_CLUSTER_FAILURE)
    {
      /*
        Mark event as stopping. Subsequent dropEventOperation
        will add the event to the dropped list for delete
      */
      op->m_stop_gci = gci;
    }
    else if (operation == NdbDictionary::Event::_TE_ACTIVE)
    {
      // internal event, do not relay to user
      DBUG_PRINT("info",
                 ("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u 0x%x %s",
                  op->m_ref_count, op, SubTableData::getNdbdNodeId(ri),
                  m_ndb->getReference(), m_ndb->getNdbObjectName()));
      DBUG_RETURN_EVENT(0);
    }
    else if (operation == NdbDictionary::Event::_TE_STOP)
    {
      // internal event, do not relay to user
      DBUG_PRINT("info",
                 ("_TE_STOP: m_ref_count: %u for op: %p id: %u 0x%x %s",
                  op->m_ref_count, op, SubTableData::getNdbdNodeId(ri),
                  m_ndb->getReference(), m_ndb->getNdbObjectName()));
      DBUG_RETURN_EVENT(0);
    }
  }

  // Current buffer usage in percent of the configured maximum
  // (0 when no maximum is configured).
  Uint32 memory_usage = (m_max_alloc == 0) ? 0 :
    (Uint32)(100 * (m_total_alloc - m_free_data_sz) / m_max_alloc);

  if (m_event_buffer_manager.onEventDataReceived(memory_usage, gci))
    reportStatus(true/*force_report*/);

  // Inside a discard gap (e.g. buffer overload): drop the event.
  if (m_event_buffer_manager.isEventDataToBeDiscarded(gci))
  {
    DBUG_RETURN_EVENT(0);
  }

  // Only buffer event types the operation subscribed to (mi_type bitmask).
  if ( likely((Uint32)op->mi_type & (1U << operation)))
  {
    Gci_container* bucket= find_bucket(gci);

    DBUG_PRINT_EVENT("info", ("data insertion in eventId %d 0x%x %s",
                              op->m_eventId, m_ndb->getReference(),
                              m_ndb->getNdbObjectName()));
    // NOTE(review): `sdata->gci` below looks stale -- SubTableData carries
    // gci_hi/gci_lo in this codebase; verify this debug-only print still
    // compiles when DBUG event tracing is enabled.
    DBUG_PRINT_EVENT("info", ("gci=%d tab=%d op=%d node=%d",
                              sdata->gci, sdata->tableId,
                              SubTableData::getOperation(sdata->requestInfo),
                              SubTableData::getReqNodeId(sdata->requestInfo)));

    if (unlikely(bucket == 0))
    {
      /**
       * Already completed GCI...
       * Possible in case of resend during NF handling
       */
      DBUG_RETURN_EVENT(0);
    }

    const bool is_blob_event = (op->theMainOp != NULL);
    // PK-hash lookup is only used when merging data events.
    const bool use_hash = op->m_mergeEvents && is_data_event;

    if (! is_data_event && is_blob_event)
    {
      // currently subscribed to but not used
      DBUG_PRINT_EVENT("info", ("ignore non-data event on blob table 0x%x %s",
                                m_ndb->getReference(), m_ndb->getNdbObjectName()));
      DBUG_RETURN_EVENT(0);
    }

    // find position in bucket hash table
    EventBufData* data = 0;
    EventBufData_hash::Pos hpos;
    if (use_hash)
    {
      bucket->m_data_hash.search(hpos, op, ptr);
      data = hpos.data;
    }

    if (data == 0)
    {
      // allocate new result buffer
      data = alloc_data(); // alloc_data crashes if allocation fails.

      m_event_buffer_manager.onBufferingEpoch(gci);

      // Initialize m_event_op, in case copy_data fails due to insufficient memory
      data->m_event_op = 0;
      if (unlikely(copy_data(sdata, len, ptr, data, NULL)))
      {
        crashMemAllocError("insertDataL : copy_data failed.");
      }
      data->m_event_op = op;
      if (! is_blob_event || ! is_data_event)
      {
        // Plain event: append directly to the bucket's event list.
        bucket->m_data.append_data(data);
      }
      else
      {
        // find or create main event for this blob event
        EventBufData_hash::Pos main_hpos;
        int ret = get_main_data(bucket, main_hpos, data);
        if (ret == -1)
        {
          crashMemAllocError("insertDataL : get_main_data failed.");
        }

        EventBufData* main_data = main_hpos.data;
        if (ret != 0) // main event was created
        {
          main_data->m_event_op = op->theMainOp;
          bucket->m_data.append_data(main_data);
          if (use_hash)
          {
            main_data->m_pkhash = main_hpos.pkhash;
            bucket->m_data_hash.append(main_hpos, main_data);
          }
        }
        // link blob event under main event
        add_blob_data(bucket, main_data, data);
      }
      if (use_hash)
      {
        // Register the new event in the PK hash for future merges.
        data->m_pkhash = hpos.pkhash;
        bucket->m_data_hash.append(hpos, data);
      }
#ifdef VM_TRACE
      op->m_data_count++;
#endif
    }
    else
    {
      // event with same op, PK found, merge into old buffer
      if (unlikely(merge_data(sdata, len, ptr, data, &bucket->m_data.m_sz)))
      {
        crashMemAllocError("insertDataL : merge_data failed.");
      }

      // merge is on so we do not report blob part events
      if (! is_blob_event) {
        // report actual operation and the composite
        // there is no way to "fix" the flags for a composite op
        // since the flags represent multiple ops on multiple PKs
        // XXX fix by doing merge at end of epoch (extra mem cost)
        {
          EventBufData_list::Gci_op g = { op, (1U << operation) };
          bucket->m_data.add_gci_op(g);
        }
        {
          EventBufData_list::Gci_op
            g = { op,
                  (1U << SubTableData::getOperation(data->sdata->requestInfo))};
          bucket->m_data.add_gci_op(g);
        }
      }
    }
#ifdef NDB_EVENT_VERIFY_SIZE
    verify_size(bucket->m_data);
#endif
    DBUG_RETURN_EVENT(0);
  }

  // Event type not subscribed to: dropped (with tracing in debug builds).
#ifdef VM_TRACE
  if ((Uint32)op->m_eventImpl->mi_type & (1U << operation))
  {
    DBUG_PRINT_EVENT("info",("Data arrived before ready eventId %d 0x%x %s",
                             op->m_eventId, m_ndb->getReference(),
                             m_ndb->getNdbObjectName()));
    DBUG_RETURN_EVENT(0);
  }
  else {
    DBUG_PRINT_EVENT("info",("skipped 0x%x %s", m_ndb->getReference(),
                             m_ndb->getNdbObjectName()));
    DBUG_RETURN_EVENT(0);
  }
#else
  DBUG_RETURN_EVENT(0);
#endif
}
3265
void
NdbEventBuffer::crashMemAllocError(const char *error_text)
{
  // Fatal out-of-memory handler: log the failing event buffer's identity
  // and the supplied error text, then terminate the process.  Callers use
  // this when event data cannot be allocated/copied/merged and the buffer
  // can no longer guarantee a consistent event stream.
  g_eventLogger->error("Ndb Event Buffer 0x%x %s", m_ndb->getReference(),
                       m_ndb->getNdbObjectName());
  g_eventLogger->error("Ndb Event Buffer : %s", error_text);
  g_eventLogger->error("Ndb Event Buffer : Fatal error.");
  exit(-1);
}
3275
3276 // allocate EventBufData
3277 EventBufData*
alloc_data()3278 NdbEventBuffer::alloc_data()
3279 {
3280 DBUG_ENTER_EVENT("alloc_data");
3281 EventBufData* data = m_free_data;
3282
3283 if (unlikely(data == 0))
3284 {
3285 #ifdef VM_TRACE
3286 assert(m_free_data_count == 0);
3287 assert(m_free_data_sz == 0);
3288 #endif
3289 expand(4000);
3290 reportStatus();
3291
3292 data = m_free_data;
3293 if (unlikely(data == 0))
3294 {
3295 #ifdef VM_TRACE
3296 printf("m_latest_command: %s 0x%x %s\n",
3297 m_latest_command, m_ndb->getReference(), m_ndb->getNdbObjectName());
3298 printf("no free data, m_latestGCI %u/%u\n",
3299 (Uint32)(m_latestGCI << 32), (Uint32)m_latestGCI);
3300 printf("m_free_data_count %d\n", m_free_data_count);
3301 printf("m_available_data_count %d first gci{hi/lo} %u/%u last gci{hi/lo} %u/%u\n",
3302 m_available_data.m_count,
3303 m_available_data.m_head?m_available_data.m_head->sdata->gci_hi:0,
3304 m_available_data.m_head?m_available_data.m_head->sdata->gci_lo:0,
3305 m_available_data.m_tail?m_available_data.m_tail->sdata->gci_hi:0,
3306 m_available_data.m_tail?m_available_data.m_tail->sdata->gci_lo:0);
3307 printf("m_used_data_count %d\n", m_used_data.m_count);
3308 #endif
3309 crashMemAllocError("alloc_data : Allocation of meta data failed.");
3310 }
3311 }
3312
3313 // remove data from free list
3314 if (data->m_next_blob == 0)
3315 m_free_data = data->m_next;
3316 else {
3317 EventBufData* data2 = data->m_next_blob;
3318 if (data2->m_next == 0) {
3319 data->m_next_blob = data2->m_next_blob;
3320 data = data2;
3321 } else {
3322 EventBufData* data3 = data2->m_next;
3323 data2->m_next = data3->m_next;
3324 data = data3;
3325 }
3326 }
3327 data->m_next = 0;
3328 data->m_next_blob = 0;
3329 #ifdef VM_TRACE
3330 m_free_data_count--;
3331 assert(m_free_data_sz >= data->sz);
3332 #endif
3333 m_free_data_sz -= data->sz;
3334 DBUG_RETURN_EVENT(data);
3335 }
3336
// allocate initial or bigger memory area in EventBufData
// takes sizes from given ptr and sets up data->ptr
//
// The area holds a SubTableData header followed by the three linear
// sections.  Note: on grow the old buffer is freed *before* the new
// one is allocated, so existing contents are discarded — callers
// either overwrite everything afterwards or have saved the old data
// first (see merge_data()).  If change_sz is non-NULL the number of
// added bytes is accumulated into it for the caller's bookkeeping.
// Returns 0 on success; on OS allocation failure the process is
// terminated (the -1 return is not reached).
int
NdbEventBuffer::alloc_mem(EventBufData* data,
                          LinearSectionPtr ptr[3],
                          Uint32 * change_sz)
{
  DBUG_ENTER("NdbEventBuffer::alloc_mem");
  DBUG_PRINT("info", ("ptr sz %u + %u + %u 0x%x %s",
                      ptr[0].sz, ptr[1].sz, ptr[2].sz, m_ndb->getReference(),
                      m_ndb->getNdbObjectName()));
  const Uint32 min_alloc_size = 128;

  // header size in words, rounded up
  Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
  Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2;
  if (alloc_size < min_alloc_size)
    alloc_size = min_alloc_size;

  if (data->sz < alloc_size)
  {
    Uint32 add_sz = alloc_size - data->sz;

    // free old (smaller) buffer before allocating the new one
    NdbMem_Free((char*)data->memory);
    assert(m_total_alloc >= data->sz);
    data->memory = 0;

    data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
    if (data->memory == 0)
    {
      goto out_of_mem_err;
    }
    data->sz = alloc_size;
    m_total_alloc += add_sz;

    if (change_sz != NULL)
      *change_sz += add_sz;
  }
  {
    // lay out the three section pointers right after the header
    Uint32* memptr = data->memory;
    memptr += sz4;
    int i;
    for (i = 0; i <= 2; i++)
    {
      data->ptr[i].p = memptr;
      data->ptr[i].sz = ptr[i].sz;
      memptr += ptr[i].sz;
    }
  }
  DBUG_RETURN(0);

out_of_mem_err:
  // Dealloc succeeded, but alloc bigger size failed
  crashMemAllocError("Attempt to allocate memory from OS failed");
  DBUG_RETURN(-1); // not reached: crashMemAllocError() exits
}
3392
void
NdbEventBuffer::dealloc_mem(EventBufData* data,
                            Uint32 * change_sz)
{
  // Release the payload buffer of one EventBufData and update the
  // global allocation counter.  If change_sz is non-NULL the freed
  // byte count is also subtracted from it (caller's bookkeeping,
  // mirrors the accumulation done in alloc_mem()).
  NdbMem_Free((char*)data->memory);
  assert(m_total_alloc >= data->sz);
  m_total_alloc -= data->sz;
  if (change_sz != NULL) {
    assert(*change_sz >= data->sz);
    *change_sz -= data->sz;
  }
  data->memory = 0;
  data->sz = 0;
}
3407
int
NdbEventBuffer::copy_data(const SubTableData * const sdata, Uint32 len,
                          LinearSectionPtr ptr[3],
                          EventBufData* data,
                          Uint32 * change_sz)
{
  // Copy a received event (SubTableData header + three sections)
  // into data, growing its buffer via alloc_mem() as needed.
  // len is the received signal length in words; signals shorter than
  // SignalLength lack gci_lo and signals shorter than
  // SignalLengthWithTransId lack the transaction id — missing fields
  // get defaults.  Returns 0 on success, -1 on allocation failure.
  DBUG_ENTER_EVENT("NdbEventBuffer::copy_data");

  if (alloc_mem(data, ptr, change_sz) != 0)
    DBUG_RETURN_EVENT(-1);
  memcpy(data->sdata, sdata, sizeof(SubTableData));

  if (unlikely(len < SubTableData::SignalLength))
  {
    // short signal: only the high gci word was sent
    data->sdata->gci_lo = 0;
  }
  if (len < SubTableData::SignalLengthWithTransId)
  {
    /* No TransId, set to uninit value */
    data->sdata->transId1 = ~Uint32(0);
    data->sdata->transId2 = ~Uint32(0);
  }

  int i;
  for (i = 0; i <= 2; i++)
    memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
  DBUG_RETURN_EVENT(0);
}
3436
// Merge matrix used by merge_data() for two consecutive events on the
// same PK within an epoch: t1 = operation already buffered,
// t2 = newly arrived operation, t3 = resulting composite operation.
// enum_IDM marks idempotent repeats (possible on node failure),
// enum_ERR marks combinations that cannot legally occur.
static struct Ev_t {
  enum {
    enum_INS = NdbDictionary::Event::_TE_INSERT,
    enum_DEL = NdbDictionary::Event::_TE_DELETE,
    enum_UPD = NdbDictionary::Event::_TE_UPDATE,
    enum_NUL = NdbDictionary::Event::_TE_NUL,
    enum_IDM = 254, // idempotent op possibly allowed on NF
    enum_ERR = 255  // always impossible
  };
  int t1, t2, t3;
} ev_t[] = {
  { Ev_t::enum_INS, Ev_t::enum_INS, Ev_t::enum_IDM },
  { Ev_t::enum_INS, Ev_t::enum_DEL, Ev_t::enum_NUL }, //ok
  { Ev_t::enum_INS, Ev_t::enum_UPD, Ev_t::enum_INS }, //ok
  { Ev_t::enum_DEL, Ev_t::enum_INS, Ev_t::enum_UPD }, //ok
  { Ev_t::enum_DEL, Ev_t::enum_DEL, Ev_t::enum_IDM },
  { Ev_t::enum_DEL, Ev_t::enum_UPD, Ev_t::enum_ERR },
  { Ev_t::enum_UPD, Ev_t::enum_INS, Ev_t::enum_ERR },
  { Ev_t::enum_UPD, Ev_t::enum_DEL, Ev_t::enum_DEL }, //ok
  { Ev_t::enum_UPD, Ev_t::enum_UPD, Ev_t::enum_UPD }  //ok
};
3458
3459 /*
3460 * | INS | DEL | UPD
3461 * 0 | pk ah + all ah | pk ah | pk ah + new ah
3462 * 1 | pk ad + all ad | old pk ad | new pk ad + new ad
3463 * 2 | empty | old non-pk ah+ad | old ah+ad
3464 */
3465
3466 static AttributeHeader
copy_head(Uint32 & i1,Uint32 * p1,Uint32 & i2,const Uint32 * p2,Uint32 flags)3467 copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2,
3468 Uint32 flags)
3469 {
3470 AttributeHeader ah(p2[i2]);
3471 bool do_copy = (flags & 1);
3472 if (do_copy)
3473 p1[i1] = p2[i2];
3474 i1++;
3475 i2++;
3476 return ah;
3477 }
3478
3479 static void
copy_attr(AttributeHeader ah,Uint32 & j1,Uint32 * p1,Uint32 & j2,const Uint32 * p2,Uint32 flags)3480 copy_attr(AttributeHeader ah,
3481 Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2,
3482 Uint32 flags)
3483 {
3484 bool do_copy = (flags & 1);
3485 bool with_head = (flags & 2);
3486 Uint32 n = with_head + ah.getDataSize();
3487 if (do_copy)
3488 {
3489 Uint32 k;
3490 for (k = 0; k < n; k++)
3491 p1[j1 + k] = p2[j2 + k];
3492 }
3493 j1 += n;
3494 j2 += n;
3495 }
3496
int
NdbEventBuffer::merge_data(const SubTableData * const sdata, Uint32 len,
                           LinearSectionPtr ptr2[3],
                           EventBufData* data,
                           Uint32 * change_sz)
{
  // Merge a newly arrived event (sdata/ptr2) into the already buffered
  // event on the same PK (data), using the ev_t matrix to derive the
  // composite operation.  ptr1 = old (buffered) sections, ptr2 = new
  // sections.  On success data holds the composite event; change_sz
  // tracks the net buffer size change.  Returns 0 on success, -1 on
  // failure.
  DBUG_ENTER_EVENT("NdbEventBuffer::merge_data");

  /* TODO : Consider how/if to merge multiple events/key with different
   * transid
   * Same consideration probably applies to AnyValue!
   */

  Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;

  int t1 = SubTableData::getOperation(data->sdata->requestInfo);
  int t2 = SubTableData::getOperation(sdata->requestInfo);
  if (t1 == Ev_t::enum_NUL)
    // buffered entry is only a NUL place-holder: plain overwrite
    DBUG_RETURN_EVENT(copy_data(sdata, len, ptr2, data, change_sz));

  // look up the (t1, t2) combination in the merge matrix
  Ev_t* tp = 0;
  int i;
  for (i = 0; (uint) i < sizeof(ev_t)/sizeof(ev_t[0]); i++) {
    if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) {
      tp = &ev_t[i];
      break;
    }
  }
  assert(tp != 0 && tp->t3 != Ev_t::enum_ERR);

  if (tp->t3 == Ev_t::enum_IDM) {
    // idempotent repeat: data must match what is buffered, nothing
    // to merge
    LinearSectionPtr (&ptr1)[3] = data->ptr;

    /*
     * TODO
     * - can get data in INS ptr2[2] which is supposed to be empty
     * - can get extra data in DEL ptr2[2]
     * - why does DBUG_PRINT not work in this file ???
     *
     * replication + bug#19872 can ignore this since merge is on
     * only for tables with explicit PK and before data is not used
     */
    const int maxsec = 1; // ignore section 2

    int i;
    for (i = 0; i <= maxsec; i++) {
      if (ptr1[i].sz != ptr2[i].sz ||
          memcmp(ptr1[i].p, ptr2[i].p, ptr1[i].sz << 2) != 0) {
        DBUG_PRINT("info", ("idempotent op %d*%d data differs in sec %d 0x%x %s",
                            tp->t1, tp->t2, i, m_ndb->getReference(),
                            m_ndb->getNdbObjectName()));
        assert(false);
        DBUG_RETURN_EVENT(-1);
      }
    }
    DBUG_PRINT("info", ("idempotent op %d*%d data ok 0x%x %s",
                        tp->t1, tp->t2, m_ndb->getReference(),
                        m_ndb->getNdbObjectName()));
    DBUG_RETURN_EVENT(0);
  }

  // TODO: use old data items, avoid malloc/free on each merge

  // save old data
  EventBufData olddata = *data;
  data->memory = 0;
  data->sz = 0;

  // compose ptr1 o ptr2 = ptr
  LinearSectionPtr (&ptr1)[3] = olddata.ptr;
  LinearSectionPtr (&ptr)[3] = data->ptr;

  // loop twice where first loop only sets sizes
  // (loop also doubles as the "do copy" flag bit passed down to
  // copy_head/copy_attr: 0 = just count, 1 = count and copy)
  int loop;
  int result = 0;
  for (loop = 0; loop <= 1; loop++)
  {
    if (loop == 1)
    {
      // sizes known from pass 0: allocate and stamp the composite op
      if (alloc_mem(data, ptr, change_sz) != 0)
      {
        result = -1;
        goto end;
      }
      *data->sdata = *sdata;
      SubTableData::setOperation(data->sdata->requestInfo, tp->t3);
    }

    ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;

    // copy pk from new version
    {
      AttributeHeader ah;
      Uint32 i = 0;
      Uint32 j = 0;
      Uint32 i2 = 0;
      Uint32 j2 = 0;
      while (i < nkey)
      {
        ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
        copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
      }
      ptr[0].sz = i;
      ptr[1].sz = j;
    }

    // merge after values, new version overrides
    if (tp->t3 != Ev_t::enum_DEL)
    {
      AttributeHeader ah;
      Uint32 i = ptr[0].sz;
      Uint32 j = ptr[1].sz;
      Uint32 i1 = 0;
      Uint32 j1 = 0;
      Uint32 i2 = nkey;
      Uint32 j2 = ptr[1].sz;
      // skip over the PK attributes of the old event
      while (i1 < nkey)
      {
        j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
      }
      // merge the two attribute lists in attribute-id order
      while (1)
      {
        bool b1 = (i1 < ptr1[0].sz);
        bool b2 = (i2 < ptr2[0].sz);
        if (b1 && b2)
        {
          Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId();
          Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId();
          if (id1 < id2)
            b2 = false;
          else if (id1 > id2)
            b1 = false;
          else
          {
            // same attribute in both: skip the old value (new wins)
            j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
            b1 = false;
          }
        }
        if (b1)
        {
          ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop);
          copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop);
        }
        else if (b2)
        {
          ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
          copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
        }
        else
          break;
      }
      ptr[0].sz = i;
      ptr[1].sz = j;
    }

    // merge before values, old version overrides
    if (tp->t3 != Ev_t::enum_INS)
    {
      AttributeHeader ah;
      Uint32 k = 0;
      Uint32 k1 = 0;
      Uint32 k2 = 0;
      // section 2 stores header+data pairs; merge in id order
      while (1)
      {
        bool b1 = (k1 < ptr1[2].sz);
        bool b2 = (k2 < ptr2[2].sz);
        if (b1 && b2)
        {
          Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId();
          Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId();
          if (id1 < id2)
            b2 = false;
          else if (id1 > id2)
            b1 = false;
          else
          {
            // same attribute in both: skip the new value (old wins)
            k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize();
            b2 = false;
          }
        }
        if (b1)
        {
          ah = AttributeHeader(ptr1[2].p[k1]);
          copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
        }
        else if (b2)
        {
          ah = AttributeHeader(ptr2[2].p[k2]);
          copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2);
        }
        else
          break;
      }
      ptr[2].sz = k;
    }
  }

end:
  dealloc_mem(&olddata, change_sz);
  DBUG_RETURN_EVENT(result);
}
3698
3699 /*
3700 * Given blob part event, find main table event on inline part. It
3701 * should exist (force in TUP) but may arrive later. If so, create
3702 * NUL event on main table. The real event replaces it later.
3703 */
3704
int
NdbEventBuffer::get_main_data(Gci_container* bucket,
                              EventBufData_hash::Pos& hpos,
                              EventBufData* blob_data)
{
  // Locate (or create) the main-table event that a blob part event
  // belongs to, searching the bucket's PK hash.  Returns:
  //    0 - existing main event found (hpos.data set)
  //    1 - not found; a NUL place-holder was created (hpos.data set)
  //   -1 - failure
  DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data");

  int blobVersion = blob_data->m_event_op->theBlobVersion;
  assert(blobVersion == 1 || blobVersion == 2);

  NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp;
  assert(main_op != NULL);
  const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;

  // create LinearSectionPtr for main table key
  LinearSectionPtr ptr[3];

  Uint32 pk_ah[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
  Uint32* pk_data = blob_data->ptr[1].p;
  Uint32 pk_size = 0;

  if (unlikely(blobVersion == 1)) {
    /*
     * Blob PK attribute 0 is concatenated table PK null padded
     * to fixed maximum size. The actual size and attributes of
     * table PK must be discovered.
     */
    Uint32 max_size = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();

    Uint32 sz = 0; // words parsed so far
    Uint32 n = 0;  // PK columns handled so far
    Uint32 i;
    for (i = 0; n < mainTable->m_noOfKeys; i++) {
      const NdbColumnImpl* c = mainTable->getColumn(i);
      assert(c != NULL);
      if (! c->m_pk)
        continue;

      Uint32 bytesize = c->m_attrSize * c->m_arraySize;
      Uint32 lb, len;
      require(sz < max_size);
      bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_data[sz],
                                           bytesize, lb, len);
      if (!ok)
      {
        DBUG_RETURN_EVENT(-1);
      }

      // synthesize the main-table attribute header for this PK column
      AttributeHeader ah(i, lb + len);
      pk_ah[n] = ah.m_value;
      sz += ah.getDataSize();
      n++;
    }
    assert(n == mainTable->m_noOfKeys);
    require(sz <= max_size);
    pk_size = sz;
  } else {
    /*
     * Blob PK starts with separate table PKs. Total size must be
     * counted and blob attribute ids changed to table attribute ids.
     */
    Uint32 sz = 0; // count size
    Uint32 n = 0;
    Uint32 i;
    for (i = 0; n < mainTable->m_noOfKeys; i++) {
      const NdbColumnImpl* c = mainTable->getColumn(i);
      assert(c != NULL);
      if (! c->m_pk)
        continue;

      // reuse the blob event's header, re-tagged with the main
      // table's attribute id
      AttributeHeader ah(blob_data->ptr[0].p[n]);
      ah.setAttributeId(i);
      pk_ah[n] = ah.m_value;
      sz += ah.getDataSize();
      n++;
    }
    assert(n == mainTable->m_noOfKeys);
    pk_size = sz;
  }

  // sections: [0] PK headers, [1] PK data, [2] empty
  ptr[0].sz = mainTable->m_noOfKeys;
  ptr[0].p = pk_ah;
  ptr[1].sz = pk_size;
  ptr[1].p = pk_data;
  ptr[2].sz = 0;
  ptr[2].p = 0;

  DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
  DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);

  // search for main event buffer
  bucket->m_data_hash.search(hpos, main_op, ptr);
  if (hpos.data != NULL)
    DBUG_RETURN_EVENT(0);

  // not found, create a place-holder
  EventBufData* main_data = alloc_data();
  if (main_data == NULL)
    DBUG_RETURN_EVENT(-1);
  SubTableData sdata = *blob_data->sdata;
  sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
  SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
  if (copy_data(&sdata, SubTableData::SignalLength, ptr, main_data, NULL) != 0)
    DBUG_RETURN_EVENT(-1);
  hpos.data = main_data;

  DBUG_RETURN_EVENT(1);
}
3813
3814 void
add_blob_data(Gci_container * bucket,EventBufData * main_data,EventBufData * blob_data)3815 NdbEventBuffer::add_blob_data(Gci_container* bucket,
3816 EventBufData* main_data,
3817 EventBufData* blob_data)
3818 {
3819 DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
3820 DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p 0x%x %s",
3821 main_data, blob_data, m_ndb->getReference(),
3822 m_ndb->getNdbObjectName()));
3823 EventBufData* head;
3824 head = main_data->m_next_blob;
3825 while (head != NULL)
3826 {
3827 if (head->m_event_op == blob_data->m_event_op)
3828 break;
3829 head = head->m_next_blob;
3830 }
3831 if (head == NULL)
3832 {
3833 head = blob_data;
3834 head->m_next_blob = main_data->m_next_blob;
3835 main_data->m_next_blob = head;
3836 }
3837 else
3838 {
3839 blob_data->m_next = head->m_next;
3840 head->m_next = blob_data;
3841 }
3842 // adjust data list size
3843 bucket->m_data.m_count += 1;
3844 bucket->m_data.m_sz += blob_data->sz;
3845 DBUG_VOID_RETURN_EVENT;
3846 }
3847
NdbEventOperationImpl *
NdbEventBuffer::move_data()
{
  // Move completed epoch data to the consumer-visible queue and
  // recycle already consumed data.  Returns the event operation of
  // the first available item, or 0 when nothing is available.

  // handle received data
  if (!m_complete_data.m_data.is_empty())
  {
    // move this list to last in m_available_data
    m_available_data.append_list(&m_complete_data.m_data, 0);

    // append_list took over the contents; wipe the source wholesale
    bzero(&m_complete_data, sizeof(m_complete_data));
  }

  // handle used data
  if (!m_used_data.is_empty())
  {
    // return m_used_data to m_free_data
    free_list(m_used_data);
  }
  if (!m_available_data.is_empty())
  {
    DBUG_ENTER_EVENT("NdbEventBuffer::move_data");
#ifdef VM_TRACE
    DBUG_PRINT_EVENT("exit",("m_available_data_count %u 0x%x %s",
                             m_available_data.m_count,
                             m_ndb->getReference(), m_ndb->getNdbObjectName()));
#endif
    DBUG_RETURN_EVENT(m_available_data.m_head->m_event_op);
  }
  return 0;
}
3878
void
NdbEventBuffer::free_list(EventBufData_list &list)
{
  // Return all items of list to the m_free_data free list and reset
  // list to empty.  Caller must ensure list is non-empty (m_tail is
  // dereferenced unconditionally).
#ifdef NDB_EVENT_VERIFY_SIZE
  verify_size(list);
#endif
  // return list to m_free_data
  list.m_tail->m_next= m_free_data;
  m_free_data= list.m_head;
#ifdef VM_TRACE
  m_free_data_count+= list.m_count;
#endif
  m_free_data_sz+= list.m_sz;

  list.m_head = list.m_tail = NULL;
  list.m_count = list.m_sz = 0;

  // NOTE(review): explicit destructor call on a live object — used
  // here as "release gci_ops bookkeeping"; the list is expected to
  // remain usable as an empty list afterwards (same idiom appears in
  // clear_event_queue()).  Confirm against ~EventBufData_list.
  list.~EventBufData_list(); // free gci ops
}
3898
// Splice all items of list onto the tail of this list, taking over
// its gci_ops records (tagged with gci — see move_gci_ops()).
// list is left empty on return.
void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
{
#ifdef NDB_EVENT_VERIFY_SIZE
  NdbEventBuffer::verify_size(*list);
#endif
  move_gci_ops(list, gci);

  if (m_tail)
    m_tail->m_next= list->m_head;
  else
    m_head= list->m_head;
  m_tail= list->m_tail;
  m_count+= list->m_count;
  m_sz+= list->m_sz;

  // detach everything from the source list
  list->m_head = list->m_tail = NULL;
  list->m_count = list->m_sz = 0;
}
3917
void
EventBufData_list::add_gci_op(Gci_op g)
{
  // Record that event types g.event_types were seen for operation
  // g.op in this list.  Entries are unique per operation (bits are
  // OR-ed into an existing entry); the backing array grows
  // geometrically (1, 3, 7, ...) on demand.
  DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
  DBUG_PRINT_EVENT("info", ("p.op: %p g.event_types: %x", g.op, g.event_types));
  assert(g.op != NULL && g.op->theMainOp == NULL); // as in nextEvent
  Uint32 i;
  for (i = 0; i < m_gci_op_count; i++) {
    if (m_gci_op_list[i].op == g.op)
      break;
  }
  if (i < m_gci_op_count) {
    // existing entry: accumulate the event type bits
    m_gci_op_list[i].event_types |= g.event_types;
  } else {
    if (m_gci_op_count == m_gci_op_alloc) {
      // full: grow to 2n+1 and copy the old entries over
      Uint32 n = 1 + 2 * m_gci_op_alloc;
      Gci_op* old_list = m_gci_op_list;
      m_gci_op_list = new Gci_op [n];
      if (m_gci_op_alloc != 0) {
        Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op);
        memcpy(m_gci_op_list, old_list, bytes);
        DBUG_PRINT_EVENT("info", ("this: %p delete m_gci_op_list: %p",
                                  this, old_list));
        delete [] old_list;
      }
      else
        assert(old_list == 0);
      DBUG_PRINT_EVENT("info", ("this: %p new m_gci_op_list: %p",
                                this, m_gci_op_list));
      m_gci_op_alloc = n;
    }
    assert(m_gci_op_count < m_gci_op_alloc);
#ifndef NDEBUG
    i = m_gci_op_count; // keep i valid for the exit trace below
#endif
    m_gci_op_list[m_gci_op_count++] = g;
  }
  DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
  DBUG_VOID_RETURN_EVENT;
}
3958
void
EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci)
{
  // Take over the gci_ops information from list.  If list is itself a
  // multi-gci list its ready-made Gci_ops chain is spliced in
  // unchanged (gci must then be 0); otherwise list's flat Gci_op
  // array is wrapped in a new Gci_ops node tagged with gci.  May only
  // be called on a multi-gci list (asserted).
  DBUG_ENTER_EVENT("EventBufData_list::move_gci_ops");
  DBUG_PRINT_EVENT("info", ("this: %p list: %p gci: %u/%u",
                            this, list, (Uint32)(gci >> 32), (Uint32)gci));
  assert(!m_is_not_multi_list);
  if (!list->m_is_not_multi_list)
  {
    // source is also multi-gci: splice its chain onto ours
    assert(gci == 0);
    if (m_gci_ops_list_tail)
      m_gci_ops_list_tail->m_next = list->m_gci_ops_list;
    else
    {
      m_gci_ops_list = list->m_gci_ops_list;
    }
    m_gci_ops_list_tail = list->m_gci_ops_list_tail;
    goto end;
  }
  {
    // wrap the source's flat Gci_op array in a new chain node
    Gci_ops *new_gci_ops = new Gci_ops;
    DBUG_PRINT_EVENT("info", ("this: %p m_gci_op_list: %p",
                              new_gci_ops, list->m_gci_op_list));
    if (m_gci_ops_list_tail)
      m_gci_ops_list_tail->m_next = new_gci_ops;
    else
    {
      assert(m_gci_ops_list == 0);
      m_gci_ops_list = new_gci_ops;
    }
    m_gci_ops_list_tail = new_gci_ops;

    new_gci_ops->m_gci_op_list = list->m_gci_op_list;
    new_gci_ops->m_gci_op_count = list->m_gci_op_count;
    new_gci_ops->m_gci = gci;
    new_gci_ops->m_next = 0;
  }
end:
  // ownership transferred; detach from the source list.
  // NOTE(review): list->m_gci_ops_list and m_gci_op_count are not
  // reset here — callers appear to clear the whole source afterwards
  // (e.g. move_data() bzero's m_complete_data); confirm before
  // reusing a source list without such a reset.
  list->m_gci_op_list = 0;
  list->m_gci_ops_list_tail = 0;
  list->m_gci_op_alloc = 0;
  DBUG_VOID_RETURN_EVENT;
}
4002
void
NdbEventBuffer::clear_event_queue()
{
  // Discard everything queued for the consumer.  Data items are
  // recycled via free_list(); when there are none, the explicit
  // destructor call still releases leftover gci_ops records (same
  // live-object destruction idiom as in free_list()).
  if(!m_available_data.is_empty())
  {
    free_list(m_available_data);
  }
  else
  {
    // no event data found, remove any lingering gci_ops
    // belonging to consumed epochs
    m_available_data.~EventBufData_list();
  }
}
4017
4018 NdbEventOperation*
createEventOperation(const char * eventName,NdbError & theError)4019 NdbEventBuffer::createEventOperation(const char* eventName,
4020 NdbError &theError)
4021 {
4022 DBUG_ENTER("NdbEventBuffer::createEventOperation");
4023
4024 if (m_ndb->theImpl->m_ev_op == NULL)
4025 {
4026 clear_event_queue();
4027 }
4028
4029 NdbEventOperation* tOp= new NdbEventOperation(m_ndb, eventName);
4030 if (tOp == 0)
4031 {
4032 theError.code= 4000;
4033 DBUG_RETURN(NULL);
4034 }
4035 if (tOp->getState() != NdbEventOperation::EO_CREATED) {
4036 theError.code= tOp->getNdbError().code;
4037 delete tOp;
4038 DBUG_RETURN(NULL);
4039 }
4040 // add user reference
4041 // removed in dropEventOperation
4042 getEventOperationImpl(tOp)->m_ref_count = 1;
4043 DBUG_PRINT("info", ("m_ref_count: %u for op: %p 0x%x %s",
4044 getEventOperationImpl(tOp)->m_ref_count,
4045 getEventOperationImpl(tOp), m_ndb->getReference(),
4046 m_ndb->getNdbObjectName()));
4047 DBUG_RETURN(tOp);
4048 }
4049
4050 NdbEventOperationImpl*
createEventOperationImpl(NdbEventImpl & evnt,NdbError & theError)4051 NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt,
4052 NdbError &theError)
4053 {
4054 DBUG_ENTER("NdbEventBuffer::createEventOperationImpl");
4055 NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt);
4056 if (tOp == 0)
4057 {
4058 theError.code= 4000;
4059 DBUG_RETURN(NULL);
4060 }
4061 if (tOp->getState() != NdbEventOperation::EO_CREATED) {
4062 theError.code= tOp->getNdbError().code;
4063 delete tOp;
4064 DBUG_RETURN(NULL);
4065 }
4066 DBUG_RETURN(tOp);
4067 }
4068
void
NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
{
  // Stop and unlink an event operation (and its blob part ops).
  // The user reference added in createEventOperation() is dropped;
  // the impl is either deleted immediately (last reference) or parked
  // on m_dropped_ev_op until remaining references are gone.
  DBUG_ENTER("NdbEventBuffer::dropEventOperation");
  NdbEventOperationImpl* op= getEventOperationImpl(tOp);

  op->stop();
  // stop blob event ops
  if (op->theMainOp == NULL)
  {
    // propagate the largest stop gci to the main op and all blob ops
    // so they all remain valid up to the same epoch
    Uint64 max_stop_gci = op->m_stop_gci;
    NdbEventOperationImpl* tBlobOp = op->theBlobOpList;
    while (tBlobOp != NULL)
    {
      tBlobOp->stop();
      Uint64 stop_gci = tBlobOp->m_stop_gci;
      if (stop_gci > max_stop_gci)
        max_stop_gci = stop_gci;
      tBlobOp = tBlobOp->m_next;
    }
    tBlobOp = op->theBlobOpList;
    while (tBlobOp != NULL)
    {
      tBlobOp->m_stop_gci = max_stop_gci;
      tBlobOp = tBlobOp->m_next;
    }
    op->m_stop_gci = max_stop_gci;
  }

  /**
   * Needs mutex lock as report_node_XXX accesses list...
   */
  NdbMutex_Lock(m_mutex);

  // release blob handles now, further access is user error
  if (op->theMainOp == NULL)
  {
    while (op->theBlobList != NULL)
    {
      NdbBlob* tBlob = op->theBlobList;
      op->theBlobList = tBlob->theNext;
      m_ndb->releaseNdbBlob(tBlob);
    }
  }

  // unlink op from the Ndb object's active event operation list
  if (op->m_next)
    op->m_next->m_prev= op->m_prev;
  if (op->m_prev)
    op->m_prev->m_next= op->m_next;
  else
    m_ndb->theImpl->m_ev_op= op->m_next;

  assert(m_ndb->theImpl->m_ev_op == 0 || m_ndb->theImpl->m_ev_op->m_prev == 0);

  assert(op->m_ref_count > 0);
  // remove user reference
  // added in createEventOperation
  // user error to use reference after this
  op->m_ref_count--;
  DBUG_PRINT("info", ("m_ref_count: %u for op: %p 0x%x %s",
                      op->m_ref_count, op, m_ndb->getReference(),
                      m_ndb->getNdbObjectName()));
  if (op->m_ref_count == 0)
  {
    // last reference gone: delete immediately (facade owns the impl)
    NdbMutex_Unlock(m_mutex);
    DBUG_PRINT("info", ("deleting op: %p 0x%x %s",
                        op, m_ndb->getReference(), m_ndb->getNdbObjectName()));
    delete op->m_facade;
  }
  else
  {
    // still referenced (e.g. by buffered event data): park on the
    // dropped list for deferred deletion
    op->m_next= m_dropped_ev_op;
    op->m_prev= 0;
    if (m_dropped_ev_op)
      m_dropped_ev_op->m_prev= op;
    m_dropped_ev_op= op;

    NdbMutex_Unlock(m_mutex);
  }
  DBUG_VOID_RETURN;
}
4150
void
NdbEventBuffer::reportStatus(bool force_report)
{
  // Send an NDB_LE_EventBufferStatus event report describing current
  // buffer usage.  Unless force_report is set, a report is only sent
  // when a free-buffer threshold is crossed (with hysteresis via
  // m_min_free_thresh/m_max_free_thresh) or when the consumer lags
  // the latest epoch by at least m_gci_slip_thresh.
  EventBufData *apply_buf= m_available_data.m_head;
  Uint64 apply_gci, latest_gci= m_latestGCI;
  if (apply_buf == 0)
    apply_buf= m_complete_data.m_data.m_head;
  if (apply_buf && apply_buf->sdata)
  {
    // oldest not-yet-consumed epoch
    Uint32 gci_hi = apply_buf->sdata->gci_hi;
    Uint32 gci_lo = apply_buf->sdata->gci_lo;
    apply_gci= gci_lo | (Uint64(gci_hi) << 32);
  }
  else
    apply_gci= latest_gci;

  if (force_report)
    goto send_report;

  if (m_free_thresh)
  {
    // percentage comparison without division:
    // 100*free < thresh*total  <=>  free% < thresh
    if (100*(Uint64)m_free_data_sz < m_min_free_thresh*(Uint64)m_total_alloc &&
        m_total_alloc > 1024*1024)
    {
      /* report less free buffer than m_free_thresh,
         next report when more free than 2 * m_free_thresh
      */
      m_min_free_thresh= 0;
      m_max_free_thresh= 2 * m_free_thresh;
      goto send_report;
    }

    if (100*(Uint64)m_free_data_sz > m_max_free_thresh*(Uint64)m_total_alloc &&
        m_total_alloc > 1024*1024)
    {
      /* report more free than 2 * m_free_thresh
         next report when less free than m_free_thresh
      */
      m_min_free_thresh= m_free_thresh;
      m_max_free_thresh= 100;
      goto send_report;
    }
  }
  if (m_gci_slip_thresh &&
      (latest_gci-apply_gci >= m_gci_slip_thresh))
  {
    goto send_report;
  }
  return;

send_report:
  // payload: [type, used, total, max, apply gci lo/hi, latest gci lo/hi]
  Uint32 data[8];
  data[0]= NDB_LE_EventBufferStatus;
  data[1]= m_total_alloc-m_free_data_sz;
  data[2]= m_total_alloc;
  data[3]= m_max_alloc;
  data[4]= (Uint32)(apply_gci);
  data[5]= (Uint32)(apply_gci >> 32);
  data[6]= (Uint32)(latest_gci);
  data[7]= (Uint32)(latest_gci >> 32);
  Ndb_internal::send_event_report(true, m_ndb, data,8);
#ifdef VM_TRACE
  assert(m_total_alloc >= m_free_data_sz);
#endif
}
4216
4217 void
get_event_buffer_memory_usage(Ndb::EventBufferMemoryUsage & usage)4218 NdbEventBuffer::get_event_buffer_memory_usage(Ndb::EventBufferMemoryUsage& usage)
4219 {
4220 usage.allocated_bytes = m_total_alloc;
4221 usage.used_bytes = m_total_alloc - m_free_data_sz;
4222
4223 // If there's no configured max limit then
4224 // the percentage is a fraction of the total allocated.
4225
4226 Uint32 ret = 0;
4227 // m_max_alloc == 0 ==> unlimited usage,
4228 // m_total_alloc >= m_free_data_sz always.
4229 if (m_max_alloc > 0)
4230 ret = (Uint32)(100 * (m_total_alloc - m_free_data_sz) / m_max_alloc);
4231 else if (m_total_alloc > 0)
4232 ret = (Uint32)(100 * (m_total_alloc - m_free_data_sz) / m_total_alloc);
4233
4234 usage.usage_percent = ret;
4235 }
4236
#ifdef VM_TRACE
// Debug-only consistency checks of list element counts and sizes.
// The bodies are currently compiled out (#if 0), but the entry points
// are kept so call sites guarded by NDB_EVENT_VERIFY_SIZE still link.
void
NdbEventBuffer::verify_size(const EventBufData* data, Uint32 count, Uint32 sz)
{
#if 0
  Uint32 tmp_count = 0;
  Uint32 tmp_sz = 0;
  while (data != 0) {
    Uint32 full_count, full_sz;
    data->get_full_size(full_count, full_sz);
    tmp_count += full_count;
    tmp_sz += full_sz;
    data = data->m_next;
  }
  assert(tmp_count == count);
  assert(tmp_sz == sz);
#endif
}
void
NdbEventBuffer::verify_size(const EventBufData_list & list)
{
#if 0
  verify_size(list.m_head, list.m_count, list.m_sz);
#endif
}
#endif
4263
4264 // hash table routines
4265
// could optimize the all-fixed case
//
// Compute a hash of an event's primary key value.  ptr[0] holds the
// PK attribute headers, ptr[1] the PK data.  Each value is hashed
// through its column collation's hash_sort so that equivalent keys
// (e.g. varchar values differing only in trailing space) hash equal.
Uint32
EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
{
  DBUG_ENTER_EVENT("EventBufData_hash::getpkhash");
  DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
  DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);

  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;

  // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
  // for pk update (to equivalent pk) post/pre values give same hash
  Uint32 nkey = tab->m_noOfKeys;
  assert(nkey != 0 && nkey <= ptr[0].sz);
  const Uint32* hptr = ptr[0].p;
  const uchar* dptr = (uchar*)ptr[1].p;

  // hash registers
  ulong nr1 = 0;
  ulong nr2 = 0;
  while (nkey-- != 0)
  {
    AttributeHeader ah(*hptr++);
    Uint32 bytesize = ah.getByteSize();
    assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz));

    Uint32 i = ah.getAttributeId();
    const NdbColumnImpl* col = tab->getColumn(i);
    require(col != 0);

    // split value into length-prefix bytes (lb) and payload (len)
    Uint32 lb, len;
    bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len);
    require(ok);

    CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
    (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
    dptr += ((bytesize + 3) / 4) * 4; // attribute data is word aligned
  }
  DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1));
  DBUG_RETURN_EVENT(nr1);
}
4307
bool
EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
{
  // Compare the primary key values of two events for collation-aware
  // equality (companion of getpkhash(); must agree with it so that
  // equal keys always hash equal).
  DBUG_ENTER_EVENT("EventBufData_hash::getpkequal");
  DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2);
  DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2);
  DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2);
  DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2);

  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;

  Uint32 nkey = tab->m_noOfKeys;
  assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
  const Uint32* hptr1 = ptr1[0].p;
  const Uint32* hptr2 = ptr2[0].p;
  const uchar* dptr1 = (uchar*)ptr1[1].p;
  const uchar* dptr2 = (uchar*)ptr2[1].p;

  bool equal = true;

  while (nkey-- != 0)
  {
    AttributeHeader ah1(*hptr1++);
    AttributeHeader ah2(*hptr2++);
    // sizes can differ on update of varchar endspace
    Uint32 bytesize1 = ah1.getByteSize();
    Uint32 bytesize2 = ah2.getByteSize();
    assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
    assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));

    assert(ah1.getAttributeId() == ah2.getAttributeId());
    Uint32 i = ah1.getAttributeId();
    const NdbColumnImpl* col = tab->getColumn(i);
    assert(col != 0);

    // split each value into length-prefix bytes and payload
    Uint32 lb1, len1;
    bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1);
    Uint32 lb2, len2;
    bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2);
    require(ok1 && ok2 && lb1 == lb2);

    // collation-aware comparison (matches getpkhash's hash_sort)
    CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
    int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
    if (res != 0)
    {
      equal = false;
      break;
    }
    dptr1 += ((bytesize1 + 3) / 4) * 4; // word-aligned attribute data
    dptr2 += ((bytesize2 + 3) / 4) * 4;
  }

  DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false"));
  DBUG_RETURN_EVENT(equal);
}
4363
4364 void
search(Pos & hpos,NdbEventOperationImpl * op,LinearSectionPtr ptr[3])4365 EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
4366 {
4367 DBUG_ENTER_EVENT("EventBufData_hash::search");
4368 Uint32 pkhash = getpkhash(op, ptr);
4369 Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
4370 EventBufData* data = m_hash[index];
4371 while (data != 0)
4372 {
4373 if (data->m_event_op == op &&
4374 data->m_pkhash == pkhash &&
4375 getpkequal(op, data->ptr, ptr))
4376 break;
4377 data = data->m_next_hash;
4378 }
4379 hpos.index = index;
4380 hpos.data = data;
4381 hpos.pkhash = pkhash;
4382 DBUG_PRINT_EVENT("info", ("search result=%p", data));
4383 DBUG_VOID_RETURN_EVENT;
4384 }
4385
// explicit template instantiations for containers used by the event buffer
template class Vector<Gci_container_pod>;
template class Vector<NdbEventBuffer::EventBufData_chunk*>;
4388