1 /*
2 Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include <ndb_global.h>
27 #include <kernel_types.h>
28
29 #include "API.hpp"
30 #include <NdbOut.hpp>
31
32 #include <signaldata/CreateEvnt.hpp>
33 #include <signaldata/SumaImpl.hpp>
34 #include <SimpleProperties.hpp>
35 #include <Bitmask.hpp>
36 #include <AttributeHeader.hpp>
37 #include <AttributeList.hpp>
38 #include <NdbError.hpp>
39 #include <BaseString.hpp>
40 #include <UtilBuffer.hpp>
41 #include <portlib/NdbMem.h>
42 #include <signaldata/AlterTable.hpp>
43 #include "ndb_internal.hpp"
44
45 #include <EventLogger.hpp>
46 extern EventLogger * g_eventLogger;
47
48 #define TOTAL_BUCKETS_INIT (1 << 15)
49 static Gci_container_pod g_empty_gci_container;
50
51 #if defined(VM_TRACE) && defined(NOT_USED)
52 static void
print_std(const SubTableData * sdata,LinearSectionPtr ptr[3])53 print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
54 {
55 printf("addr=%p gci{hi/lo}hi=%u/%u op=%d\n", (void*)sdata,
56 sdata->gci_hi, sdata->gci_lo,
57 SubTableData::getOperation(sdata->requestInfo));
58 for (int i = 0; i <= 2; i++) {
59 printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
60 for (int j = 0; (uint) j < ptr[i].sz; j++)
61 printf("%08x ", ptr[i].p[j]);
62 printf("\n");
63 }
64 }
65 #endif
66
67 // EventBufData
68
69 void
add_part_size(Uint32 & full_count,Uint32 & full_sz) const70 EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz) const
71 {
72 Uint32 tmp_count = 0;
73 Uint32 tmp_sz = 0;
74 const EventBufData* data2 = m_next_blob;
75 while (data2 != 0) {
76 tmp_count++;
77 tmp_sz += data2->sz;
78 const EventBufData* data3 = data2->m_next;
79 while (data3 != 0) {
80 tmp_count++;
81 tmp_sz += data3->sz;
82 data3 = data3->m_next;
83 }
84 data2 = data2->m_next_blob;
85 }
86 full_count += tmp_count;
87 full_sz += tmp_sz;
88 }
89
90 /*
91 * Class NdbEventOperationImpl
92 *
93 *
94 */
95
96 // todo handle several ndb objects
97 // todo free allocated data when closing NdbEventBuffer
98
NdbEventOperationImpl(NdbEventOperation & f,Ndb * theNdb,const char * eventName)99 NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &f,
100 Ndb *theNdb,
101 const char* eventName) :
102 NdbEventOperation(*this),
103 m_facade(&f),
104 m_ndb(theNdb),
105 m_state(EO_ERROR),
106 m_oid(~(Uint32)0)
107 {
108 DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl");
109
110 assert(m_ndb != NULL);
111 NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
112 assert(myDict != NULL);
113
114 const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
115 if (!myEvnt)
116 {
117 m_error.code= myDict->getNdbError().code;
118 DBUG_VOID_RETURN;
119 }
120
121 init(myEvnt->m_impl);
122 DBUG_VOID_RETURN;
123 }
124
NdbEventOperationImpl(Ndb * theNdb,NdbEventImpl & evnt)125 NdbEventOperationImpl::NdbEventOperationImpl(Ndb *theNdb,
126 NdbEventImpl& evnt) :
127 NdbEventOperation(*this),
128 m_facade(this),
129 m_ndb(theNdb),
130 m_state(EO_ERROR),
131 m_oid(~(Uint32)0)
132 {
133 DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl [evnt]");
134 init(evnt);
135 DBUG_VOID_RETURN;
136 }
137
138 void
init(NdbEventImpl & evnt)139 NdbEventOperationImpl::init(NdbEventImpl& evnt)
140 {
141 DBUG_ENTER("NdbEventOperationImpl::init");
142
143 m_magic_number = 0;
144 mi_type = 0;
145 m_change_mask = 0;
146 #ifdef VM_TRACE
147 m_data_done_count = 0;
148 m_data_count = 0;
149 #endif
150 m_next = 0;
151 m_prev = 0;
152
153 m_eventId = 0;
154 theFirstPkAttrs[0] = NULL;
155 theCurrentPkAttrs[0] = NULL;
156 theFirstPkAttrs[1] = NULL;
157 theCurrentPkAttrs[1] = NULL;
158 theFirstDataAttrs[0] = NULL;
159 theCurrentDataAttrs[0] = NULL;
160 theFirstDataAttrs[1] = NULL;
161 theCurrentDataAttrs[1] = NULL;
162
163 theBlobList = NULL;
164 theBlobOpList = NULL;
165 theMainOp = NULL;
166 theBlobVersion = 0;
167
168 m_data_item= NULL;
169 m_eventImpl = NULL;
170
171 m_custom_data= 0;
172 m_has_error= 1;
173
174 // we should lookup id in Dictionary, TODO
175 // also make sure we only have one listener on each event
176
177 m_eventImpl = &evnt;
178
179 m_eventId = m_eventImpl->m_eventId;
180
181 m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this);
182
183 m_state= EO_CREATED;
184
185 m_stop_gci = 0;
186 #ifdef ndb_event_stores_merge_events_flag
187 m_mergeEvents = m_eventImpl->m_mergeEvents;
188 #else
189 m_mergeEvents = false;
190 #endif
191 m_ref_count = 0;
192 DBUG_PRINT("info", ("m_ref_count = 0 for op: 0x%lx", (long) this));
193
194 m_has_error= 0;
195
196 DBUG_PRINT("exit",("this: 0x%lx oid: %u", (long) this, m_oid));
197 DBUG_VOID_RETURN;
198 }
199
~NdbEventOperationImpl()200 NdbEventOperationImpl::~NdbEventOperationImpl()
201 {
202 DBUG_ENTER("NdbEventOperationImpl::~NdbEventOperationImpl");
203 m_magic_number= 0;
204
205 if (m_oid == ~(Uint32)0)
206 DBUG_VOID_RETURN;
207
208 stop();
209
210 if (theMainOp == NULL)
211 {
212 NdbEventOperationImpl* tBlobOp = theBlobOpList;
213 while (tBlobOp != NULL)
214 {
215 NdbEventOperationImpl *op = tBlobOp;
216 tBlobOp = tBlobOp->m_next;
217 delete op;
218 }
219 }
220
221 m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
222 DBUG_PRINT("exit",("this: %p/%p oid: %u main: %p",
223 this, m_facade, m_oid, theMainOp));
224
225 if (m_eventImpl)
226 {
227 delete m_eventImpl->m_facade;
228 m_eventImpl= 0;
229 }
230
231 DBUG_VOID_RETURN;
232 }
233
234 NdbEventOperation::State
getState()235 NdbEventOperationImpl::getState()
236 {
237 return m_state;
238 }
239
240 NdbRecAttr*
getValue(const char * colName,char * aValue,int n)241 NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
242 {
243 DBUG_ENTER("NdbEventOperationImpl::getValue");
244 if (m_state != EO_CREATED) {
245 ndbout_c("NdbEventOperationImpl::getValue may only be called between "
246 "instantiation and execute()");
247 DBUG_RETURN(NULL);
248 }
249
250 NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
251
252 if (tAttrInfo == NULL) {
253 ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
254 DBUG_RETURN(NULL);
255 }
256
257 DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
258 }
259
260 NdbRecAttr*
getValue(const NdbColumnImpl * tAttrInfo,char * aValue,int n)261 NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
262 {
263 DBUG_ENTER("NdbEventOperationImpl::getValue");
264 // Insert Attribute Id into ATTRINFO part.
265
266 NdbRecAttr **theFirstAttr;
267 NdbRecAttr **theCurrentAttr;
268
269 if (tAttrInfo->getPrimaryKey())
270 {
271 theFirstAttr = &theFirstPkAttrs[n];
272 theCurrentAttr = &theCurrentPkAttrs[n];
273 }
274 else
275 {
276 theFirstAttr = &theFirstDataAttrs[n];
277 theCurrentAttr = &theCurrentDataAttrs[n];
278 }
279
280 /************************************************************************
281 * Get a Receive Attribute object and link it into the operation object.
282 ************************************************************************/
283 NdbRecAttr *tAttr = m_ndb->getRecAttr();
284 if (tAttr == NULL) {
285 exit(-1);
286 //setErrorCodeAbort(4000);
287 DBUG_RETURN(NULL);
288 }
289
290 /**********************************************************************
291 * Now set the attribute identity and the pointer to the data in
292 * the RecAttr object
293 * Also set attribute size, array size and attribute type
294 ********************************************************************/
295 if (tAttr->setup(tAttrInfo, aValue)) {
296 //setErrorCodeAbort(4000);
297 m_ndb->releaseRecAttr(tAttr);
298 exit(-1);
299 DBUG_RETURN(NULL);
300 }
301 //theErrorLine++;
302
303 tAttr->setUNDEFINED();
304
305 // We want to keep the list sorted to make data insertion easier later
306
307 if (*theFirstAttr == NULL) {
308 *theFirstAttr = tAttr;
309 *theCurrentAttr = tAttr;
310 tAttr->next(NULL);
311 } else {
312 Uint32 tAttrId = tAttrInfo->m_attrId;
313 if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
314 (*theCurrentAttr)->next(tAttr);
315 tAttr->next(NULL);
316 *theCurrentAttr = tAttr;
317 } else if ((*theFirstAttr)->next() == NULL || // only one in list
318 (*theFirstAttr)->attrId() > tAttrId) {// or first
319 tAttr->next(*theFirstAttr);
320 *theFirstAttr = tAttr;
321 } else { // at least 2 in list and not first and not last
322 NdbRecAttr *p = *theFirstAttr;
323 NdbRecAttr *p_next = p->next();
324 while (tAttrId > p_next->attrId()) {
325 p = p_next;
326 p_next = p->next();
327 }
328 if (tAttrId == p_next->attrId()) { // Using same attribute twice
329 tAttr->release(); // do I need to do this?
330 m_ndb->releaseRecAttr(tAttr);
331 exit(-1);
332 DBUG_RETURN(NULL);
333 }
334 // this is it, between p and p_next
335 p->next(tAttr);
336 tAttr->next(p_next);
337 }
338 }
339 DBUG_RETURN(tAttr);
340 }
341
342 NdbBlob*
getBlobHandle(const char * colName,int n)343 NdbEventOperationImpl::getBlobHandle(const char *colName, int n)
344 {
345 DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)");
346
347 assert(m_mergeEvents);
348
349 if (m_state != EO_CREATED) {
350 ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between "
351 "instantiation and execute()");
352 DBUG_RETURN(NULL);
353 }
354
355 NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
356
357 if (tAttrInfo == NULL) {
358 ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",colName);
359 DBUG_RETURN(NULL);
360 }
361
362 NdbBlob* bh = getBlobHandle(tAttrInfo, n);
363 DBUG_RETURN(bh);
364 }
365
366 NdbBlob*
getBlobHandle(const NdbColumnImpl * tAttrInfo,int n)367 NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
368 {
369 DBUG_ENTER("NdbEventOperationImpl::getBlobHandle");
370 DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n));
371
372 // as in NdbOperation, create only one instance
373 NdbBlob* tBlob = theBlobList;
374 NdbBlob* tLastBlob = NULL;
375 while (tBlob != NULL) {
376 if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
377 DBUG_RETURN(tBlob);
378 tLastBlob = tBlob;
379 tBlob = tBlob->theNext;
380 }
381
382 NdbEventOperationImpl* tBlobOp = NULL;
383
384 const bool is_tinyblob = (tAttrInfo->getPartSize() == 0);
385 assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));
386
387 if (! is_tinyblob) {
388 // blob event name
389 char bename[MAX_TAB_NAME_SIZE];
390 NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);
391
392 // find blob event op if any (it serves both post and pre handles)
393 tBlobOp = theBlobOpList;
394 NdbEventOperationImpl* tLastBlopOp = NULL;
395 while (tBlobOp != NULL) {
396 if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
397 break;
398 }
399 tLastBlopOp = tBlobOp;
400 tBlobOp = tBlobOp->m_next;
401 }
402
403 DBUG_PRINT("info", ("%s blob event op for %s",
404 tBlobOp ? " reuse" : " create", bename));
405
406 // create blob event op if not found
407 if (tBlobOp == NULL) {
408 // get blob event
409 NdbDictionaryImpl& dict =
410 NdbDictionaryImpl::getImpl(*m_ndb->getDictionary());
411 NdbEventImpl* blobEvnt =
412 dict.getBlobEvent(*this->m_eventImpl, tAttrInfo->m_column_no);
413 if (blobEvnt == NULL) {
414 m_error.code = dict.m_error.code;
415 DBUG_RETURN(NULL);
416 }
417
418 // create blob event operation
419 tBlobOp =
420 m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
421 if (tBlobOp == NULL)
422 DBUG_RETURN(NULL);
423
424 // pointer to main table op
425 tBlobOp->theMainOp = this;
426 tBlobOp->m_mergeEvents = m_mergeEvents;
427 tBlobOp->theBlobVersion = tAttrInfo->m_blobVersion;
428
429 // to hide blob op it is linked under main op, not under m_ndb
430 if (tLastBlopOp == NULL)
431 theBlobOpList = tBlobOp;
432 else
433 tLastBlopOp->m_next = tBlobOp;
434 tBlobOp->m_next = NULL;
435 }
436 }
437
438 tBlob = m_ndb->getNdbBlob();
439 if (tBlob == NULL) {
440 m_error.code = m_ndb->getNdbError().code;
441 DBUG_RETURN(NULL);
442 }
443
444 // calls getValue on inline and blob part
445 if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) {
446 m_error.code = tBlob->getNdbError().code;
447 m_ndb->releaseNdbBlob(tBlob);
448 DBUG_RETURN(NULL);
449 }
450
451 // add to list end
452 if (tLastBlob == NULL)
453 theBlobList = tBlob;
454 else
455 tLastBlob->theNext = tBlob;
456 tBlob->theNext = NULL;
457 DBUG_RETURN(tBlob);
458 }
459
460 Uint32
get_blob_part_no(bool hasDist)461 NdbEventOperationImpl::get_blob_part_no(bool hasDist)
462 {
463 assert(theBlobVersion == 1 || theBlobVersion == 2);
464 assert(theMainOp != NULL);
465 const NdbTableImpl* mainTable = theMainOp->m_eventImpl->m_tableImpl;
466 assert(m_data_item != NULL);
467 LinearSectionPtr (&ptr)[3] = m_data_item->ptr;
468
469 uint pos = 0; // PK and possibly DIST to skip
470
471 if (unlikely(theBlobVersion == 1)) {
472 pos += AttributeHeader(ptr[0].p[0]).getDataSize();
473 assert(hasDist);
474 pos += AttributeHeader(ptr[0].p[1]).getDataSize();
475 } else {
476 uint n = mainTable->m_noOfKeys;
477 uint i;
478 for (i = 0; i < n; i++) {
479 pos += AttributeHeader(ptr[0].p[i]).getDataSize();
480 }
481 if (hasDist)
482 pos += AttributeHeader(ptr[0].p[n]).getDataSize();
483 }
484
485 assert(pos < ptr[1].sz);
486 Uint32 no = ptr[1].p[pos];
487 return no;
488 }
489
490 int
readBlobParts(char * buf,NdbBlob * blob,Uint32 part,Uint32 count,Uint16 * lenLoc)491 NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob,
492 Uint32 part, Uint32 count, Uint16* lenLoc)
493 {
494 DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts");
495 DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d",
496 part, count, blob->theEventBlobVersion));
497
498 NdbEventOperationImpl* blob_op = blob->theBlobEventOp;
499 const bool hasDist = (blob->theStripeSize != 0);
500
501 EventBufData* main_data = m_data_item;
502 DBUG_PRINT_EVENT("info", ("main_data=%p", main_data));
503 assert(main_data != NULL);
504
505 // search for blob parts list head
506 EventBufData* head;
507 assert(m_data_item != NULL);
508 head = m_data_item->m_next_blob;
509 while (head != NULL)
510 {
511 if (head->m_event_op == blob_op)
512 {
513 DBUG_PRINT_EVENT("info", ("found blob parts head %p", head));
514 break;
515 }
516 head = head->m_next_blob;
517 }
518
519 Uint32 nparts = 0;
520 Uint32 noutside = 0;
521 EventBufData* data = head;
522 // XXX optimize using part no ordering
523 while (data != NULL)
524 {
525 /*
526 * Hack part no directly out of buffer since it is not returned
527 * in pre data (PK buglet). For part data use receive_event().
528 * This means extra copy. XXX fix
529 */
530 blob_op->m_data_item = data;
531 int r = blob_op->receive_event();
532 assert(r > 0);
533 // XXX should be: no = blob->theBlobEventPartValue
534 Uint32 no = blob_op->get_blob_part_no(hasDist);
535
536 DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part", data, no));
537
538 if (part <= no && no < part + count)
539 {
540 DBUG_PRINT_EVENT("info", ("part within read range"));
541
542 const char* src = blob->theBlobEventDataBuf.data;
543 Uint32 sz = 0;
544 if (blob->theFixedDataFlag) {
545 sz = blob->thePartSize;
546 } else {
547 const uchar* p = (const uchar*)blob->theBlobEventDataBuf.data;
548 sz = p[0] + (p[1] << 8);
549 src += 2;
550 }
551 memcpy(buf + (no - part) * sz, src, sz);
552 nparts++;
553 if (lenLoc != NULL) {
554 assert(count == 1);
555 *lenLoc = sz;
556 } else {
557 assert(sz == blob->thePartSize);
558 }
559 }
560 else
561 {
562 DBUG_PRINT_EVENT("info", ("part outside read range"));
563 noutside++;
564 }
565 data = data->m_next;
566 }
567 if (unlikely(nparts != count))
568 {
569 ndbout_c("nparts: %u count: %u noutside: %u", nparts, count, noutside);
570 }
571 assert(nparts == count);
572
573 DBUG_RETURN_EVENT(0);
574 }
575
576 int
execute()577 NdbEventOperationImpl::execute()
578 {
579 DBUG_ENTER("NdbEventOperationImpl::execute");
580 m_ndb->theEventBuffer->add_drop_lock();
581 int r = execute_nolock();
582 m_ndb->theEventBuffer->add_drop_unlock();
583 DBUG_RETURN(r);
584 }
585
586 int
execute_nolock()587 NdbEventOperationImpl::execute_nolock()
588 {
589 DBUG_ENTER("NdbEventOperationImpl::execute_nolock");
590 DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob"));
591
592 NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
593 if (!myDict) {
594 m_error.code= m_ndb->getNdbError().code;
595 DBUG_RETURN(-1);
596 }
597
598 bool schemaTrans = false;
599 if (m_ndb->theEventBuffer->m_total_buckets == TOTAL_BUCKETS_INIT)
600 {
601 int res = NdbDictionaryImpl::getImpl(* myDict).beginSchemaTrans(false);
602 if (res != 0)
603 {
604 switch(myDict->getNdbError().code){
605 case 711:
606 case 763:
607 // ignore;
608 break;
609 default:
610 m_error.code= myDict->getNdbError().code;
611 DBUG_RETURN(-1);
612 }
613 }
614 else
615 {
616 schemaTrans = true;
617 }
618 }
619
620 if (theFirstPkAttrs[0] == NULL &&
621 theFirstDataAttrs[0] == NULL) { // defaults to get all
622 }
623
624 m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
625 m_state= EO_EXECUTING;
626 mi_type= m_eventImpl->mi_type;
627 m_ndb->theEventBuffer->add_op();
628 // add kernel reference
629 // removed on TE_STOP, TE_CLUSTER_FAILURE, or error below
630 m_ref_count++;
631 m_stop_gci= ~(Uint64)0;
632 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
633 Uint32 buckets = 0;
634 int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this,
635 buckets);
636 if (r == 0)
637 {
638 /* Pre-7.0 kernel nodes do not return the number of buckets
639 * Assume it's == theNoOfDBnodes as was the case in 6.3
640 */
641 if (buckets == ~ (Uint32)0)
642 buckets = m_ndb->theImpl->theNoOfDBnodes;
643
644 m_ndb->theEventBuffer->set_total_buckets(buckets);
645 if (schemaTrans)
646 {
647 schemaTrans = false;
648 myDict->endSchemaTrans(1);
649 }
650
651 if (theMainOp == NULL) {
652 DBUG_PRINT("info", ("execute blob ops"));
653 NdbEventOperationImpl* blob_op = theBlobOpList;
654 while (blob_op != NULL) {
655 r = blob_op->execute_nolock();
656 if (r != 0) {
657 // since main op is running and possibly some blob ops as well
658 // we can't just reset the main op. Instead return with error,
659 // main op (and blob ops) will be cleaned up when user calls
660 // dropEventOperation
661 m_error.code= myDict->getNdbError().code;
662 DBUG_RETURN(r);
663 }
664 blob_op = blob_op->m_next;
665 }
666 }
667 if (r == 0)
668 {
669 DBUG_RETURN(0);
670 }
671 }
672 // Error
673 // remove kernel reference
674 // added above
675 m_ref_count--;
676 m_stop_gci = 0;
677 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
678 m_state= EO_ERROR;
679 mi_type= 0;
680 m_magic_number= 0;
681 m_error.code= myDict->getNdbError().code;
682 m_ndb->theEventBuffer->remove_op();
683
684 if (schemaTrans)
685 {
686 schemaTrans = false;
687 myDict->endSchemaTrans(1);
688 }
689
690 DBUG_RETURN(r);
691 }
692
693 int
stop()694 NdbEventOperationImpl::stop()
695 {
696 DBUG_ENTER("NdbEventOperationImpl::stop");
697 int i;
698
699 for (i=0 ; i<2; i++) {
700 NdbRecAttr *p = theFirstPkAttrs[i];
701 while (p) {
702 NdbRecAttr *p_next = p->next();
703 m_ndb->releaseRecAttr(p);
704 p = p_next;
705 }
706 theFirstPkAttrs[i]= 0;
707 }
708 for (i=0 ; i<2; i++) {
709 NdbRecAttr *p = theFirstDataAttrs[i];
710 while (p) {
711 NdbRecAttr *p_next = p->next();
712 m_ndb->releaseRecAttr(p);
713 p = p_next;
714 }
715 theFirstDataAttrs[i]= 0;
716 }
717
718 if (m_state != EO_EXECUTING)
719 {
720 DBUG_RETURN(-1);
721 }
722
723 NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
724 if (!myDict) {
725 m_error.code= m_ndb->getNdbError().code;
726 DBUG_RETURN(-1);
727 }
728
729 m_ndb->theEventBuffer->add_drop_lock();
730 int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*this);
731 m_ndb->theEventBuffer->remove_op();
732 m_state= EO_DROPPED;
733 mi_type= 0;
734 if (r == 0) {
735 if (m_stop_gci == 0)
736 {
737 // response from old kernel
738 Uint64 gci= m_ndb->theEventBuffer->m_highest_sub_gcp_complete_GCI;
739 if (gci)
740 {
741 // calculate a "safe" gci in the future to remove event op.
742 gci += Uint64(3) << 32;
743 }
744 else
745 {
746 // set highest value to ensure that operation does not get dropped
747 // too early. Note '-1' as ~Uint64(0) indicates active event
748 gci = ~Uint64(0)-1;
749 }
750 m_stop_gci = gci;
751 }
752 m_ndb->theEventBuffer->add_drop_unlock();
753 DBUG_RETURN(0);
754 }
755 //Error
756 m_error.code= NdbDictionaryImpl::getImpl(*myDict).m_error.code;
757 m_state= EO_ERROR;
758 m_ndb->theEventBuffer->add_drop_unlock();
759 DBUG_RETURN(r);
760 }
761
tableNameChanged() const762 bool NdbEventOperationImpl::tableNameChanged() const
763 {
764 return (bool)AlterTableReq::getNameFlag(m_change_mask);
765 }
766
tableFrmChanged() const767 bool NdbEventOperationImpl::tableFrmChanged() const
768 {
769 return (bool)AlterTableReq::getFrmFlag(m_change_mask);
770 }
771
tableFragmentationChanged() const772 bool NdbEventOperationImpl::tableFragmentationChanged() const
773 {
774 return (bool)AlterTableReq::getFragDataFlag(m_change_mask);
775 }
776
tableRangeListChanged() const777 bool NdbEventOperationImpl::tableRangeListChanged() const
778 {
779 return (bool)AlterTableReq::getRangeListFlag(m_change_mask);
780 }
781
782 Uint64
getGCI()783 NdbEventOperationImpl::getGCI()
784 {
785 Uint32 gci_hi = m_data_item->sdata->gci_hi;
786 Uint32 gci_lo = m_data_item->sdata->gci_lo;
787 return gci_lo | (Uint64(gci_hi) << 32);
788 }
789
790 Uint32
getAnyValue() const791 NdbEventOperationImpl::getAnyValue() const
792 {
793 return m_data_item->sdata->anyValue;
794 }
795
796 Uint64
getLatestGCI()797 NdbEventOperationImpl::getLatestGCI()
798 {
799 return m_ndb->theEventBuffer->getLatestGCI();
800 }
801
802 Uint64
getTransId() const803 NdbEventOperationImpl::getTransId() const
804 {
805 /* Return 64 bit composite */
806 Uint32 transId1 = m_data_item->sdata->transId1;
807 Uint32 transId2 = m_data_item->sdata->transId2;
808 return Uint64(transId1) << 32 | transId2;
809 }
810
811 bool
execSUB_TABLE_DATA(const NdbApiSignal * signal,const LinearSectionPtr ptr[3])812 NdbEventOperationImpl::execSUB_TABLE_DATA(const NdbApiSignal * signal,
813 const LinearSectionPtr ptr[3])
814 {
815 DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA");
816 const SubTableData * const sdata=
817 CAST_CONSTPTR(SubTableData, signal->getDataPtr());
818
819 if(signal->isFirstFragment()){
820 m_fragmentId = signal->getFragmentId();
821 m_buffer.grow(4 * sdata->totalLen);
822 } else {
823 if(m_fragmentId != signal->getFragmentId()){
824 abort();
825 }
826 }
827
828 const Uint32 i = SubTableData::DICT_TAB_INFO;
829 DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u",
830 4 * ptr[i].sz, m_fragmentId));
831 m_buffer.append(ptr[i].p, 4 * ptr[i].sz);
832
833 if(!signal->isLastFragment()){
834 DBUG_RETURN(FALSE);
835 }
836
837 DBUG_RETURN(TRUE);
838 }
839
840
841 int
receive_event()842 NdbEventOperationImpl::receive_event()
843 {
844 Uint32 operation=
845 SubTableData::getOperation(m_data_item->sdata->requestInfo);
846 if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
847 {
848 DBUG_ENTER("NdbEventOperationImpl::receive_event");
849 DBUG_PRINT("info",("sdata->operation %u this: %p", operation, this));
850 m_ndb->theImpl->incClientStat(Ndb::NonDataEventsRecvdCount, 1);
851 if (operation == NdbDictionary::Event::_TE_ALTER)
852 {
853 // Parse the new table definition and
854 // create a table object
855 NdbDictInterface::Tx tx_unused;
856 NdbError error;
857 int warn;
858 NdbDictInterface dif(tx_unused, error, warn);
859 NdbTableImpl *at;
860 m_change_mask = m_data_item->sdata->changeMask;
861 error.code = dif.parseTableInfo(&at,
862 (Uint32*)m_buffer.get_data(),
863 m_buffer.length() / 4,
864 true);
865 m_buffer.clear();
866 if (unlikely(error.code))
867 {
868 DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u",
869 error.code));
870 ndbout_c("Failed to parse DictTabInfo error %u", error.code);
871 DBUG_RETURN(1);
872 }
873 at->buildColumnHash();
874
875 NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
876 m_eventImpl->m_tableImpl = at;
877
878 DBUG_PRINT("info", ("switching table impl 0x%lx -> 0x%lx",
879 (long) tmp_table_impl, (long) at));
880
881 // change the rec attrs to refer to the new table object
882 int i;
883 for (i = 0; i < 2; i++)
884 {
885 NdbRecAttr *p = theFirstPkAttrs[i];
886 while (p)
887 {
888 int no = p->getColumn()->getColumnNo();
889 NdbColumnImpl *tAttrInfo = at->getColumn(no);
890 DBUG_PRINT("info", ("rec_attr: 0x%lx "
891 "switching column impl 0x%lx -> 0x%lx",
892 (long) p, (long) p->m_column, (long) tAttrInfo));
893 p->m_column = tAttrInfo;
894 p = p->next();
895 }
896 }
897 for (i = 0; i < 2; i++)
898 {
899 NdbRecAttr *p = theFirstDataAttrs[i];
900 while (p)
901 {
902 int no = p->getColumn()->getColumnNo();
903 NdbColumnImpl *tAttrInfo = at->getColumn(no);
904 DBUG_PRINT("info", ("rec_attr: 0x%lx "
905 "switching column impl 0x%lx -> 0x%lx",
906 (long) p, (long) p->m_column, (long) tAttrInfo));
907 p->m_column = tAttrInfo;
908 p = p->next();
909 }
910 }
911 // change the blobHandle's to refer to the new table object.
912 NdbBlob *p = theBlobList;
913 while (p)
914 {
915 int no = p->getColumn()->getColumnNo();
916 NdbColumnImpl *tAttrInfo = at->getColumn(no);
917 DBUG_PRINT("info", ("blob_handle: 0x%lx "
918 "switching column impl 0x%lx -> 0x%lx",
919 (long) p, (long) p->theColumn, (long) tAttrInfo));
920 p->theColumn = tAttrInfo;
921 p = p->next();
922 }
923 if (tmp_table_impl)
924 delete tmp_table_impl;
925 }
926 DBUG_RETURN(1);
927 }
928
929 DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
930 DBUG_PRINT_EVENT("info",("sdata->operation %u this: %p", operation, this));
931 // now move the data into the RecAttrs
932 m_ndb->theImpl->incClientStat(Ndb::DataEventsRecvdCount, 1);
933
934 int is_insert= operation == NdbDictionary::Event::_TE_INSERT;
935
936 Uint32 *aAttrPtr = m_data_item->ptr[0].p;
937 Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
938 Uint32 *aDataPtr = m_data_item->ptr[1].p;
939
940 DBUG_DUMP_EVENT("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
941 DBUG_DUMP_EVENT("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
942
943 // copy data into the RecAttr's
944 // we assume that the respective attribute lists are sorted
945
946 // first the pk's
947 {
948 NdbRecAttr *tAttr= theFirstPkAttrs[0];
949 NdbRecAttr *tAttr1= theFirstPkAttrs[1];
950 while(tAttr)
951 {
952 assert(aAttrPtr < aAttrEndPtr);
953 unsigned tDataSz= AttributeHeader(*aAttrPtr).getByteSize();
954 assert(tAttr->attrId() ==
955 AttributeHeader(*aAttrPtr).getAttributeId());
956 receive_data(tAttr, aDataPtr, tDataSz);
957 if (!is_insert)
958 receive_data(tAttr1, aDataPtr, tDataSz);
959 else
960 tAttr1->setUNDEFINED(); // do not leave unspecified
961 tAttr1= tAttr1->next();
962 // next
963 aAttrPtr++;
964 aDataPtr+= (tDataSz + 3) >> 2;
965 tAttr= tAttr->next();
966 }
967 }
968
969 NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
970
971 Uint32 tRecAttrId;
972 Uint32 tAttrId;
973 Uint32 tDataSz;
974 int hasSomeData= (operation != NdbDictionary::Event::_TE_UPDATE);
975 while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
976 tRecAttrId = tWorkingRecAttr->attrId();
977 tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
978 tDataSz = AttributeHeader(*aAttrPtr).getByteSize();
979
980 while (tAttrId > tRecAttrId) {
981 DBUG_PRINT_EVENT("info",("undef [%u] %u 0x%x [%u] 0x%x",
982 tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
983 tWorkingRecAttr->setUNDEFINED();
984 tWorkingRecAttr = tWorkingRecAttr->next();
985 if (tWorkingRecAttr == NULL)
986 break;
987 tRecAttrId = tWorkingRecAttr->attrId();
988 }
989 if (tWorkingRecAttr == NULL)
990 break;
991
992 if (tAttrId == tRecAttrId) {
993 hasSomeData=1;
994
995 DBUG_PRINT_EVENT("info",("set [%u] %u 0x%x [%u] 0x%x",
996 tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
997
998 receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
999 tWorkingRecAttr = tWorkingRecAttr->next();
1000 }
1001 aAttrPtr++;
1002 aDataPtr += (tDataSz + 3) >> 2;
1003 }
1004
1005 while (tWorkingRecAttr != NULL) {
1006 tRecAttrId = tWorkingRecAttr->attrId();
1007 //printf("set undefined [%u] %u %u [%u]\n",
1008 // tAttrId, tDataSz, *aDataPtr, tRecAttrId);
1009 tWorkingRecAttr->setUNDEFINED();
1010 tWorkingRecAttr = tWorkingRecAttr->next();
1011 }
1012
1013 tWorkingRecAttr = theFirstDataAttrs[1];
1014 aDataPtr = m_data_item->ptr[2].p;
1015 Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
1016 while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
1017 tRecAttrId = tWorkingRecAttr->attrId();
1018 tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
1019 tDataSz = AttributeHeader(*aDataPtr).getByteSize();
1020 aDataPtr++;
1021 while (tAttrId > tRecAttrId) {
1022 tWorkingRecAttr->setUNDEFINED();
1023 tWorkingRecAttr = tWorkingRecAttr->next();
1024 if (tWorkingRecAttr == NULL)
1025 break;
1026 tRecAttrId = tWorkingRecAttr->attrId();
1027 }
1028 if (tWorkingRecAttr == NULL)
1029 break;
1030 if (tAttrId == tRecAttrId) {
1031 assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
1032 hasSomeData=1;
1033
1034 receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
1035 tWorkingRecAttr = tWorkingRecAttr->next();
1036 }
1037 aDataPtr += (tDataSz + 3) >> 2;
1038 }
1039 while (tWorkingRecAttr != NULL) {
1040 tWorkingRecAttr->setUNDEFINED();
1041 tWorkingRecAttr = tWorkingRecAttr->next();
1042 }
1043
1044 if (hasSomeData)
1045 {
1046 DBUG_RETURN_EVENT(1);
1047 }
1048
1049 DBUG_RETURN_EVENT(0);
1050 }
1051
1052 NdbDictionary::Event::TableEvent
getEventType()1053 NdbEventOperationImpl::getEventType()
1054 {
1055 return (NdbDictionary::Event::TableEvent)
1056 (1 << SubTableData::getOperation(m_data_item->sdata->requestInfo));
1057 }
1058
1059
1060
1061 void
print()1062 NdbEventOperationImpl::print()
1063 {
1064 int i;
1065 ndbout << "EventId " << m_eventId << "\n";
1066
1067 for (i = 0; i < 2; i++) {
1068 NdbRecAttr *p = theFirstPkAttrs[i];
1069 ndbout << " %u " << i;
1070 while (p) {
1071 ndbout << " : " << p->attrId() << " = " << *p;
1072 p = p->next();
1073 }
1074 ndbout << "\n";
1075 }
1076 for (i = 0; i < 2; i++) {
1077 NdbRecAttr *p = theFirstDataAttrs[i];
1078 ndbout << " %u " << i;
1079 while (p) {
1080 ndbout << " : " << p->attrId() << " = " << *p;
1081 p = p->next();
1082 }
1083 ndbout << "\n";
1084 }
1085 }
1086
1087 void
printAll()1088 NdbEventOperationImpl::printAll()
1089 {
1090 Uint32 *aAttrPtr = m_data_item->ptr[0].p;
1091 Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
1092 Uint32 *aDataPtr = m_data_item->ptr[1].p;
1093
1094 //tRecAttr->setup(tAttrInfo, aValue)) {
1095
1096 Uint32 tAttrId;
1097 Uint32 tDataSz;
1098 for (; aAttrPtr < aAttrEndPtr; ) {
1099 tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
1100 tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
1101
1102 aAttrPtr++;
1103 aDataPtr += tDataSz;
1104 }
1105 }
1106
/*
 * Class NdbEventBuffer
 * Each Ndb object has one NdbEventBuffer object.
 */
/**
 * Construct an event buffer attached to the given Ndb object.
 *
 * All GCI markers, counters and thresholds start at zero and the bucket count
 * starts at TOTAL_BUCKETS_INIT.  The constructor creates the condition
 * variable p_cond (terminating the process with exit(-1) if that fails),
 * picks up the event add/drop mutex of the cluster connection and
 * initialises the GCI containers through init_gci_containers().
 *
 * @param ndb  The Ndb object this buffer is attached to (stored in m_ndb).
 */
NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
  m_total_buckets(TOTAL_BUCKETS_INIT),
  m_min_gci_index(0),
  m_max_gci_index(0),
  m_ndb(ndb),
  m_latestGCI(0), m_latest_complete_GCI(0),
  m_highest_sub_gcp_complete_GCI(0),
  m_latest_poll_GCI(0),
  m_total_alloc(0),
  m_free_thresh(0),
  m_min_free_thresh(0),
  m_max_free_thresh(0),
  m_gci_slip_thresh(0),
  m_dropped_ev_op(0),
  m_active_op_count(0),
  m_add_drop_mutex(0)
{
#ifdef VM_TRACE
  m_latest_command= "NdbEventBuffer::NdbEventBuffer";
  m_flush_gci = 0;
#endif

  // Without a condition variable pollEvents() cannot wait; give up entirely.
  if ((p_cond = NdbCondition_Create()) == NULL) {
    ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
    exit(-1);
  }
  m_mutex = 0; // Set in Ndb::init()

  // ToDo set event buffer size
  // pre allocate event data array
  // No EventBufData has been allocated yet; the free list starts out empty.
  m_sz= 0;
#ifdef VM_TRACE
  m_free_data_count= 0;
#endif
  m_free_data= 0;
  m_free_data_sz= 0;

  // get reference to mutex managed by current connection
  m_add_drop_mutex=
    m_ndb->theImpl->m_ndb_cluster_connection.m_event_add_drop_mutex;

  // initialize lists
  // NOTE(review): the file-level template g_empty_gci_container is declared as
  // Gci_container_pod but zeroed with sizeof(Gci_container); presumably both
  // types have the same size - confirm.
  bzero(&g_empty_gci_container, sizeof(Gci_container));
  init_gci_containers();

  m_alive_node_bit_mask.clear();
}
1158
~NdbEventBuffer()1159 NdbEventBuffer::~NdbEventBuffer()
1160 {
1161 // todo lock? what if receive thread writes here?
1162 NdbEventOperationImpl* op= m_dropped_ev_op;
1163 while ((op = m_dropped_ev_op))
1164 {
1165 m_dropped_ev_op = m_dropped_ev_op->m_next;
1166 delete op->m_facade;
1167 }
1168
1169 unsigned j;
1170 Uint32 sz= m_active_gci.size();
1171 Gci_container* array = (Gci_container*)m_active_gci.getBase();
1172 for(j = 0; j < sz; j++)
1173 {
1174 array[j].~Gci_container();
1175 }
1176
1177 for (j= 0; j < m_allocated_data.size(); j++)
1178 {
1179 unsigned sz= m_allocated_data[j]->sz;
1180 EventBufData *data= m_allocated_data[j]->data;
1181 EventBufData *end_data= data+sz;
1182 for (; data < end_data; data++)
1183 {
1184 if (data->sdata)
1185 NdbMem_Free(data->sdata);
1186 }
1187 NdbMem_Free((char*)m_allocated_data[j]);
1188 }
1189
1190 NdbCondition_Destroy(p_cond);
1191 }
1192
1193 void
add_op()1194 NdbEventBuffer::add_op()
1195 {
1196 if(m_active_op_count == 0)
1197 {
1198 init_gci_containers();
1199 }
1200 m_active_op_count++;
1201 }
1202
1203 void
remove_op()1204 NdbEventBuffer::remove_op()
1205 {
1206 m_active_op_count--;
1207 }
1208
1209 void
init_gci_containers()1210 NdbEventBuffer::init_gci_containers()
1211 {
1212 m_startup_hack = true;
1213 bzero(&m_complete_data, sizeof(m_complete_data));
1214 m_latest_complete_GCI = m_latestGCI = m_latest_poll_GCI = 0;
1215 m_active_gci.clear();
1216 m_active_gci.fill(3, g_empty_gci_container);
1217 m_min_gci_index = m_max_gci_index = 1;
1218 Uint64 gci = 0;
1219 m_known_gci.clear();
1220 m_known_gci.fill(7, gci);
1221 }
1222
expand(unsigned sz)1223 int NdbEventBuffer::expand(unsigned sz)
1224 {
1225 unsigned alloc_size=
1226 sizeof(EventBufData_chunk) +(sz-1)*sizeof(EventBufData);
1227 EventBufData_chunk *chunk_data=
1228 (EventBufData_chunk *)NdbMem_Allocate(alloc_size);
1229
1230 chunk_data->sz= sz;
1231 m_allocated_data.push_back(chunk_data);
1232
1233 EventBufData *data= chunk_data->data;
1234 EventBufData *end_data= data+sz;
1235 EventBufData *last_data= m_free_data;
1236
1237 bzero((void*)data, sz*sizeof(EventBufData));
1238 for (; data < end_data; data++)
1239 {
1240 data->m_next= last_data;
1241 last_data= data;
1242 }
1243 m_free_data= last_data;
1244
1245 m_sz+= sz;
1246 #ifdef VM_TRACE
1247 m_free_data_count+= sz;
1248 #endif
1249 return 0;
1250 }
1251
/**
 * Check for, and optionally wait for, event data.
 *
 * With m_mutex held, move_data() is called; if it returns no event operation
 * and a non-zero timeout was given, the caller waits on p_cond for at most
 * that time and move_data() is called once more.  m_latest_poll_GCI is then
 * set to m_latestGCI.  If there still is no event operation, dropped event
 * operations stopped before that GCI are released.
 *
 * @param aMillisecondNumber  Timeout passed to NdbCondition_WaitTimeout();
 *                            0 means do not wait.
 * @param latestGCI           If non-NULL, receives m_latest_poll_GCI.
 * @return 0 if a wait was performed and move_data() still returned nothing,
 *         otherwise 1.
 *
 * NOTE(review): when aMillisecondNumber is 0 and move_data() returns nothing,
 * the function still returns 1 - confirm that callers expect this.
 */
int
NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
{
  int ret= 1;
#ifdef VM_TRACE
  const char *m_latest_command_save= m_latest_command;
  m_latest_command= "NdbEventBuffer::pollEvents";
#endif

  NdbMutex_Lock(m_mutex);
  NdbEventOperationImpl *ev_op= move_data();
  if (unlikely(ev_op == 0 && aMillisecondNumber))
  {
    // Nothing yet: wait (with timeout) for the condition to be signalled,
    // then try again exactly once.
    NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
    ev_op= move_data();
    if (unlikely(ev_op == 0))
      ret= 0;
  }
  m_latest_poll_GCI= m_latestGCI;
#ifdef VM_TRACE
  if (ev_op)
  {
    // m_mutex is locked
    // update event ops data counters
    ev_op->m_data_count-= ev_op->m_data_done_count;
    ev_op->m_data_done_count= 0;
  }
  m_latest_command= m_latest_command_save;
#endif
  if (unlikely(ev_op == 0))
  {
    /*
      gci's consumed up until m_latest_poll_GCI, so we can free all
      dropped event operations stopped up until that gci
    */
    deleteUsedEventOperations(m_latest_poll_GCI);
  }
  NdbMutex_Unlock(m_mutex); // we have moved the data

  // Note: m_latest_poll_GCI is read here after the mutex has been released.
  if (latestGCI)
    *latestGCI= m_latest_poll_GCI;

  return ret;
}
1296
/**
 * Discard all known, not yet completed GCIs that are lower than the given gci.
 *
 * Walks the m_known_gci ring from m_min_gci_index towards m_max_gci_index for
 * as long as the entry is below gci.  For each such entry the bucket is
 * looked up, any buffered data is returned through free_list(), and the
 * bucket is destroyed and zeroed.  m_min_gci_index is then moved to the first
 * entry that was kept.
 *
 * @param gci  Lower bound; entries strictly below it are flushed.
 * @return always 0.
 */
int
NdbEventBuffer::flushIncompleteEvents(Uint64 gci)
{
  /**
   *  Find min complete gci
   */
  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;   // ring index wrap mask
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;

  g_eventLogger->info("Flushing incomplete GCI:s < %u/%u",
                      Uint32(gci >> 32), Uint32(gci));
  while (minpos != maxpos && array[minpos] < gci)
  {
    Gci_container* tmp = find_bucket(array[minpos]);
    assert(tmp);
    // The lookup is expected not to have added a new ring entry.
    assert(maxpos == m_max_gci_index);

    if(!tmp->m_data.is_empty())
    {
      free_list(tmp->m_data);
    }
    // Destroy in place, then leave the slot as an empty (all-zero) container.
    tmp->~Gci_container();
    bzero(tmp, sizeof(Gci_container));
    minpos = (minpos + 1) & mask;
  }

  m_min_gci_index = minpos;

#ifdef VM_TRACE
  m_flush_gci = gci;
#endif

  return 0;
}
1333
1334 NdbEventOperation *
nextEvent()1335 NdbEventBuffer::nextEvent()
1336 {
1337 DBUG_ENTER_EVENT("NdbEventBuffer::nextEvent");
1338 #ifdef VM_TRACE
1339 const char *m_latest_command_save= m_latest_command;
1340 #endif
1341
1342 if (m_used_data.m_count > 1024)
1343 {
1344 #ifdef VM_TRACE
1345 m_latest_command= "NdbEventBuffer::nextEvent (lock)";
1346 #endif
1347 NdbMutex_Lock(m_mutex);
1348 // return m_used_data to m_free_data
1349 free_list(m_used_data);
1350
1351 NdbMutex_Unlock(m_mutex);
1352 }
1353 #ifdef VM_TRACE
1354 m_latest_command= "NdbEventBuffer::nextEvent";
1355 #endif
1356
1357 EventBufData *data;
1358 Uint64 gci= 0;
1359 while ((data= m_available_data.m_head))
1360 {
1361 NdbEventOperationImpl *op= data->m_event_op;
1362
1363 /*
1364 * The data was not associated with an event operation,
1365 * possibly a dummy event list marking missing data
1366 */
1367 if (!op && !isConsistent(gci))
1368 {
1369 DBUG_PRINT_EVENT("info", ("detected inconsistent gci %u", gci));
1370 DBUG_RETURN_EVENT(0);
1371 }
1372
1373 DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));
1374
1375 /*
1376 * If merge is on, blob part sub-events must not be seen on this level.
1377 * If merge is not on, there are no blob part sub-events.
1378 */
1379 assert(op->theMainOp == NULL);
1380
1381 // set NdbEventOperation data
1382 op->m_data_item= data;
1383
1384 // remove item from m_available_data and return size
1385 Uint32 full_count, full_sz;
1386 m_available_data.remove_first(full_count, full_sz);
1387
1388 // add it to used list
1389 m_used_data.append_used_data(data, full_count, full_sz);
1390
1391 m_ndb->theImpl->incClientStat(Ndb::EventBytesRecvdCount, full_sz);
1392
1393 #ifdef VM_TRACE
1394 op->m_data_done_count++;
1395 #endif
1396
1397 if (op->m_state == NdbEventOperation::EO_EXECUTING)
1398 {
1399 int r= op->receive_event();
1400 if (r > 0)
1401 {
1402 #ifdef VM_TRACE
1403 m_latest_command= m_latest_command_save;
1404 #endif
1405 NdbBlob* tBlob = op->theBlobList;
1406 while (tBlob != NULL)
1407 {
1408 (void)tBlob->atNextEvent();
1409 tBlob = tBlob->theNext;
1410 }
1411 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1412 while (gci_ops && op->getGCI() > gci_ops->m_gci)
1413 {
1414 gci_ops = m_available_data.delete_next_gci_ops();
1415 }
1416 if (!gci_ops->m_consistent)
1417 DBUG_RETURN_EVENT(0);
1418 assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
1419 // to return TE_NUL it should be made into data event
1420 if (SubTableData::getOperation(data->sdata->requestInfo) ==
1421 NdbDictionary::Event::_TE_NUL)
1422 {
1423 DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
1424 continue;
1425 }
1426 DBUG_RETURN_EVENT(op->m_facade);
1427 }
1428 // the next event belonged to an event op that is no
1429 // longer valid, skip to next
1430 continue;
1431 }
1432 #ifdef VM_TRACE
1433 m_latest_command= m_latest_command_save;
1434 #endif
1435 }
1436 m_error.code= 0;
1437 #ifdef VM_TRACE
1438 m_latest_command= m_latest_command_save;
1439 #endif
1440
1441 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1442 while (gci_ops)
1443 {
1444 gci_ops = m_available_data.delete_next_gci_ops();
1445 }
1446 /*
1447 gci's consumed up until m_latest_poll_GCI, so we can free all
1448 dropped event operations stopped up until that gci
1449 */
1450 if (m_dropped_ev_op)
1451 {
1452 NdbMutex_Lock(m_mutex);
1453 deleteUsedEventOperations(m_latest_poll_GCI);
1454 NdbMutex_Unlock(m_mutex);
1455 }
1456 DBUG_RETURN_EVENT(0);
1457 }
1458
1459 bool
isConsistent(Uint64 & gci)1460 NdbEventBuffer::isConsistent(Uint64& gci)
1461 {
1462 DBUG_ENTER("NdbEventBuffer::isConsistent");
1463 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1464 while (gci_ops)
1465 {
1466 if (!gci_ops->m_consistent)
1467 {
1468 gci = gci_ops->m_gci;
1469 DBUG_RETURN(false);
1470 }
1471 gci_ops = gci_ops->m_next;
1472 }
1473
1474 DBUG_RETURN(true);
1475 }
1476
1477 bool
isConsistentGCI(Uint64 gci)1478 NdbEventBuffer::isConsistentGCI(Uint64 gci)
1479 {
1480 DBUG_ENTER("NdbEventBuffer::isConsistentGCI");
1481 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1482 while (gci_ops)
1483 {
1484 if (gci_ops->m_gci == gci && !gci_ops->m_consistent)
1485 DBUG_RETURN(false);
1486 gci_ops = gci_ops->m_next;
1487 }
1488
1489 DBUG_RETURN(true);
1490 }
1491
1492
1493 NdbEventOperationImpl*
getGCIEventOperations(Uint32 * iter,Uint32 * event_types)1494 NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
1495 {
1496 DBUG_ENTER("NdbEventBuffer::getGCIEventOperations");
1497 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1498 if (*iter < gci_ops->m_gci_op_count)
1499 {
1500 EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
1501 if (event_types != NULL)
1502 *event_types = g.event_types;
1503 DBUG_PRINT("info", ("gci: %u g.op: 0x%lx g.event_types: 0x%lx",
1504 (unsigned)gci_ops->m_gci, (long) g.op,
1505 (long) g.event_types));
1506 DBUG_RETURN(g.op);
1507 }
1508 DBUG_RETURN(NULL);
1509 }
1510
/**
 * Release dropped event operations that are no longer needed.
 *
 * The dropped list is scanned from its head for as long as entries have a
 * non-zero m_stop_gci.  At the first entry whose m_stop_gci is below
 * last_consumed_gci, that entry and every entry after it get their
 * m_stop_gci cleared and their reference count decremented; entries whose
 * count reaches zero are unlinked from the list and deleted through their
 * facade.  The scan then stops.
 *
 * @param last_consumed_gci  GCI up to which events have been consumed.
 */
void
NdbEventBuffer::deleteUsedEventOperations(Uint64 last_consumed_gci)
{
  NdbEventOperationImpl *op= m_dropped_ev_op;
  while (op && op->m_stop_gci)
  {
    if (last_consumed_gci > op->m_stop_gci)
    {
      while (op)
      {
        // Remember the successor now: op may be deleted below.
        NdbEventOperationImpl *next_op= op->m_next;
        op->m_stop_gci= 0;
        op->m_ref_count--;
        if (op->m_ref_count == 0)
        {
          // Unlink from the doubly linked dropped list before deleting.
          if (op->m_next)
            op->m_next->m_prev = op->m_prev;
          if (op->m_prev)
            op->m_prev->m_next = op->m_next;
          else
            m_dropped_ev_op = op->m_next;
          delete op->m_facade;
        }
        op = next_op;
      }
      break;
    }
    op = op->m_next;
  }
}
1541
1542 #ifdef VM_TRACE
1543 static
1544 NdbOut&
operator <<(NdbOut & out,const Gci_container & gci)1545 operator<<(NdbOut& out, const Gci_container& gci)
1546 {
1547 out << "[ GCI: " << (gci.m_gci >> 32) << "/" << (gci.m_gci & 0xFFFFFFFF)
1548 << " state: " << hex << gci.m_state
1549 << " head: " << hex << gci.m_data.m_head
1550 << " tail: " << hex << gci.m_data.m_tail
1551 #ifdef VM_TRACE
1552 << " cnt: " << dec << gci.m_data.m_count
1553 #endif
1554 << " gcp: " << dec << gci.m_gcp_complete_rep_count
1555 << "]";
1556 return out;
1557 }
1558
1559 static
1560 NdbOut&
operator <<(NdbOut & out,const Gci_container_pod & gci)1561 operator<<(NdbOut& out, const Gci_container_pod& gci)
1562 {
1563 Gci_container* ptr = (Gci_container*)&gci;
1564 out << *ptr;
1565 return out;
1566 }
1567 #endif
1568
/**
 * Double the size of the m_known_gci ring buffer.
 *
 * The vector is grown to twice its previous size and the live entries
 * (from m_min_gci_index up to, but excluding, m_max_gci_index, wrapping with
 * the old mask) are swapped, in order, into the newly added upper half.
 * Afterwards m_min_gci_index is the first index of the new half and
 * m_max_gci_index is one past the last moved entry, so the live range no
 * longer wraps.
 */
void
NdbEventBuffer::resize_known_gci()
{
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  Uint32 mask = m_known_gci.size() - 1;   // wrap mask of the OLD size

  Uint64 fill = 0;
  Uint32 newsize = 2 * (mask + 1);
  m_known_gci.fill(newsize - 1, fill);
  // Fetch the base pointer only after growing the vector.
  Uint64 * array = m_known_gci.getBase();

  // Disabled debug trace.
  if (0)
  {
    printf("before (%u): ", minpos);
    for (Uint32 i = minpos; i != maxpos; i = (i + 1) & mask)
      printf("%u/%u ",
             Uint32(array[i] >> 32),
             Uint32(array[i]));
    printf("\n");
  }

  Uint32 idx = mask + 1; // Store everything in "new" part of buffer
  if (0) printf("swapping ");
  while (minpos != maxpos)
  {
    if (0) printf("%u-%u ", minpos, idx);
    Uint64 tmp = array[idx];
    array[idx] = array[minpos];
    array[minpos] = tmp;

    idx++;
    minpos = (minpos + 1) & mask; // NOTE old mask
  }
  if (0) printf("\n");

  minpos = m_min_gci_index = mask + 1;
  maxpos = m_max_gci_index = idx;
  assert(minpos < maxpos);

  // Disabled debug trace.
  if (0)
  {
    ndbout_c("resize_known_gci from %u to %u", (mask + 1), newsize);
    printf("after: ");
    for (Uint32 i = minpos; i < maxpos; i++)
    {
      printf("%u/%u ",
             Uint32(array[i] >> 32),
             Uint32(array[i]));
    }
    printf("\n");
  }

#ifdef VM_TRACE
  // Debug check: the moved entries must be strictly increasing.
  Uint64 gci = array[minpos];
  for (Uint32 i = minpos + 1; i<maxpos; i++)
  {
    assert(array[i] > gci);
    gci = array[i];
  }
#endif
}
1631
#ifdef VM_TRACE
/**
 * Debug-only consistency check of m_known_gci against m_active_gci.
 *
 * Checks that
 *  - an empty ring is only accepted when allowempty is set, and then every
 *    active container has m_gci == 0;
 *  - the ring entries are strictly increasing, the first one is above
 *    m_latestGCI, and each has a bucket;
 *  - every active container with a non-zero m_gci appears in the ring.
 *
 * On the first failed check the ring and all containers are printed and the
 * process is aborted.
 *
 * @param allowempty  Whether an empty ring is acceptable.
 */
void
NdbEventBuffer::verify_known_gci(bool allowempty)
{
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  Uint32 mask = m_known_gci.size() - 1;

  Uint32 line;
  // Records the source line of the failed check and jumps to the dump code.
#define MMASSERT(x) { if (!(x)) { line = __LINE__; goto fail; }}
  if (m_min_gci_index == m_max_gci_index)
  {
    MMASSERT(allowempty);
    for (Uint32 i = 0; i<m_active_gci.size(); i++)
      MMASSERT(((Gci_container*)(m_active_gci.getBase()+i))->m_gci == 0);
    return;
  }

  {
    // Walk the ring: strictly increasing, each entry backed by a bucket.
    Uint64 last = m_known_gci[minpos];
    MMASSERT(last > m_latestGCI);
    MMASSERT(find_bucket(last) != 0);
    // The lookup must not have added a new ring entry.
    MMASSERT(maxpos == m_max_gci_index);

    minpos = (minpos + 1) & mask;
    while (minpos != maxpos)
    {
      MMASSERT(m_known_gci[minpos] > last);
      last = m_known_gci[minpos];
      MMASSERT(find_bucket(last) != 0);
      MMASSERT(maxpos == m_max_gci_index);
      minpos = (minpos + 1) & mask;
    }
  }

  {
    // Reverse direction: every used container must be present in the ring.
    Gci_container* bucktets = (Gci_container*)(m_active_gci.getBase());
    for (Uint32 i = 0; i<m_active_gci.size(); i++)
    {
      if (bucktets[i].m_gci)
      {
        bool found = false;
        for (Uint32 j = m_min_gci_index; j != m_max_gci_index;
             j = (j + 1) & mask)
        {
          if (m_known_gci[j] == bucktets[i].m_gci)
          {
            found = true;
            break;
          }
        }
        if (!found)
          ndbout_c("%u/%u not found",
                   Uint32(bucktets[i].m_gci >> 32),
                   Uint32(bucktets[i].m_gci));
        MMASSERT(found == true);
      }
    }
  }

  return;
fail:
  // Dump the ring and all containers, then abort.
  ndbout_c("assertion at %d", line);
  printf("known gci: ");
  for (Uint32 i = m_min_gci_index; i != m_max_gci_index; i = (i + 1) & mask)
  {
    printf("%u/%u ", Uint32(m_known_gci[i] >> 32), Uint32(m_known_gci[i]));
  }

  printf("\nContainers");
  for (Uint32 i = 0; i<m_active_gci.size(); i++)
    ndbout << m_active_gci[i] << endl;
  abort();
}
#endif
1707
/**
 * Find the container (bucket) for a GCI, creating it if it does not exist.
 *
 * Lookup: starting at index (gci & ACTIVE_GCI_MASK), m_active_gci is probed
 * in steps of ACTIVE_GCI_DIRECTORY_SIZE.
 *  - A slot holding the requested gci is returned directly.
 *  - At the first empty slot (m_gci == 0) the rest of the chain is searched;
 *    if the gci is found further out it is moved into the empty slot and
 *    returned, otherwise the empty slot becomes the new bucket.
 *  - If the chain runs past the end of the vector, the vector is grown so
 *    that the next chain position exists and is used as the new bucket.
 *
 * A new bucket gets m_gci set and m_gcp_complete_rep_count set to
 * m_total_buckets, and the gci is inserted into the m_known_gci ring so that
 * the ring stays sorted in increasing order (the ring is resized first if it
 * is full).
 *
 * @param gci  GCI to look up.
 * @return the bucket for gci, or 0 if gci is not above m_latestGCI or if
 *         m_total_buckets is 0.
 */
Gci_container*
NdbEventBuffer::find_bucket_chained(Uint64 gci)
{
  if (0)
    printf("find_bucket_chained(%u/%u) ", Uint32(gci >> 32), Uint32(gci));
  if (unlikely(gci <= m_latestGCI))
  {
    /**
     * an already complete GCI
     */
    if (0)
      ndbout_c("already complete (%u/%u)",
               Uint32(m_latestGCI >> 32),
               Uint32(m_latestGCI));
    return 0;
  }

  if (unlikely(m_total_buckets == 0))
  {
    return 0;
  }

  Uint32 pos = Uint32(gci & ACTIVE_GCI_MASK);
  Uint32 size = m_active_gci.size();
  Gci_container *buckets = (Gci_container*)(m_active_gci.getBase());
  while (pos < size)
  {
    Uint64 cmp = (buckets + pos)->m_gci;
    if (cmp == gci)
    {
      if (0)
        ndbout_c("found pos: %u", pos);
      return buckets + pos;
    }

    if (cmp == 0)
    {
      // Empty slot in the chain: the gci may still live further out.
      if (0)
        ndbout_c("empty(%u) ", pos);
      Uint32 search = pos + ACTIVE_GCI_DIRECTORY_SIZE;
      while (search < size)
      {
        if ((buckets + search)->m_gci == gci)
        {
          // Move the bucket into the earlier empty slot and clear the old one.
          memcpy(buckets + pos, buckets + search, sizeof(Gci_container));
          bzero(buckets + search, sizeof(Gci_container));
          if (0)
            printf("moved from %u to %u", search, pos);
          if (search == size - 1)
          {
            // The vacated slot was the last element: shrink the vector.
            m_active_gci.erase(search);
            if (0)
              ndbout_c(" shrink");
          }
          else
          {
            if (0)
              printf("\n");
          }
          return buckets + pos;
        }
        search += ACTIVE_GCI_DIRECTORY_SIZE;
      }
      goto newbucket;
    }
    pos += ACTIVE_GCI_DIRECTORY_SIZE;
  }

  /**
   * This is a new bucket...likely close to start
   */
  if (0)
    ndbout_c("new (with expand) ");
  // Grow the vector so that index pos exists; the base pointer may change.
  m_active_gci.fill(pos, g_empty_gci_container);
  buckets = (Gci_container*)(m_active_gci.getBase());
newbucket:
  Gci_container* bucket = buckets + pos;
  bucket->m_gci = gci;
  bucket->m_gcp_complete_rep_count = m_total_buckets;

  // Register the gci in the sorted ring of known GCIs.
  Uint32 mask = m_known_gci.size() - 1;
  Uint64 * array = m_known_gci.getBase();

  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  bool full = ((maxpos + 1) & mask) == minpos;
  if (unlikely(full))
  {
    // Resizing changes indexes, mask and base pointer: reload all of them.
    resize_known_gci();
    minpos = m_min_gci_index;
    maxpos = m_max_gci_index;
    mask = m_known_gci.size() - 1;
    array = m_known_gci.getBase();
  }

  Uint32 maxindex = (maxpos - 1) & mask;    // index of the current last entry
  Uint32 newmaxpos = (maxpos + 1) & mask;
  m_max_gci_index = newmaxpos;
  if (likely(minpos == maxpos || gci > array[maxindex]))
  {
    // Ring empty, or gci is the highest seen so far: append at the end.
    array[maxpos] = gci;
#ifdef VM_TRACE
    verify_known_gci(false);
#endif
    return bucket;
  }

  // Out-of-order arrival: find the first entry greater than gci ...
  for (pos = minpos; pos != maxpos; pos = (pos + 1) & mask)
  {
    if (array[pos] > gci)
      break;
  }

  if (0)
    ndbout_c("insert %u/%u (max %u/%u) at pos %u (min: %u max: %u)",
             Uint32(gci >> 32),
             Uint32(gci),
             Uint32(array[maxindex] >> 32),
             Uint32(array[maxindex]),
             pos,
             m_min_gci_index, m_max_gci_index);

  assert(pos != maxpos);
  // ... and insert there, shifting each following entry one position up.
  Uint64 oldgci;
  do {
    oldgci = array[pos];
    array[pos] = gci;
    gci = oldgci;
    pos = (pos + 1) & mask;
  } while (pos != maxpos);
  array[pos] = gci;

#ifdef VM_TRACE
  verify_known_gci(false);
#endif
  return bucket;
}
1845
1846 static
1847 void
crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container * bucket,const SubGcpCompleteRep * const rep,Uint32 buckets)1848 crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
1849 const SubGcpCompleteRep * const rep,
1850 Uint32 buckets)
1851 {
1852 Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
1853
1854 ndbout_c("INVALID SUB_GCP_COMPLETE_REP");
1855 ndbout_c("gci_hi: %u", rep->gci_hi);
1856 ndbout_c("gci_lo: %u", rep->gci_lo);
1857 ndbout_c("sender: %x", rep->senderRef);
1858 ndbout_c("count: %d", rep->gcp_complete_rep_count);
1859 ndbout_c("bucket count: %u", old_cnt);
1860 ndbout_c("total buckets: %u", buckets);
1861 abort();
1862 }
1863
/**
 * Finish a bucket: hand its data over to m_complete_data and free the slot.
 *
 * Any buffered data is appended to m_complete_data.m_data tagged with the
 * bucket's gci.  If the bucket is flagged GC_INCONSISTENT the appended list
 * entry is marked inconsistent; an empty inconsistent bucket gets a dummy
 * entry so that the marker is not lost.  The bucket is then zeroed and
 * m_min_gci_index is advanced by one position.
 *
 * Note that the ring index is advanced unconditionally, i.e. independently
 * of which bucket was passed in.
 *
 * @param bucket  Bucket to complete.
 */
void
NdbEventBuffer::complete_bucket(Gci_container* bucket)
{
  Uint64 gci = bucket->m_gci;
  Gci_container* buckets = (Gci_container*)m_active_gci.getBase();

  if (0)
    ndbout_c("complete %u/%u pos: %u", Uint32(gci >> 32), Uint32(gci),
             Uint32(bucket - buckets));

#ifdef VM_TRACE
  verify_known_gci(false);
#endif

  /**
   * Copy data
   */
  if(!bucket->m_data.is_empty())
  {
#ifdef VM_TRACE
    assert(bucket->m_data.m_count);
#endif
    m_complete_data.m_data.append_list(&bucket->m_data, gci);
    if (bucket->m_state & Gci_container::GC_INCONSISTENT)
    {
      /*
       * Bucket marked as possibly missing data, probably due to
       * kernel running out of event_buffer during node failure.
       * Mark newly appended event list as inconsistent.
       */
      assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
      m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
    }
  }
  else // if (bucket->m_data.is_empty())
  {
    if (bucket->m_state & Gci_container::GC_INCONSISTENT)
    {
      /*
       * Bucket marked as possibly missing data, probably due to
       * kernel running out of event_buffer during node failure
       * Bucket contained no data so we must add a dummy event list
       * as inconsistency marker.
       */
      // NOTE(review): the result of alloc_data() is not checked, and the
      // heap-allocated dummy_event_list is not deleted here - confirm that
      // alloc_data() cannot return NULL at this point and whether
      // append_list() takes ownership of the list object.
      EventBufData *dummy_data= alloc_data();
      EventBufData_list *dummy_event_list = new EventBufData_list;
      dummy_event_list->append_used_data(dummy_data);
      dummy_event_list->m_is_not_multi_list = true;
      m_complete_data.m_data.append_list(dummy_event_list, gci);
      assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
      m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
    }
  }

  Uint32 minpos = m_min_gci_index;
  Uint32 mask = m_known_gci.size() - 1;
  // The ring size must be a power of two for the mask arithmetic to work.
  assert((mask & (mask + 1)) == 0);

  // Leave the slot as an empty container (m_gci == 0).
  bzero(bucket, sizeof(Gci_container));

  m_min_gci_index = (minpos + 1) & mask;

#ifdef VM_TRACE
  verify_known_gci(true);
#endif
}
1930
/**
 * Handle a SUB_GCP_COMPLETE_REP signal.
 *
 * The signal's count is subtracted from the remaining count of the bucket
 * belonging to the reported GCI.  When the remaining count reaches zero the
 * bucket is complete:
 *  - if it is the oldest known GCI (or no GCI is known) it is completed at
 *    once, m_latestGCI is advanced, status is reported, previously
 *    out-of-order completed GCIs are drained and waiters on p_cond are
 *    signalled;
 *  - during startup (m_startup_hack) older incomplete GCIs are flushed first
 *    and the bucket is then completed as above;
 *  - otherwise the bucket is only marked GC_COMPLETE and remembered in
 *    m_latest_complete_GCI until the older GCIs have completed.
 *
 * @param rep  The received signal.
 * @param len  Signal length; for signals shorter than
 *             SubGcpCompleteRep::SignalLength gci_lo is taken as 0.
 * @param complete_cluster_failure
 *             When zero, the sender is recorded as alive and the signal is
 *             ignored if there are no active event operations.
 */
void
NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep,
                                         Uint32 len, int complete_cluster_failure)
{
  Uint32 gci_hi = rep->gci_hi;
  Uint32 gci_lo = rep->gci_lo;

  if (unlikely(len < SubGcpCompleteRep::SignalLength))
  {
    gci_lo = 0;
  }

  const Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
  if (gci > m_highest_sub_gcp_complete_GCI)
    m_highest_sub_gcp_complete_GCI = gci;

  if (!complete_cluster_failure)
  {
    m_alive_node_bit_mask.set(refToNode(rep->senderRef));

    if (unlikely(m_active_op_count == 0))
    {
      return;
    }
  }

  DBUG_ENTER_EVENT("NdbEventBuffer::execSUB_GCP_COMPLETE_REP");

  const Uint32 cnt= rep->gcp_complete_rep_count;

  Gci_container *bucket = find_bucket(gci);

  if (0)
    ndbout_c("execSUB_GCP_COMPLETE_REP(%u/%u) cnt: %u from %x flags: 0x%x",
             Uint32(gci >> 32), Uint32(gci), cnt, rep->senderRef,
             rep->flags);

  if (unlikely(rep->flags & (SubGcpCompleteRep::ADD_CNT |
                             SubGcpCompleteRep::SUB_CNT)))
  {
    handle_change_nodegroup(rep);
  }

  if (unlikely(bucket == 0))
  {
    /**
     * Already completed GCI...
     *   Possible in case of resend during NF handling
     */
#ifdef VM_TRACE
    Uint64 minGCI = m_known_gci[m_min_gci_index];
    ndbout_c("bucket == 0, gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
             Uint32(gci >> 32), Uint32(gci),
             Uint32(minGCI >> 32), Uint32(minGCI),
             Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
    ndbout << " complete: " << m_complete_data << endl;
    for(Uint32 i = 0; i<m_active_gci.size(); i++)
    {
      if (((Gci_container*)(&m_active_gci[i]))->m_gci)
        ndbout << i << " - " << m_active_gci[i] << endl;
    }
#endif
    DBUG_VOID_RETURN_EVENT;
  }

  if (rep->flags & SubGcpCompleteRep::MISSING_DATA)
  {
    // NOTE(review): plain assignment replaces any state bits already set on
    // the bucket (e.g. GC_CHANGE_CNT set by handle_change_nodegroup above) -
    // confirm this is intended.
    bucket->m_state = Gci_container::GC_INCONSISTENT;
  }

  Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
  if(unlikely(old_cnt == ~(Uint32)0))
  {
    // An all-ones count is treated as "total bucket count".
    old_cnt = m_total_buckets;
  }

  //assert(old_cnt >= cnt);
  if (unlikely(! (old_cnt >= cnt)))
  {
    // More sub-reports than outstanding: print diagnostics and abort.
    crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, m_total_buckets);
  }
  bucket->m_gcp_complete_rep_count = old_cnt - cnt;

  if(old_cnt == cnt)
  {
    // This was the last outstanding report for the GCI.
    Uint64 minGCI = m_known_gci[m_min_gci_index];
    if(likely(minGCI == 0 || gci == minGCI))
    {
    do_complete:
      m_startup_hack = false;
      complete_bucket(bucket);
      m_latestGCI = m_complete_data.m_gci = gci; // before reportStatus
      reportStatus();

      if(unlikely(m_latest_complete_GCI > gci))
      {
        // Later GCIs completed out of order earlier; they may be drained now.
        complete_outof_order_gcis();
      }

      // signal that somethings happened

      NdbCondition_Signal(p_cond);
    }
    else
    {
      if (unlikely(m_startup_hack))
      {
        // Drop older incomplete GCIs, then complete this one; the bucket is
        // looked up again after the flush.
        flushIncompleteEvents(gci);
        bucket = find_bucket(gci);
        assert(bucket);
        assert(bucket->m_gci == gci);
        goto do_complete;
      }
      /** out of order something */
      g_eventLogger->info("out of order bucket: %d gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
                          (int)(bucket-(Gci_container*)m_active_gci.getBase()),
                          Uint32(gci >> 32), Uint32(gci),
                          Uint32(minGCI >> 32), Uint32(minGCI),
                          Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
      // NOTE(review): plain assignment also clears GC_INCONSISTENT if it was
      // set above, while complete_bucket() later tests that bit - confirm.
      bucket->m_state = Gci_container::GC_COMPLETE;
      bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
      m_latest_complete_GCI = gci;
    }
  }

  DBUG_VOID_RETURN_EVENT;
}
2058
/**
 * Complete buckets that were marked GC_COMPLETE out of order.
 *
 * Starting at the oldest known GCI, buckets are completed one after another
 * (advancing m_latestGCI each time) until either a bucket that is not marked
 * GC_COMPLETE is met or the GCI stored in m_latest_complete_GCI has been
 * completed.
 */
void
NdbEventBuffer::complete_outof_order_gcis()
{
#ifdef VM_TRACE
  verify_known_gci(false);
#endif

  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  Uint64 stop_gci = m_latest_complete_GCI;

  Uint64 start_gci = array[minpos];
  g_eventLogger->info("complete_outof_order_gcis from: %u/%u(%u) to: %u/%u(%u)",
                      Uint32(start_gci >> 32), Uint32(start_gci), minpos,
                      Uint32(stop_gci >> 32), Uint32(stop_gci), maxpos);

  assert(start_gci <= stop_gci);
  do
  {
    start_gci = array[minpos];
    Gci_container* bucket = find_bucket(start_gci);
    assert(bucket);
    // The lookup must not have added a new ring entry.
    assert(maxpos == m_max_gci_index);
    if (!(bucket->m_state & Gci_container::GC_COMPLETE)) // Not complete
    {
      // An older GCI is still outstanding: stop here.
#ifdef VM_TRACE
      verify_known_gci(false);
#endif
      return;
    }

#ifdef VM_TRACE
    ndbout_c("complete_outof_order_gcis - completing %u/%u rows: %u",
             Uint32(start_gci >> 32), Uint32(start_gci), bucket->m_data.m_count);
#else
    ndbout_c("complete_outof_order_gcis - completing %u/%u",
             Uint32(start_gci >> 32), Uint32(start_gci));
#endif

    complete_bucket(bucket);
    m_latestGCI = m_complete_data.m_gci = start_gci;

#ifdef VM_TRACE
    verify_known_gci(true);
#endif
    // Local cursor follows the ring position advanced by complete_bucket().
    minpos = (minpos + 1) & mask;
  } while (start_gci != stop_gci);
}
2109
2110 void
insert_event(NdbEventOperationImpl * impl,SubTableData & data,LinearSectionPtr * ptr,Uint32 & oid_ref)2111 NdbEventBuffer::insert_event(NdbEventOperationImpl* impl,
2112 SubTableData &data,
2113 LinearSectionPtr *ptr,
2114 Uint32 &oid_ref)
2115 {
2116 DBUG_PRINT("info", ("gci{hi/lo}: %u/%u", data.gci_hi, data.gci_lo));
2117 do
2118 {
2119 if (impl->m_stop_gci == ~Uint64(0))
2120 {
2121 oid_ref = impl->m_oid;
2122 insertDataL(impl, &data, SubTableData::SignalLength, ptr);
2123 }
2124 NdbEventOperationImpl* blob_op = impl->theBlobOpList;
2125 while (blob_op != NULL)
2126 {
2127 if (blob_op->m_stop_gci == ~Uint64(0))
2128 {
2129 oid_ref = blob_op->m_oid;
2130 insertDataL(blob_op, &data, SubTableData::SignalLength, ptr);
2131 }
2132 blob_op = blob_op->m_next;
2133 }
2134 } while((impl = impl->m_next));
2135 }
2136
2137 bool
find_max_known_gci(Uint64 * res) const2138 NdbEventBuffer::find_max_known_gci(Uint64 * res) const
2139 {
2140 const Uint64 * array = m_known_gci.getBase();
2141 Uint32 mask = m_known_gci.size() - 1;
2142 Uint32 minpos = m_min_gci_index;
2143 Uint32 maxpos = m_max_gci_index;
2144
2145 if (minpos == maxpos)
2146 return false;
2147
2148 if (res)
2149 {
2150 * res = array[(maxpos - 1) & mask];
2151 }
2152
2153 return true;
2154 }
2155
/**
 * Apply a bucket count change carried by a SUB_GCP_COMPLETE_REP signal.
 *
 * The change amount is taken from the upper 16 bits of rep->flags.  The GCI
 * of the signal is searched for in the m_known_gci ring:
 *  - not found: nothing is changed;
 *  - found and its bucket already carries GC_CHANGE_CNT: nothing is changed;
 *  - otherwise the bucket is flagged GC_CHANGE_CNT, m_total_buckets is
 *    adjusted, and the remaining count of every later known GCI is adjusted
 *    by the same amount.
 *
 * For ADD_CNT the flagged bucket's own count is increased as well; for
 * SUB_CNT the flagged bucket's own count is left unchanged.
 *
 * NOTE(review): results of find_bucket() are dereferenced without a NULL
 * check - presumably every ring entry has a bucket; confirm.
 *
 * @param rep  Signal with ADD_CNT or SUB_CNT set in its flags.
 */
void
NdbEventBuffer::handle_change_nodegroup(const SubGcpCompleteRep* rep)
{
  Uint64 gci = (Uint64(rep->gci_hi) << 32) | rep->gci_lo;
  Uint32 cnt = (rep->flags >> 16);
  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;

  if (rep->flags & SubGcpCompleteRep::ADD_CNT)
  {
    ndbout_c("handle_change_nodegroup(add, cnt=%u,gci=%u/%u)",
             cnt, Uint32(gci >> 32), Uint32(gci));

    // found: 0 = gci not in ring, 1 = already marked, 2 = marked just now
    Uint32 found = 0;
    Uint32 pos = minpos;
    for (; pos != maxpos; pos = (pos + 1) & mask)
    {
      if (array[pos] == gci)
      {
        Gci_container* tmp = find_bucket(array[pos]);
        if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
        {
          found = 1;
          ndbout_c(" - gci %u/%u already marked complete",
                   Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
          break;
        }
        else
        {
          found = 2;
          ndbout_c(" - gci %u/%u marking (and increasing)",
                   Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
          tmp->m_state |= Gci_container::GC_CHANGE_CNT;
          tmp->m_gcp_complete_rep_count += cnt;
          break;
        }
      }
      else
      {
        ndbout_c(" - ignore %u/%u",
                 Uint32(array[pos] >> 32), Uint32(array[pos]));
      }
    }

    if (found == 0)
    {
      ndbout_c(" - NOT FOUND (total: %u cnt: %u)", m_total_buckets, cnt);
      return;
    }

    if (found == 1)
    {
      return; // Nothing todo
    }

    m_total_buckets += cnt;

    // Every later known GCI must also wait for the additional buckets.
    pos = (pos + 1) & mask;
    for (; pos != maxpos; pos = (pos + 1) & mask)
    {
      assert(array[pos] > gci);
      Gci_container* tmp = find_bucket(array[pos]);
      assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
      tmp->m_gcp_complete_rep_count += cnt;
      ndbout_c(" - increasing cnt on %u/%u by %u",
               Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci), cnt);
    }
  }
  else if (rep->flags & SubGcpCompleteRep::SUB_CNT)
  {
    ndbout_c("handle_change_nodegroup(sub, cnt=%u,gci=%u/%u)",
             cnt, Uint32(gci >> 32), Uint32(gci));

    // found: 0 = gci not in ring, 1 = already marked, 2 = marked just now
    Uint32 found = 0;
    Uint32 pos = minpos;
    for (; pos != maxpos; pos = (pos + 1) & mask)
    {
      if (array[pos] == gci)
      {
        Gci_container* tmp = find_bucket(array[pos]);
        if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
        {
          found = 1;
          ndbout_c(" - gci %u/%u already marked complete",
                   Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
          break;
        }
        else
        {
          found = 2;
          ndbout_c(" - gci %u/%u marking",
                   Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
          // Only flagged here; this bucket's own count is not reduced.
          tmp->m_state |= Gci_container::GC_CHANGE_CNT;
          break;
        }
      }
      else
      {
        ndbout_c(" - ignore %u/%u",
                 Uint32(array[pos] >> 32), Uint32(array[pos]));
      }
    }

    if (found == 0)
    {
      ndbout_c(" - NOT FOUND");
      return;
    }

    if (found == 1)
    {
      return; // Nothing todo
    }

    m_total_buckets -= cnt;

    // Every later known GCI has fewer buckets to wait for.
    pos = (pos + 1) & mask;
    for (; pos != maxpos; pos = (pos + 1) & mask)
    {
      assert(array[pos] > gci);
      Gci_container* tmp = find_bucket(array[pos]);
      assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
      tmp->m_gcp_complete_rep_count -= cnt;
      ndbout_c(" - decreasing cnt on %u/%u by %u to: %u",
               Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci),
               cnt,
               tmp->m_gcp_complete_rep_count);
    }
  }
}
2288
2289 void
set_total_buckets(Uint32 cnt)2290 NdbEventBuffer::set_total_buckets(Uint32 cnt)
2291 {
2292 if (m_total_buckets == cnt)
2293 return;
2294
2295 assert(m_total_buckets == TOTAL_BUCKETS_INIT);
2296 m_total_buckets = cnt;
2297
2298 Uint64 * array = m_known_gci.getBase();
2299 Uint32 mask = m_known_gci.size() - 1;
2300 Uint32 minpos = m_min_gci_index;
2301 Uint32 maxpos = m_max_gci_index;
2302
2303 bool found = false;
2304 Uint32 pos = minpos;
2305 for (; pos != maxpos; pos = (pos + 1) & mask)
2306 {
2307 Gci_container* tmp = find_bucket(array[pos]);
2308 if (TOTAL_BUCKETS_INIT >= tmp->m_gcp_complete_rep_count)
2309 {
2310 found = true;
2311 if (0)
2312 ndbout_c("set_total_buckets(%u) complete %u/%u",
2313 cnt, Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2314 tmp->m_gcp_complete_rep_count = 0;
2315 complete_bucket(tmp);
2316 }
2317 else
2318 {
2319 assert(tmp->m_gcp_complete_rep_count > TOTAL_BUCKETS_INIT);
2320 tmp->m_gcp_complete_rep_count -= TOTAL_BUCKETS_INIT;
2321 }
2322 }
2323 if (found)
2324 {
2325 NdbCondition_Signal(p_cond);
2326 }
2327 }
2328
/**
 * Record that node node_id is gone and tell the event stream about it.
 *
 * The node is always cleared from m_alive_node_bit_mask.  If at least
 * one event operation exists (m_ndb->getEventOperation(0)), a
 * _TE_NODE_FAILURE event is inserted.  If no node is left in the mask
 * afterwards, the function additionally
 *  - releases every known GCI bucket in front of the chosen gci,
 *  - inserts a _TE_CLUSTER_FAILURE event, and
 *  - completes that gci by passing a locally built SubGcpCompleteRep
 *    to execSUB_GCP_COMPLETE_REP().
 *
 * @param node_id  node to clear from the alive mask; also stored as
 *                 request/ndbd node id in the injected event
 */
void
NdbEventBuffer::report_node_failure_completed(Uint32 node_id)
{
  m_alive_node_bit_mask.clear(node_id);

  // without any event operation there is nobody to deliver events to
  NdbEventOperation* op= m_ndb->getEventOperation(0);
  if (op == 0)
    return;

  DBUG_ENTER("NdbEventBuffer::report_node_failure_completed");
  SubTableData data;
  LinearSectionPtr ptr[3];
  // the injected events carry no section data: all three sections stay 0/NULL
  bzero(&data, sizeof(data));
  bzero(ptr, sizeof(ptr));

  data.tableId = ~0;
  data.requestInfo = 0;
  SubTableData::setOperation(data.requestInfo,
                             NdbDictionary::Event::_TE_NODE_FAILURE);
  SubTableData::setReqNodeId(data.requestInfo, node_id);
  SubTableData::setNdbdNodeId(data.requestInfo, node_id);
  data.flags = SubTableData::LOG;

  // default: low word 0 of the GCI following m_latestGCI's high word
  // NOTE(review): find_max_known_gci() presumably overwrites gci with the
  // highest known GCI when one exists - confirm against its definition
  Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
  find_max_known_gci(&gci);

  data.gci_hi = Uint32(gci >> 32);
  data.gci_lo = Uint32(gci);

  /**
   * Insert this event for each operation
   */
  // no need to lock()/unlock(), receive thread calls this
  insert_event(&op->m_impl, data, ptr, data.senderData);

  // some node is still marked alive: node failure only, done
  if (!m_alive_node_bit_mask.isclear())
    DBUG_VOID_RETURN;

  /*
   * Cluster failure
   */

  DBUG_PRINT("info", ("Cluster failure"));

  gci = Uint64((m_latestGCI >> 32) + 1) << 32;
  bool found = find_max_known_gci(&gci);

  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;

  // release every known GCI bucket that precedes gci in the ring
  while (minpos != maxpos && array[minpos] != gci)
  {
    Gci_container* tmp = find_bucket(array[minpos]);
    assert(tmp);
    assert(maxpos == m_max_gci_index);

    if(!tmp->m_data.is_empty())
    {
      free_list(tmp->m_data);
    }
    // destroy in place, then zero the slot
    tmp->~Gci_container();
    bzero(tmp, sizeof(Gci_container));

    minpos = (minpos + 1) & mask;
  }
  m_min_gci_index = minpos;
  if (found)
  {
    // exactly one known GCI (gci itself) must remain
    assert(((minpos + 1) & mask) == maxpos);
  }
  else
  {
    // nothing was known: the ring is now empty
    assert(minpos == maxpos);
  }

  /**
   * Inject new event
   */
  // data.gci_hi/gci_lo still hold the values set for the node failure event
  data.tableId = ~0;
  data.requestInfo = 0;
  SubTableData::setOperation(data.requestInfo,
                             NdbDictionary::Event::_TE_CLUSTER_FAILURE);

  /**
   * Insert this event for each operation
   */
  // no need to lock()/unlock(), receive thread calls this
  insert_event(&op->m_impl, data, ptr, data.senderData);

#ifdef VM_TRACE
  m_flush_gci = 0;
#endif

  /**
   * And finally complete this GCI
   */
  Gci_container* tmp = find_bucket(gci);
  assert(tmp);
  if (found)
  {
    assert(m_max_gci_index == maxpos); // should not have changed...
  }
  else
  {
    // inserting the event above is expected to have added gci to the ring
    assert(m_max_gci_index == ((maxpos + 1) & mask));
  }
  Uint32 cnt = tmp->m_gcp_complete_rep_count;

  // report all outstanding completions at once so the bucket completes
  SubGcpCompleteRep rep;
  rep.gci_hi= (Uint32)(gci >> 32);
  rep.gci_lo= (Uint32)(gci & 0xFFFFFFFF);
  rep.gcp_complete_rep_count= cnt;
  rep.flags = 0;
  execSUB_GCP_COMPLETE_REP(&rep, SubGcpCompleteRep::SignalLength, 1);

  DBUG_VOID_RETURN;
}
2448
2449 Uint64
getLatestGCI()2450 NdbEventBuffer::getLatestGCI()
2451 {
2452 return m_latestGCI;
2453 }
2454
2455 int
insertDataL(NdbEventOperationImpl * op,const SubTableData * const sdata,Uint32 len,LinearSectionPtr ptr[3])2456 NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
2457 const SubTableData * const sdata,
2458 Uint32 len,
2459 LinearSectionPtr ptr[3])
2460 {
2461 DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
2462 const Uint32 ri = sdata->requestInfo;
2463 const Uint32 operation = SubTableData::getOperation(ri);
2464 Uint32 gci_hi = sdata->gci_hi;
2465 Uint32 gci_lo = sdata->gci_lo;
2466
2467 if (unlikely(len < SubTableData::SignalLength))
2468 {
2469 gci_lo = 0;
2470 }
2471
2472 Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
2473 const bool is_data_event =
2474 operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
2475
2476 if (!is_data_event)
2477 {
2478 if (operation == NdbDictionary::Event::_TE_CLUSTER_FAILURE)
2479 {
2480 /*
2481 Mark event as stopping. Subsequent dropEventOperation
2482 will add the event to the dropped list for delete
2483 */
2484 op->m_stop_gci = gci;
2485 }
2486 else if (operation == NdbDictionary::Event::_TE_ACTIVE)
2487 {
2488 // internal event, do not relay to user
2489 DBUG_PRINT("info",
2490 ("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u",
2491 op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
2492 DBUG_RETURN_EVENT(0);
2493 }
2494 else if (operation == NdbDictionary::Event::_TE_STOP)
2495 {
2496 // internal event, do not relay to user
2497 DBUG_PRINT("info",
2498 ("_TE_STOP: m_ref_count: %u for op: %p id: %u",
2499 op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
2500 DBUG_RETURN_EVENT(0);
2501 }
2502 }
2503
2504 if ( likely((Uint32)op->mi_type & (1 << operation)))
2505 {
2506 Gci_container* bucket= find_bucket(gci);
2507
2508 DBUG_PRINT_EVENT("info", ("data insertion in eventId %d", op->m_eventId));
2509 DBUG_PRINT_EVENT("info", ("gci=%d tab=%d op=%d node=%d",
2510 sdata->gci, sdata->tableId,
2511 SubTableData::getOperation(sdata->requestInfo),
2512 SubTableData::getReqNodeId(sdata->requestInfo)));
2513
2514 if (unlikely(bucket == 0))
2515 {
2516 /**
2517 * Already completed GCI...
2518 * Possible in case of resend during NF handling
2519 */
2520 DBUG_RETURN_EVENT(0);
2521 }
2522
2523 const bool is_blob_event = (op->theMainOp != NULL);
2524 const bool use_hash = op->m_mergeEvents && is_data_event;
2525
2526 if (! is_data_event && is_blob_event)
2527 {
2528 // currently subscribed to but not used
2529 DBUG_PRINT_EVENT("info", ("ignore non-data event on blob table"));
2530 DBUG_RETURN_EVENT(0);
2531 }
2532
2533 // find position in bucket hash table
2534 EventBufData* data = 0;
2535 EventBufData_hash::Pos hpos;
2536 if (use_hash)
2537 {
2538 bucket->m_data_hash.search(hpos, op, ptr);
2539 data = hpos.data;
2540 }
2541
2542 if (data == 0)
2543 {
2544 // allocate new result buffer
2545 data = alloc_data();
2546 if (unlikely(data == 0))
2547 {
2548 op->m_has_error = 2;
2549 DBUG_RETURN_EVENT(-1);
2550 }
2551 if (unlikely(copy_data(sdata, len, ptr, data, NULL)))
2552 {
2553 op->m_has_error = 3;
2554 DBUG_RETURN_EVENT(-1);
2555 }
2556 data->m_event_op = op;
2557 if (! is_blob_event || ! is_data_event)
2558 {
2559 bucket->m_data.append_data(data);
2560 }
2561 else
2562 {
2563 // find or create main event for this blob event
2564 EventBufData_hash::Pos main_hpos;
2565 int ret = get_main_data(bucket, main_hpos, data);
2566 if (ret == -1)
2567 {
2568 op->m_has_error = 4;
2569 DBUG_RETURN_EVENT(-1);
2570 }
2571 EventBufData* main_data = main_hpos.data;
2572 if (ret != 0) // main event was created
2573 {
2574 main_data->m_event_op = op->theMainOp;
2575 bucket->m_data.append_data(main_data);
2576 if (use_hash)
2577 {
2578 main_data->m_pkhash = main_hpos.pkhash;
2579 bucket->m_data_hash.append(main_hpos, main_data);
2580 }
2581 }
2582 // link blob event under main event
2583 add_blob_data(bucket, main_data, data);
2584 }
2585 if (use_hash)
2586 {
2587 data->m_pkhash = hpos.pkhash;
2588 bucket->m_data_hash.append(hpos, data);
2589 }
2590 #ifdef VM_TRACE
2591 op->m_data_count++;
2592 #endif
2593 }
2594 else
2595 {
2596 // event with same op, PK found, merge into old buffer
2597 if (unlikely(merge_data(sdata, len, ptr, data, &bucket->m_data.m_sz)))
2598 {
2599 op->m_has_error = 3;
2600 DBUG_RETURN_EVENT(-1);
2601 }
2602 // merge is on so we do not report blob part events
2603 if (! is_blob_event) {
2604 // report actual operation and the composite
2605 // there is no way to "fix" the flags for a composite op
2606 // since the flags represent multiple ops on multiple PKs
2607 // XXX fix by doing merge at end of epoch (extra mem cost)
2608 {
2609 EventBufData_list::Gci_op g = { op, (1 << operation) };
2610 bucket->m_data.add_gci_op(g);
2611 }
2612 {
2613 EventBufData_list::Gci_op
2614 g = { op,
2615 (1 << SubTableData::getOperation(data->sdata->requestInfo))};
2616 bucket->m_data.add_gci_op(g);
2617 }
2618 }
2619 }
2620 #ifdef NDB_EVENT_VERIFY_SIZE
2621 verify_size(bucket->m_data);
2622 #endif
2623 DBUG_RETURN_EVENT(0);
2624 }
2625
2626 #ifdef VM_TRACE
2627 if ((Uint32)op->m_eventImpl->mi_type & (1 << operation))
2628 {
2629 DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId));
2630 DBUG_RETURN_EVENT(0);
2631 }
2632 else {
2633 DBUG_PRINT_EVENT("info",("skipped"));
2634 DBUG_RETURN_EVENT(0);
2635 }
2636 #else
2637 DBUG_RETURN_EVENT(0);
2638 #endif
2639 }
2640
2641 // allocate EventBufData
2642 EventBufData*
alloc_data()2643 NdbEventBuffer::alloc_data()
2644 {
2645 DBUG_ENTER_EVENT("alloc_data");
2646 EventBufData* data = m_free_data;
2647
2648 if (unlikely(data == 0))
2649 {
2650 #ifdef VM_TRACE
2651 assert(m_free_data_count == 0);
2652 assert(m_free_data_sz == 0);
2653 #endif
2654 expand(4000);
2655 reportStatus();
2656
2657 data = m_free_data;
2658 if (unlikely(data == 0))
2659 {
2660 #ifdef VM_TRACE
2661 printf("m_latest_command: %s\n", m_latest_command);
2662 printf("no free data, m_latestGCI %u/%u\n",
2663 (Uint32)(m_latestGCI << 32), (Uint32)m_latestGCI);
2664 printf("m_free_data_count %d\n", m_free_data_count);
2665 printf("m_available_data_count %d first gci{hi/lo} %u/%u last gci{hi/lo} %u/%u\n",
2666 m_available_data.m_count,
2667 m_available_data.m_head?m_available_data.m_head->sdata->gci_hi:0,
2668 m_available_data.m_head?m_available_data.m_head->sdata->gci_lo:0,
2669 m_available_data.m_tail?m_available_data.m_tail->sdata->gci_hi:0,
2670 m_available_data.m_tail?m_available_data.m_tail->sdata->gci_lo:0);
2671 printf("m_used_data_count %d\n", m_used_data.m_count);
2672 #endif
2673 DBUG_RETURN_EVENT(0); // TODO handle this, overrun, or, skip?
2674 }
2675 }
2676
2677 // remove data from free list
2678 if (data->m_next_blob == 0)
2679 m_free_data = data->m_next;
2680 else {
2681 EventBufData* data2 = data->m_next_blob;
2682 if (data2->m_next == 0) {
2683 data->m_next_blob = data2->m_next_blob;
2684 data = data2;
2685 } else {
2686 EventBufData* data3 = data2->m_next;
2687 data2->m_next = data3->m_next;
2688 data = data3;
2689 }
2690 }
2691 data->m_next = 0;
2692 data->m_next_blob = 0;
2693 #ifdef VM_TRACE
2694 m_free_data_count--;
2695 assert(m_free_data_sz >= data->sz);
2696 #endif
2697 m_free_data_sz -= data->sz;
2698 DBUG_RETURN_EVENT(data);
2699 }
2700
2701 // allocate initial or bigger memory area in EventBufData
2702 // takes sizes from given ptr and sets up data->ptr
2703 int
alloc_mem(EventBufData * data,LinearSectionPtr ptr[3],Uint32 * change_sz)2704 NdbEventBuffer::alloc_mem(EventBufData* data,
2705 LinearSectionPtr ptr[3],
2706 Uint32 * change_sz)
2707 {
2708 DBUG_ENTER("NdbEventBuffer::alloc_mem");
2709 DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz));
2710 const Uint32 min_alloc_size = 128;
2711
2712 Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
2713 Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2;
2714 if (alloc_size < min_alloc_size)
2715 alloc_size = min_alloc_size;
2716
2717 if (data->sz < alloc_size)
2718 {
2719 Uint32 add_sz = alloc_size - data->sz;
2720
2721 NdbMem_Free((char*)data->memory);
2722 assert(m_total_alloc >= data->sz);
2723 data->memory = 0;
2724 data->sz = 0;
2725
2726 data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
2727 if (data->memory == 0)
2728 {
2729 m_total_alloc -= data->sz;
2730 DBUG_RETURN(-1);
2731 }
2732 data->sz = alloc_size;
2733 m_total_alloc += add_sz;
2734
2735 if (change_sz != NULL)
2736 *change_sz += add_sz;
2737 }
2738
2739 Uint32* memptr = data->memory;
2740 memptr += sz4;
2741 int i;
2742 for (i = 0; i <= 2; i++)
2743 {
2744 data->ptr[i].p = memptr;
2745 data->ptr[i].sz = ptr[i].sz;
2746 memptr += ptr[i].sz;
2747 }
2748
2749 DBUG_RETURN(0);
2750 }
2751
2752 void
dealloc_mem(EventBufData * data,Uint32 * change_sz)2753 NdbEventBuffer::dealloc_mem(EventBufData* data,
2754 Uint32 * change_sz)
2755 {
2756 NdbMem_Free((char*)data->memory);
2757 assert(m_total_alloc >= data->sz);
2758 m_total_alloc -= data->sz;
2759 if (change_sz != NULL) {
2760 assert(*change_sz >= data->sz);
2761 *change_sz -= data->sz;
2762 }
2763 data->memory = 0;
2764 data->sz = 0;
2765 }
2766
2767 int
copy_data(const SubTableData * const sdata,Uint32 len,LinearSectionPtr ptr[3],EventBufData * data,Uint32 * change_sz)2768 NdbEventBuffer::copy_data(const SubTableData * const sdata, Uint32 len,
2769 LinearSectionPtr ptr[3],
2770 EventBufData* data,
2771 Uint32 * change_sz)
2772 {
2773 DBUG_ENTER_EVENT("NdbEventBuffer::copy_data");
2774
2775 if (alloc_mem(data, ptr, change_sz) != 0)
2776 DBUG_RETURN_EVENT(-1);
2777 memcpy(data->sdata, sdata, sizeof(SubTableData));
2778
2779 if (unlikely(len < SubTableData::SignalLength))
2780 {
2781 data->sdata->gci_lo = 0;
2782 }
2783 if (len < SubTableData::SignalLengthWithTransId)
2784 {
2785 /* No TransId, set to uninit value */
2786 data->sdata->transId1 = ~Uint32(0);
2787 data->sdata->transId2 = ~Uint32(0);
2788 }
2789
2790 int i;
2791 for (i = 0; i <= 2; i++)
2792 memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
2793 DBUG_RETURN_EVENT(0);
2794 }
2795
/*
 * Merge table used by NdbEventBuffer::merge_data().
 *
 * Each row says: when an event of type t1 is already buffered for a key
 * and an event of type t2 arrives for the same key, the buffered event
 * becomes one of type t3.  merge_data() looks a row up by (t1, t2).
 */
static struct Ev_t {
  enum {
    enum_INS = NdbDictionary::Event::_TE_INSERT,
    enum_DEL = NdbDictionary::Event::_TE_DELETE,
    enum_UPD = NdbDictionary::Event::_TE_UPDATE,
    enum_NUL = NdbDictionary::Event::_TE_NUL,
    enum_IDM = 254,     // idempotent op possibly allowed on NF
    enum_ERR = 255      // always impossible
  };
  int t1, t2, t3;       // buffered type, incoming type, resulting type
} ev_t[] = {
  { Ev_t::enum_INS, Ev_t::enum_INS, Ev_t::enum_IDM },
  { Ev_t::enum_INS, Ev_t::enum_DEL, Ev_t::enum_NUL }, //ok
  { Ev_t::enum_INS, Ev_t::enum_UPD, Ev_t::enum_INS }, //ok
  { Ev_t::enum_DEL, Ev_t::enum_INS, Ev_t::enum_UPD }, //ok
  { Ev_t::enum_DEL, Ev_t::enum_DEL, Ev_t::enum_IDM },
  { Ev_t::enum_DEL, Ev_t::enum_UPD, Ev_t::enum_ERR },
  { Ev_t::enum_UPD, Ev_t::enum_INS, Ev_t::enum_ERR },
  { Ev_t::enum_UPD, Ev_t::enum_DEL, Ev_t::enum_DEL }, //ok
  { Ev_t::enum_UPD, Ev_t::enum_UPD, Ev_t::enum_UPD }  //ok
};
2817
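/*
 * Contents of the three signal sections (rows) per event type (columns).
 * "ah" and "ad" presumably stand for attribute headers and attribute
 * data; "pk" is the primary key.
 *
 *     | INS            | DEL              | UPD
 *   0 | pk ah + all ah | pk ah            | pk ah + new ah
 *   1 | pk ad + all ad | old pk ad        | new pk ad + new ad
 *   2 | empty          | old non-pk ah+ad | old ah+ad
 */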
2824
2825 static AttributeHeader
copy_head(Uint32 & i1,Uint32 * p1,Uint32 & i2,const Uint32 * p2,Uint32 flags)2826 copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2,
2827 Uint32 flags)
2828 {
2829 AttributeHeader ah(p2[i2]);
2830 bool do_copy = (flags & 1);
2831 if (do_copy)
2832 p1[i1] = p2[i2];
2833 i1++;
2834 i2++;
2835 return ah;
2836 }
2837
2838 static void
copy_attr(AttributeHeader ah,Uint32 & j1,Uint32 * p1,Uint32 & j2,const Uint32 * p2,Uint32 flags)2839 copy_attr(AttributeHeader ah,
2840 Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2,
2841 Uint32 flags)
2842 {
2843 bool do_copy = (flags & 1);
2844 bool with_head = (flags & 2);
2845 Uint32 n = with_head + ah.getDataSize();
2846 if (do_copy)
2847 {
2848 Uint32 k;
2849 for (k = 0; k < n; k++)
2850 p1[j1 + k] = p2[j2 + k];
2851 }
2852 j1 += n;
2853 j2 += n;
2854 }
2855
2856 int
merge_data(const SubTableData * const sdata,Uint32 len,LinearSectionPtr ptr2[3],EventBufData * data,Uint32 * change_sz)2857 NdbEventBuffer::merge_data(const SubTableData * const sdata, Uint32 len,
2858 LinearSectionPtr ptr2[3],
2859 EventBufData* data,
2860 Uint32 * change_sz)
2861 {
2862 DBUG_ENTER_EVENT("NdbEventBuffer::merge_data");
2863
2864 /* TODO : Consider how/if to merge multiple events/key with different
2865 * transid
2866 * Same consideration probably applies to AnyValue!
2867 */
2868
2869 Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;
2870
2871 int t1 = SubTableData::getOperation(data->sdata->requestInfo);
2872 int t2 = SubTableData::getOperation(sdata->requestInfo);
2873 if (t1 == Ev_t::enum_NUL)
2874 DBUG_RETURN_EVENT(copy_data(sdata, len, ptr2, data, change_sz));
2875
2876 Ev_t* tp = 0;
2877 int i;
2878 for (i = 0; (uint) i < sizeof(ev_t)/sizeof(ev_t[0]); i++) {
2879 if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) {
2880 tp = &ev_t[i];
2881 break;
2882 }
2883 }
2884 assert(tp != 0 && tp->t3 != Ev_t::enum_ERR);
2885
2886 if (tp->t3 == Ev_t::enum_IDM) {
2887 LinearSectionPtr (&ptr1)[3] = data->ptr;
2888
2889 /*
2890 * TODO
2891 * - can get data in INS ptr2[2] which is supposed to be empty
2892 * - can get extra data in DEL ptr2[2]
2893 * - why does DBUG_PRINT not work in this file ???
2894 *
2895 * replication + bug#19872 can ignore this since merge is on
2896 * only for tables with explicit PK and before data is not used
2897 */
2898 const int maxsec = 1; // ignore section 2
2899
2900 int i;
2901 for (i = 0; i <= maxsec; i++) {
2902 if (ptr1[i].sz != ptr2[i].sz ||
2903 memcmp(ptr1[i].p, ptr2[i].p, ptr1[i].sz << 2) != 0) {
2904 DBUG_PRINT("info", ("idempotent op %d*%d data differs in sec %d",
2905 tp->t1, tp->t2, i));
2906 assert(false);
2907 DBUG_RETURN_EVENT(-1);
2908 }
2909 }
2910 DBUG_PRINT("info", ("idempotent op %d*%d data ok", tp->t1, tp->t2));
2911 DBUG_RETURN_EVENT(0);
2912 }
2913
2914 // TODO: use old data items, avoid malloc/free on each merge
2915
2916 // save old data
2917 EventBufData olddata = *data;
2918 data->memory = 0;
2919 data->sz = 0;
2920
2921 // compose ptr1 o ptr2 = ptr
2922 LinearSectionPtr (&ptr1)[3] = olddata.ptr;
2923 LinearSectionPtr (&ptr)[3] = data->ptr;
2924
2925 // loop twice where first loop only sets sizes
2926 int loop;
2927 int result = 0;
2928 for (loop = 0; loop <= 1; loop++)
2929 {
2930 if (loop == 1)
2931 {
2932 if (alloc_mem(data, ptr, change_sz) != 0)
2933 {
2934 result = -1;
2935 goto end;
2936 }
2937 *data->sdata = *sdata;
2938 SubTableData::setOperation(data->sdata->requestInfo, tp->t3);
2939 }
2940
2941 ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;
2942
2943 // copy pk from new version
2944 {
2945 AttributeHeader ah;
2946 Uint32 i = 0;
2947 Uint32 j = 0;
2948 Uint32 i2 = 0;
2949 Uint32 j2 = 0;
2950 while (i < nkey)
2951 {
2952 ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
2953 copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
2954 }
2955 ptr[0].sz = i;
2956 ptr[1].sz = j;
2957 }
2958
2959 // merge after values, new version overrides
2960 if (tp->t3 != Ev_t::enum_DEL)
2961 {
2962 AttributeHeader ah;
2963 Uint32 i = ptr[0].sz;
2964 Uint32 j = ptr[1].sz;
2965 Uint32 i1 = 0;
2966 Uint32 j1 = 0;
2967 Uint32 i2 = nkey;
2968 Uint32 j2 = ptr[1].sz;
2969 while (i1 < nkey)
2970 {
2971 j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
2972 }
2973 while (1)
2974 {
2975 bool b1 = (i1 < ptr1[0].sz);
2976 bool b2 = (i2 < ptr2[0].sz);
2977 if (b1 && b2)
2978 {
2979 Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId();
2980 Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId();
2981 if (id1 < id2)
2982 b2 = false;
2983 else if (id1 > id2)
2984 b1 = false;
2985 else
2986 {
2987 j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
2988 b1 = false;
2989 }
2990 }
2991 if (b1)
2992 {
2993 ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop);
2994 copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop);
2995 }
2996 else if (b2)
2997 {
2998 ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
2999 copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
3000 }
3001 else
3002 break;
3003 }
3004 ptr[0].sz = i;
3005 ptr[1].sz = j;
3006 }
3007
3008 // merge before values, old version overrides
3009 if (tp->t3 != Ev_t::enum_INS)
3010 {
3011 AttributeHeader ah;
3012 Uint32 k = 0;
3013 Uint32 k1 = 0;
3014 Uint32 k2 = 0;
3015 while (1)
3016 {
3017 bool b1 = (k1 < ptr1[2].sz);
3018 bool b2 = (k2 < ptr2[2].sz);
3019 if (b1 && b2)
3020 {
3021 Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId();
3022 Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId();
3023 if (id1 < id2)
3024 b2 = false;
3025 else if (id1 > id2)
3026 b1 = false;
3027 else
3028 {
3029 k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize();
3030 b2 = false;
3031 }
3032 }
3033 if (b1)
3034 {
3035 ah = AttributeHeader(ptr1[2].p[k1]);
3036 copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
3037 }
3038 else if (b2)
3039 {
3040 ah = AttributeHeader(ptr2[2].p[k2]);
3041 copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2);
3042 }
3043 else
3044 break;
3045 }
3046 ptr[2].sz = k;
3047 }
3048 }
3049
3050 end:
3051 dealloc_mem(&olddata, change_sz);
3052 DBUG_RETURN_EVENT(result);
3053 }
3054
3055 /*
3056 * Given blob part event, find main table event on inline part. It
3057 * should exist (force in TUP) but may arrive later. If so, create
3058 * NUL event on main table. The real event replaces it later.
3059 */
3060
3061 int
get_main_data(Gci_container * bucket,EventBufData_hash::Pos & hpos,EventBufData * blob_data)3062 NdbEventBuffer::get_main_data(Gci_container* bucket,
3063 EventBufData_hash::Pos& hpos,
3064 EventBufData* blob_data)
3065 {
3066 DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data");
3067
3068 int blobVersion = blob_data->m_event_op->theBlobVersion;
3069 assert(blobVersion == 1 || blobVersion == 2);
3070
3071 NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp;
3072 assert(main_op != NULL);
3073 const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;
3074
3075 // create LinearSectionPtr for main table key
3076 LinearSectionPtr ptr[3];
3077
3078 Uint32 pk_ah[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
3079 Uint32* pk_data = blob_data->ptr[1].p;
3080 Uint32 pk_size = 0;
3081
3082 if (unlikely(blobVersion == 1)) {
3083 /*
3084 * Blob PK attribute 0 is concatenated table PK null padded
3085 * to fixed maximum size. The actual size and attributes of
3086 * table PK must be discovered.
3087 */
3088 Uint32 max_size = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();
3089
3090 Uint32 sz = 0; // words parsed so far
3091 Uint32 n = 0;
3092 Uint32 i;
3093 for (i = 0; n < mainTable->m_noOfKeys; i++) {
3094 const NdbColumnImpl* c = mainTable->getColumn(i);
3095 assert(c != NULL);
3096 if (! c->m_pk)
3097 continue;
3098
3099 Uint32 bytesize = c->m_attrSize * c->m_arraySize;
3100 Uint32 lb, len;
3101 assert(sz < max_size);
3102 bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_data[sz],
3103 bytesize, lb, len);
3104 assert(ok);
3105
3106 AttributeHeader ah(i, lb + len);
3107 pk_ah[n] = ah.m_value;
3108 sz += ah.getDataSize();
3109 n++;
3110 }
3111 assert(n == mainTable->m_noOfKeys);
3112 assert(sz <= max_size);
3113 pk_size = sz;
3114 } else {
3115 /*
3116 * Blob PK starts with separate table PKs. Total size must be
3117 * counted and blob attribute ids changed to table attribute ids.
3118 */
3119 Uint32 sz = 0; // count size
3120 Uint32 n = 0;
3121 Uint32 i;
3122 for (i = 0; n < mainTable->m_noOfKeys; i++) {
3123 const NdbColumnImpl* c = mainTable->getColumn(i);
3124 assert(c != NULL);
3125 if (! c->m_pk)
3126 continue;
3127
3128 AttributeHeader ah(blob_data->ptr[0].p[n]);
3129 ah.setAttributeId(i);
3130 pk_ah[n] = ah.m_value;
3131 sz += ah.getDataSize();
3132 n++;
3133 }
3134 assert(n == mainTable->m_noOfKeys);
3135 pk_size = sz;
3136 }
3137
3138 ptr[0].sz = mainTable->m_noOfKeys;
3139 ptr[0].p = pk_ah;
3140 ptr[1].sz = pk_size;
3141 ptr[1].p = pk_data;
3142 ptr[2].sz = 0;
3143 ptr[2].p = 0;
3144
3145 DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
3146 DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
3147
3148 // search for main event buffer
3149 bucket->m_data_hash.search(hpos, main_op, ptr);
3150 if (hpos.data != NULL)
3151 DBUG_RETURN_EVENT(0);
3152
3153 // not found, create a place-holder
3154 EventBufData* main_data = alloc_data();
3155 if (main_data == NULL)
3156 DBUG_RETURN_EVENT(-1);
3157 SubTableData sdata = *blob_data->sdata;
3158 sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
3159 SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
3160 if (copy_data(&sdata, SubTableData::SignalLength, ptr, main_data, NULL) != 0)
3161 DBUG_RETURN_EVENT(-1);
3162 hpos.data = main_data;
3163
3164 DBUG_RETURN_EVENT(1);
3165 }
3166
3167 void
add_blob_data(Gci_container * bucket,EventBufData * main_data,EventBufData * blob_data)3168 NdbEventBuffer::add_blob_data(Gci_container* bucket,
3169 EventBufData* main_data,
3170 EventBufData* blob_data)
3171 {
3172 DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
3173 DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p", main_data, blob_data));
3174 EventBufData* head;
3175 head = main_data->m_next_blob;
3176 while (head != NULL)
3177 {
3178 if (head->m_event_op == blob_data->m_event_op)
3179 break;
3180 head = head->m_next_blob;
3181 }
3182 if (head == NULL)
3183 {
3184 head = blob_data;
3185 head->m_next_blob = main_data->m_next_blob;
3186 main_data->m_next_blob = head;
3187 }
3188 else
3189 {
3190 blob_data->m_next = head->m_next;
3191 head->m_next = blob_data;
3192 }
3193 // adjust data list size
3194 bucket->m_data.m_count += 1;
3195 bucket->m_data.m_sz += blob_data->sz;
3196 DBUG_VOID_RETURN_EVENT;
3197 }
3198
3199 NdbEventOperationImpl *
move_data()3200 NdbEventBuffer::move_data()
3201 {
3202 // handle received data
3203 if (!m_complete_data.m_data.is_empty())
3204 {
3205 // move this list to last in m_available_data
3206 m_available_data.append_list(&m_complete_data.m_data, 0);
3207
3208 bzero(&m_complete_data, sizeof(m_complete_data));
3209 }
3210
3211 // handle used data
3212 if (!m_used_data.is_empty())
3213 {
3214 // return m_used_data to m_free_data
3215 free_list(m_used_data);
3216 }
3217 if (!m_available_data.is_empty())
3218 {
3219 DBUG_ENTER_EVENT("NdbEventBuffer::move_data");
3220 #ifdef VM_TRACE
3221 DBUG_PRINT_EVENT("exit",("m_available_data_count %u", m_available_data.m_count));
3222 #endif
3223 DBUG_RETURN_EVENT(m_available_data.m_head->m_event_op);
3224 }
3225 return 0;
3226 }
3227
3228 void
free_list(EventBufData_list & list)3229 NdbEventBuffer::free_list(EventBufData_list &list)
3230 {
3231 #ifdef NDB_EVENT_VERIFY_SIZE
3232 verify_size(list);
3233 #endif
3234 // return list to m_free_data
3235 list.m_tail->m_next= m_free_data;
3236 m_free_data= list.m_head;
3237 #ifdef VM_TRACE
3238 m_free_data_count+= list.m_count;
3239 #endif
3240 m_free_data_sz+= list.m_sz;
3241
3242 list.m_head = list.m_tail = NULL;
3243 list.m_count = list.m_sz = 0;
3244 }
3245
append_list(EventBufData_list * list,Uint64 gci)3246 void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
3247 {
3248 #ifdef NDB_EVENT_VERIFY_SIZE
3249 NdbEventBuffer::verify_size(*list);
3250 #endif
3251 move_gci_ops(list, gci);
3252
3253 if (m_tail)
3254 m_tail->m_next= list->m_head;
3255 else
3256 m_head= list->m_head;
3257 m_tail= list->m_tail;
3258 m_count+= list->m_count;
3259 m_sz+= list->m_sz;
3260 }
3261
3262 void
add_gci_op(Gci_op g)3263 EventBufData_list::add_gci_op(Gci_op g)
3264 {
3265 DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
3266 DBUG_PRINT_EVENT("info", ("p.op: %p g.event_types: %x", g.op, g.event_types));
3267 assert(g.op != NULL && g.op->theMainOp == NULL); // as in nextEvent
3268 Uint32 i;
3269 for (i = 0; i < m_gci_op_count; i++) {
3270 if (m_gci_op_list[i].op == g.op)
3271 break;
3272 }
3273 if (i < m_gci_op_count) {
3274 m_gci_op_list[i].event_types |= g.event_types;
3275 } else {
3276 if (m_gci_op_count == m_gci_op_alloc) {
3277 Uint32 n = 1 + 2 * m_gci_op_alloc;
3278 Gci_op* old_list = m_gci_op_list;
3279 m_gci_op_list = new Gci_op [n];
3280 if (m_gci_op_alloc != 0) {
3281 Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op);
3282 memcpy(m_gci_op_list, old_list, bytes);
3283 DBUG_PRINT_EVENT("info", ("this: %p delete m_gci_op_list: %p",
3284 this, old_list));
3285 delete [] old_list;
3286 }
3287 else
3288 assert(old_list == 0);
3289 DBUG_PRINT_EVENT("info", ("this: %p new m_gci_op_list: %p",
3290 this, m_gci_op_list));
3291 m_gci_op_alloc = n;
3292 }
3293 assert(m_gci_op_count < m_gci_op_alloc);
3294 #ifndef DBUG_OFF
3295 i = m_gci_op_count;
3296 #endif
3297 m_gci_op_list[m_gci_op_count++] = g;
3298 }
3299 DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
3300 DBUG_VOID_RETURN_EVENT;
3301 }
3302
3303 void
move_gci_ops(EventBufData_list * list,Uint64 gci)3304 EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci)
3305 {
3306 DBUG_ENTER_EVENT("EventBufData_list::move_gci_ops");
3307 DBUG_PRINT_EVENT("info", ("this: %p list: %p gci: %u/%u",
3308 this, list, (Uint32)(gci >> 32), (Uint32)gci));
3309 assert(!m_is_not_multi_list);
3310 if (!list->m_is_not_multi_list)
3311 {
3312 assert(gci == 0);
3313 if (m_gci_ops_list_tail)
3314 m_gci_ops_list_tail->m_next = list->m_gci_ops_list;
3315 else
3316 {
3317 m_gci_ops_list = list->m_gci_ops_list;
3318 }
3319 m_gci_ops_list_tail = list->m_gci_ops_list_tail;
3320 goto end;
3321 }
3322 {
3323 Gci_ops *new_gci_ops = new Gci_ops;
3324 DBUG_PRINT_EVENT("info", ("this: %p m_gci_op_list: %p",
3325 new_gci_ops, list->m_gci_op_list));
3326 if (m_gci_ops_list_tail)
3327 m_gci_ops_list_tail->m_next = new_gci_ops;
3328 else
3329 {
3330 assert(m_gci_ops_list == 0);
3331 m_gci_ops_list = new_gci_ops;
3332 }
3333 m_gci_ops_list_tail = new_gci_ops;
3334
3335 new_gci_ops->m_gci_op_list = list->m_gci_op_list;
3336 new_gci_ops->m_gci_op_count = list->m_gci_op_count;
3337 new_gci_ops->m_gci = gci;
3338 new_gci_ops->m_next = 0;
3339 }
3340 end:
3341 list->m_gci_op_list = 0;
3342 list->m_gci_ops_list_tail = 0;
3343 list->m_gci_op_alloc = 0;
3344 DBUG_VOID_RETURN_EVENT;
3345 }
3346
3347 NdbEventOperation*
createEventOperation(const char * eventName,NdbError & theError)3348 NdbEventBuffer::createEventOperation(const char* eventName,
3349 NdbError &theError)
3350 {
3351 DBUG_ENTER("NdbEventBuffer::createEventOperation");
3352 NdbEventOperation* tOp= new NdbEventOperation(m_ndb, eventName);
3353 if (tOp == 0)
3354 {
3355 theError.code= 4000;
3356 DBUG_RETURN(NULL);
3357 }
3358 if (tOp->getState() != NdbEventOperation::EO_CREATED) {
3359 theError.code= tOp->getNdbError().code;
3360 delete tOp;
3361 DBUG_RETURN(NULL);
3362 }
3363 // add user reference
3364 // removed in dropEventOperation
3365 getEventOperationImpl(tOp)->m_ref_count = 1;
3366 DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
3367 getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp)));
3368 DBUG_RETURN(tOp);
3369 }
3370
3371 NdbEventOperationImpl*
createEventOperationImpl(NdbEventImpl & evnt,NdbError & theError)3372 NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt,
3373 NdbError &theError)
3374 {
3375 DBUG_ENTER("NdbEventBuffer::createEventOperationImpl");
3376 NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt);
3377 if (tOp == 0)
3378 {
3379 theError.code= 4000;
3380 DBUG_RETURN(NULL);
3381 }
3382 if (tOp->getState() != NdbEventOperation::EO_CREATED) {
3383 theError.code= tOp->getNdbError().code;
3384 delete tOp;
3385 DBUG_RETURN(NULL);
3386 }
3387 DBUG_RETURN(tOp);
3388 }
3389
3390 void
dropEventOperation(NdbEventOperation * tOp)3391 NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
3392 {
3393 DBUG_ENTER("NdbEventBuffer::dropEventOperation");
3394 NdbEventOperationImpl* op= getEventOperationImpl(tOp);
3395
3396 op->stop();
3397 // stop blob event ops
3398 if (op->theMainOp == NULL)
3399 {
3400 Uint64 max_stop_gci = op->m_stop_gci;
3401 NdbEventOperationImpl* tBlobOp = op->theBlobOpList;
3402 while (tBlobOp != NULL)
3403 {
3404 tBlobOp->stop();
3405 Uint64 stop_gci = tBlobOp->m_stop_gci;
3406 if (stop_gci > max_stop_gci)
3407 max_stop_gci = stop_gci;
3408 tBlobOp = tBlobOp->m_next;
3409 }
3410 tBlobOp = op->theBlobOpList;
3411 while (tBlobOp != NULL)
3412 {
3413 tBlobOp->m_stop_gci = max_stop_gci;
3414 tBlobOp = tBlobOp->m_next;
3415 }
3416 op->m_stop_gci = max_stop_gci;
3417 }
3418
3419 /**
3420 * Needs mutex lock as report_node_XXX accesses list...
3421 */
3422 NdbMutex_Lock(m_mutex);
3423
3424 // release blob handles now, further access is user error
3425 if (op->theMainOp == NULL)
3426 {
3427 while (op->theBlobList != NULL)
3428 {
3429 NdbBlob* tBlob = op->theBlobList;
3430 op->theBlobList = tBlob->theNext;
3431 m_ndb->releaseNdbBlob(tBlob);
3432 }
3433 }
3434
3435 if (op->m_next)
3436 op->m_next->m_prev= op->m_prev;
3437 if (op->m_prev)
3438 op->m_prev->m_next= op->m_next;
3439 else
3440 m_ndb->theImpl->m_ev_op= op->m_next;
3441
3442 assert(m_ndb->theImpl->m_ev_op == 0 || m_ndb->theImpl->m_ev_op->m_prev == 0);
3443
3444 DBUG_ASSERT(op->m_ref_count > 0);
3445 // remove user reference
3446 // added in createEventOperation
3447 // user error to use reference after this
3448 op->m_ref_count--;
3449 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
3450 if (op->m_ref_count == 0)
3451 {
3452 NdbMutex_Unlock(m_mutex);
3453 DBUG_PRINT("info", ("deleting op: %p", op));
3454 delete op->m_facade;
3455 }
3456 else
3457 {
3458 op->m_next= m_dropped_ev_op;
3459 op->m_prev= 0;
3460 if (m_dropped_ev_op)
3461 m_dropped_ev_op->m_prev= op;
3462 m_dropped_ev_op= op;
3463
3464 NdbMutex_Unlock(m_mutex);
3465 }
3466 DBUG_VOID_RETURN;
3467 }
3468
3469 void
reportStatus()3470 NdbEventBuffer::reportStatus()
3471 {
3472 EventBufData *apply_buf= m_available_data.m_head;
3473 Uint64 apply_gci, latest_gci= m_latestGCI;
3474 if (apply_buf == 0)
3475 apply_buf= m_complete_data.m_data.m_head;
3476 if (apply_buf && apply_buf->sdata)
3477 {
3478 Uint32 gci_hi = apply_buf->sdata->gci_hi;
3479 Uint32 gci_lo = apply_buf->sdata->gci_lo;
3480 apply_gci= gci_lo | (Uint64(gci_hi) << 32);
3481 }
3482 else
3483 apply_gci= latest_gci;
3484
3485 if (m_free_thresh)
3486 {
3487 if (100*(Uint64)m_free_data_sz < m_min_free_thresh*(Uint64)m_total_alloc &&
3488 m_total_alloc > 1024*1024)
3489 {
3490 /* report less free buffer than m_free_thresh,
3491 next report when more free than 2 * m_free_thresh
3492 */
3493 m_min_free_thresh= 0;
3494 m_max_free_thresh= 2 * m_free_thresh;
3495 goto send_report;
3496 }
3497
3498 if (100*(Uint64)m_free_data_sz > m_max_free_thresh*(Uint64)m_total_alloc &&
3499 m_total_alloc > 1024*1024)
3500 {
3501 /* report more free than 2 * m_free_thresh
3502 next report when less free than m_free_thresh
3503 */
3504 m_min_free_thresh= m_free_thresh;
3505 m_max_free_thresh= 100;
3506 goto send_report;
3507 }
3508 }
3509 if (m_gci_slip_thresh &&
3510 (latest_gci-apply_gci >= m_gci_slip_thresh))
3511 {
3512 goto send_report;
3513 }
3514 return;
3515
3516 send_report:
3517 Uint32 data[8];
3518 data[0]= NDB_LE_EventBufferStatus;
3519 data[1]= m_total_alloc-m_free_data_sz;
3520 data[2]= m_total_alloc;
3521 data[3]= 0;
3522 data[4]= (Uint32)(apply_gci);
3523 data[5]= (Uint32)(apply_gci >> 32);
3524 data[6]= (Uint32)(latest_gci);
3525 data[7]= (Uint32)(latest_gci >> 32);
3526 Ndb_internal::send_event_report(true, m_ndb, data,8);
3527 #ifdef VM_TRACE
3528 assert(m_total_alloc >= m_free_data_sz);
3529 #endif
3530 }
3531
#ifdef VM_TRACE
/**
 * Consistency check for a chain of EventBufData (VM_TRACE builds only).
 *
 * The whole body is currently disabled with "#if 0", so this function
 * does nothing.  The disabled code follows the m_next chain from data,
 * adds up what get_full_size() reports for each element and asserts
 * that the totals equal count and sz.
 *
 * @param data   first element of the chain
 * @param count  expected total count
 * @param sz     expected total size
 */
void
NdbEventBuffer::verify_size(const EventBufData* data, Uint32 count, Uint32 sz)
{
#if 0
  Uint32 tmp_count = 0;
  Uint32 tmp_sz = 0;
  while (data != 0) {
    Uint32 full_count, full_sz;
    data->get_full_size(full_count, full_sz);
    tmp_count += full_count;
    tmp_sz += full_sz;
    data = data->m_next;
  }
  assert(tmp_count == count);
  assert(tmp_sz == sz);
#endif
}
/**
 * Convenience overload for a whole list (VM_TRACE builds only).
 *
 * Also disabled with "#if 0"; the disabled code forwards the list's
 * m_head, m_count and m_sz to the overload above.
 *
 * @param list  list to check
 */
void
NdbEventBuffer::verify_size(const EventBufData_list & list)
{
#if 0
  verify_size(list.m_head, list.m_count, list.m_sz);
#endif
}
#endif
3558
3559 // hash table routines
3560
3561 // could optimize the all-fixed case
3562 Uint32
getpkhash(NdbEventOperationImpl * op,LinearSectionPtr ptr[3])3563 EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
3564 {
3565 DBUG_ENTER_EVENT("EventBufData_hash::getpkhash");
3566 DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
3567 DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
3568
3569 const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
3570
3571 // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
3572 // for pk update (to equivalent pk) post/pre values give same hash
3573 Uint32 nkey = tab->m_noOfKeys;
3574 assert(nkey != 0 && nkey <= ptr[0].sz);
3575 const Uint32* hptr = ptr[0].p;
3576 const uchar* dptr = (uchar*)ptr[1].p;
3577
3578 // hash registers
3579 ulong nr1 = 0;
3580 ulong nr2 = 0;
3581 while (nkey-- != 0)
3582 {
3583 AttributeHeader ah(*hptr++);
3584 Uint32 bytesize = ah.getByteSize();
3585 assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz));
3586
3587 Uint32 i = ah.getAttributeId();
3588 const NdbColumnImpl* col = tab->getColumn(i);
3589 assert(col != 0);
3590
3591 Uint32 lb, len;
3592 bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len);
3593 assert(ok);
3594
3595 CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
3596 (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
3597 dptr += ((bytesize + 3) / 4) * 4;
3598 }
3599 DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1));
3600 DBUG_RETURN_EVENT(nr1);
3601 }
3602
3603 bool
getpkequal(NdbEventOperationImpl * op,LinearSectionPtr ptr1[3],LinearSectionPtr ptr2[3])3604 EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
3605 {
3606 DBUG_ENTER_EVENT("EventBufData_hash::getpkequal");
3607 DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2);
3608 DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2);
3609 DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2);
3610 DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2);
3611
3612 const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
3613
3614 Uint32 nkey = tab->m_noOfKeys;
3615 assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
3616 const Uint32* hptr1 = ptr1[0].p;
3617 const Uint32* hptr2 = ptr2[0].p;
3618 const uchar* dptr1 = (uchar*)ptr1[1].p;
3619 const uchar* dptr2 = (uchar*)ptr2[1].p;
3620
3621 bool equal = true;
3622
3623 while (nkey-- != 0)
3624 {
3625 AttributeHeader ah1(*hptr1++);
3626 AttributeHeader ah2(*hptr2++);
3627 // sizes can differ on update of varchar endspace
3628 Uint32 bytesize1 = ah1.getByteSize();
3629 Uint32 bytesize2 = ah2.getByteSize();
3630 assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
3631 assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));
3632
3633 assert(ah1.getAttributeId() == ah2.getAttributeId());
3634 Uint32 i = ah1.getAttributeId();
3635 const NdbColumnImpl* col = tab->getColumn(i);
3636 assert(col != 0);
3637
3638 Uint32 lb1, len1;
3639 bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1);
3640 Uint32 lb2, len2;
3641 bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2);
3642 assert(ok1 && ok2 && lb1 == lb2);
3643
3644 CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
3645 int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
3646 if (res != 0)
3647 {
3648 equal = false;
3649 break;
3650 }
3651 dptr1 += ((bytesize1 + 3) / 4) * 4;
3652 dptr2 += ((bytesize2 + 3) / 4) * 4;
3653 }
3654
3655 DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false"));
3656 DBUG_RETURN_EVENT(equal);
3657 }
3658
3659 void
search(Pos & hpos,NdbEventOperationImpl * op,LinearSectionPtr ptr[3])3660 EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
3661 {
3662 DBUG_ENTER_EVENT("EventBufData_hash::search");
3663 Uint32 pkhash = getpkhash(op, ptr);
3664 Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
3665 EventBufData* data = m_hash[index];
3666 while (data != 0)
3667 {
3668 if (data->m_event_op == op &&
3669 data->m_pkhash == pkhash &&
3670 getpkequal(op, data->ptr, ptr))
3671 break;
3672 data = data->m_next_hash;
3673 }
3674 hpos.index = index;
3675 hpos.data = data;
3676 hpos.pkhash = pkhash;
3677 DBUG_PRINT_EVENT("info", ("search result=%p", data));
3678 DBUG_VOID_RETURN_EVENT;
3679 }
3680
// Explicit instantiations of the Vector template for the element types
// named below.
template class Vector<Gci_container_pod>;
template class Vector<NdbEventBuffer::EventBufData_chunk*>;
3683