1 /*
2 Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
24 */
25
26 #include <NdbHistory.hpp>
27 #include <NdbOut.hpp>
28
WorkerIdentifier()29 WorkerIdentifier:: WorkerIdentifier():
30 m_totalWorkers(0),
31 m_nextWorker(0)
32 {}
33
34 void
init(const Uint32 totalWorkers)35 WorkerIdentifier::init(const Uint32 totalWorkers)
36 {
37 assert(totalWorkers != 0);
38 lock();
39 {
40 m_totalWorkers = totalWorkers;
41 m_nextWorker = 0;
42 }
43 unlock();
44 }
45
46 Uint32
getTotalWorkers() const47 WorkerIdentifier::getTotalWorkers() const
48 {
49 return m_totalWorkers;
50 }
51
52 Uint32
getNextWorkerId()53 WorkerIdentifier::getNextWorkerId()
54 {
55 Uint32 r;
56 lock();
57 {
58 assert(m_nextWorker < m_totalWorkers);
59 r = m_nextWorker++;
60 }
61 unlock();
62 return r;
63 }
64
65 void
dump() const66 EpochRange::dump() const
67 {
68 ndbout_c("[%u/%u,%u/%u)",
69 hi(m_start), lo(m_start),
70 hi(m_end), lo(m_end));
71 }
72
73 bool
equal(const RecordState & other) const74 NdbHistory::RecordState::equal(const RecordState& other) const
75 {
76 if (m_state != other.m_state)
77 {
78 return false;
79 }
80 if (m_state == RS_EXISTS &&
81 m_updatesValue != other.m_updatesValue)
82 {
83 return false;
84 }
85 return true;
86 }
87
Version(RecordRange range)88 NdbHistory::Version::Version(RecordRange range) :
89 m_range(range)
90 {
91 m_states = new RecordState[m_range.m_len];
92 memset(m_states, 0, (sizeof(RecordState) * m_range.m_len));
93 }
94
Version(const Version * other)95 NdbHistory::Version::Version(const Version* other) :
96 m_range(other->m_range)
97 {
98 m_states = new RecordState[m_range.m_len];
99 memcpy(m_states, other->m_states, (sizeof(RecordState) * m_range.m_len));
100 }
101
~Version()102 NdbHistory::Version::~Version()
103 {
104 delete [] m_states;
105 m_states = NULL;
106 }
107
108 void
assign(const Version * other)109 NdbHistory::Version::assign(const Version* other)
110 {
111 assert(m_range.m_start == other->m_range.m_start);
112 assert(m_range.m_len == other->m_range.m_len);
113
114 memcpy(m_states, other->m_states, (sizeof(RecordState) * m_range.m_len));
115 }
116
117 void
setRows(const Uint32 start,const Uint32 updatesValue,const Uint32 len)118 NdbHistory::Version::setRows(const Uint32 start,
119 const Uint32 updatesValue,
120 const Uint32 len)
121 {
122 setRowsImpl(start,
123 RecordState::RS_EXISTS,
124 updatesValue,
125 len);
126 }
127
128 void
clearRows(const Uint32 start,const Uint32 len)129 NdbHistory::Version::clearRows(const Uint32 start,
130 const Uint32 len)
131 {
132 setRowsImpl(start,
133 RecordState::RS_NOT_EXISTS,
134 0,
135 len);
136 }
137
138 Uint32
diffRowCount(const Version * other) const139 NdbHistory::Version::diffRowCount(const Version* other) const
140 {
141 if ((m_range.m_start == other->m_range.m_start) &&
142 (m_range.m_len == other->m_range.m_len))
143 {
144 Uint32 count = 0;
145 for (Uint32 i=0; i < m_range.m_len; i++)
146 {
147 if (!m_states[i].equal(other->m_states[i]))
148 {
149 /* Note no notion of 'distance' for updatesValue */
150 count++;
151 }
152 }
153 return count;
154 }
155 abort();
156 return ~Uint32(0);
157 }
158
159 bool
equal(const Version * other) const160 NdbHistory::Version::equal(const Version* other) const
161 {
162 return (diffRowCount(other) == 0);
163 }
164
165 /* dumpV print helper */
dumpV(const char * indent,const Uint32 start,const Uint32 end,const NdbHistory::RecordState * rs)166 static void dumpV(const char* indent,
167 const Uint32 start,
168 const Uint32 end,
169 const NdbHistory::RecordState* rs)
170 {
171 if (rs->m_state == NdbHistory::RecordState::RS_NOT_EXISTS)
172 {
173 ndbout_c("%s r %5u -> %5u : -",
174 indent,
175 start,
176 end);
177 }
178 else
179 {
180 ndbout_c("%s r %5u -> %5u : %u",
181 indent,
182 start,
183 end,
184 rs->m_updatesValue);
185 }
186 }
187
188 void
dump(bool full,const char * indent) const189 NdbHistory::Version::dump(bool full,
190 const char* indent) const
191 {
192 ndbout_c("%sRange start %u len %u",
193 indent,
194 m_range.m_start,
195 m_range.m_len);
196
197 const Uint32 offset = m_range.m_start;
198
199 Uint32 sameStart = 0;
200
201 for (Uint32 i=0; i < m_range.m_len; i++)
202 {
203 const RecordState* rs = &m_states[i];
204 if (full)
205 {
206 /* TODO : Put > 1 version per line */
207 dumpV(indent,
208 offset+i,
209 offset+i,
210 rs);
211 }
212 else
213 {
214 if (i > 0)
215 {
216 /* Output contiguous ranges of same value */
217 bool same;
218 const RecordState* prev = &m_states[sameStart];
219
220 same = ((rs->m_state == prev->m_state) &&
221 (rs->m_updatesValue == prev->m_updatesValue));
222
223 if (!same)
224 {
225 /* Output line for previous */
226 dumpV(indent,
227 offset + sameStart,
228 offset + i - 1,
229 prev);
230
231 sameStart = i;
232 }
233 }
234
235 const bool last = (i == (m_range.m_len - 1));
236 if (last)
237 {
238 /* Output line for last set */
239 dumpV(indent,
240 offset + sameStart,
241 offset + i,
242 rs);
243 }
244 }
245 }
246 }
247
248 void
dumpDiff(const Version * other) const249 NdbHistory::Version::dumpDiff(const Version* other) const
250 {
251 assert(m_range.m_start == other->m_range.m_start);
252 assert(m_range.m_len == other->m_range.m_len);
253
254 const Uint32 offset = m_range.m_start;
255
256 /* Simple - full diff view attm */
257 for (Uint32 i=0; i < m_range.m_len; i++)
258 {
259 if (m_states[i].equal(other->m_states[i]))
260 {
261 dumpV(" ",
262 offset + i,
263 offset + i,
264 &m_states[i]);
265 }
266 else
267 {
268 dumpV("DIFF A",
269 offset + i,
270 offset + i,
271 &m_states[i]);
272 dumpV("DIFF B",
273 offset + i,
274 offset + i,
275 &other->m_states[i]);
276 }
277 }
278
279 }
280
281 void
setRowsImpl(const Uint32 start,const Uint32 rowState,const Uint32 updatesValue,const Uint32 len)282 NdbHistory::Version::setRowsImpl(const Uint32 start,
283 const Uint32 rowState,
284 const Uint32 updatesValue,
285 const Uint32 len)
286 {
287 assert(start >= m_range.m_start);
288 assert((start + len) <= (m_range.m_start + m_range.m_len));
289
290 const Uint32 offset = start - m_range.m_start;
291 for (Uint32 i=0; i < len; i++)
292 {
293 RecordState* rs = &m_states[offset + i];
294
295 rs->m_state = rowState;
296 rs->m_updatesValue = updatesValue;
297 }
298 }
299
300 const char*
getVersionTypeName(const VersionType vt)301 NdbHistory::getVersionTypeName(const VersionType vt)
302 {
303 switch(vt)
304 {
305 case VT_LATEST: return "VT_LATEST";
306 case VT_END_OF_GCI : return "VT_END_OF_GCI";
307 case VT_END_OF_EPOCH : return "VT_END_OF_EPOCH";
308 case VT_OTHER : return "VT_OTHER";
309 default:
310 ndbout_c("Unknown versionType %u", vt);
311 abort();
312 return NULL;
313 }
314 }
315
316 void
dump() const317 NdbHistory::VersionMeta::dump() const
318 {
319 ndbout_c(" -- VERSION %llu %s %u/%u --",
320 m_number,
321 getVersionTypeName(m_type),
322 Uint32(m_latest_epoch >> 32),
323 Uint32(m_latest_epoch & 0xffffffff));
324 }
325
326
NdbHistory(const Granularity granularity,const RecordRange range)327 NdbHistory::NdbHistory(const Granularity granularity,
328 const RecordRange range):
329 m_granularity(granularity),
330 m_range(range),
331 m_nextNumber(0)
332 {
333 /* Add initial version with nothing present in the range */
334 StoredVersion start;
335 start.m_meta.m_number = m_nextNumber++;
336 start.m_meta.m_type = VT_LATEST;
337 start.m_meta.m_latest_epoch = 0;
338
339 start.m_version = new Version(m_range);
340 m_storedVersions.push_back(start);
341 }
342
~NdbHistory()343 NdbHistory::~NdbHistory()
344 {
345 for (Uint32 i=0; i < m_storedVersions.size(); i++)
346 {
347 delete m_storedVersions[i].m_version;
348 }
349 }
350
351 bool
checkVersionBoundary(const Uint64 epoch,VersionType & lastVersionType) const352 NdbHistory::checkVersionBoundary(const Uint64 epoch,
353 VersionType& lastVersionType) const
354 {
355 /* Check epoch compared to last then
356 * decide what to do based on
357 * recording granularity
358 */
359 if (m_granularity == GR_LATEST_ONLY)
360 {
361 /* Latest always represented as one version */
362 return false;
363 }
364
365 const StoredVersion& lastVersion = m_storedVersions.back();
366 const Uint64 lastEpoch = lastVersion.m_meta.m_latest_epoch;
367 assert(epoch >= lastEpoch);
368 const bool sameEpoch = (epoch == lastEpoch);
369 const bool sameGci = ((epoch >> 32) ==
370 (lastEpoch >> 32));
371
372 if (m_granularity == GR_LATEST_GCI && sameGci)
373 {
374 /* No boundary, same version */
375 return false;
376 }
377 else if (m_granularity == GR_LATEST_GCI_EPOCH && sameEpoch)
378 {
379 /* No boundary, same version */
380 return false;
381 }
382
383 /**
384 * Some kind of boundary, determine implied
385 * type of last version
386 */
387 if (!sameGci)
388 {
389 /* Gci boundary */
390 lastVersionType = VT_END_OF_GCI;
391 }
392 else if (!sameEpoch)
393 {
394 /* Epoch boundary */
395 lastVersionType = VT_END_OF_EPOCH;
396 }
397 else
398 {
399 lastVersionType = VT_OTHER;
400 }
401
402 return true;
403 }
404
405 void
commitVersion(const Version * version,Uint64 commitEpoch)406 NdbHistory::commitVersion(const Version* version,
407 Uint64 commitEpoch)
408 {
409 assert(m_range.m_start == version->m_range.m_start);
410 assert(m_range.m_len == version->m_range.m_len);
411
412 VersionType lastVersionType;
413 StoredVersion& lastVersion = m_storedVersions.back();
414
415 bool newVersion = checkVersionBoundary(commitEpoch,
416 lastVersionType);
417
418 if (newVersion)
419 {
420 /**
421 * Epoch is sufficiently different to current latest,
422 * so 'save' current latest, and create a new copy for
423 * storing this change
424 */
425
426 /* TODO : Optionally include */
427 if (false)
428 {
429 ndbout_c("New version (%llu) "
430 "prev epoch (%u/%u) new epoch (%u/%u) "
431 "prev epoch type %s",
432 m_nextNumber + 1,
433 Uint32(lastVersion.m_meta.m_latest_epoch >> 32),
434 Uint32(lastVersion.m_meta.m_latest_epoch & 0xffffffff),
435 Uint32(commitEpoch >> 32),
436 Uint32(commitEpoch & 0xffffffff),
437 getVersionTypeName(lastVersion.m_meta.m_type));
438 }
439
440 /* Set type of last version based on boundary type */
441 lastVersion.m_meta.m_type = lastVersionType;
442
443 StoredVersion newVersion;
444 newVersion.m_meta.m_number = m_nextNumber++;
445 newVersion.m_meta.m_type = VT_LATEST;
446 newVersion.m_meta.m_latest_epoch = commitEpoch;
447 newVersion.m_version = new Version(version);
448
449 m_storedVersions.push_back(newVersion);
450 }
451 else
452 {
453 /* Update current latest version */
454 lastVersion.m_version->assign(version);
455 lastVersion.m_meta.m_latest_epoch = commitEpoch;
456 }
457 }
458
459 const NdbHistory::Version*
getLatestVersion() const460 NdbHistory::getLatestVersion() const
461 {
462 return m_storedVersions.back().m_version;
463 }
464
465
466 const NdbHistory::Version*
findFirstClosestMatch(const Version * match,VersionMeta & vm) const467 NdbHistory::findFirstClosestMatch(const Version* match,
468 VersionMeta& vm) const
469 {
470 VersionIterator vi(*this);
471 const Version* v;
472 VersionMeta meta;
473 const Version* closest = NULL;
474 Uint32 minDistance = ~Uint32(0);
475
476 while((v = vi.next(meta)))
477 {
478 const Uint32 distance = match->diffRowCount(v);
479 if (distance < minDistance)
480 {
481 minDistance = distance;
482 closest = v;
483 vm = meta;
484 }
485 }
486
487 return closest;
488 }
489
490
VersionIterator(const NdbHistory & history)491 NdbHistory::VersionIterator::VersionIterator(const NdbHistory& history):
492 m_history(history),
493 m_index(0)
494 {
495 }
496
497
498 const NdbHistory::Version*
next(VersionMeta & vm)499 NdbHistory::VersionIterator::next(VersionMeta& vm)
500 {
501 if (m_index < m_history.m_storedVersions.size())
502 {
503 const StoredVersion& sv = m_history.m_storedVersions[m_index];
504
505 vm = sv.m_meta;
506 m_index++;
507
508 return sv.m_version;
509 }
510 return NULL;
511 }
512
513 void
reset()514 NdbHistory::VersionIterator::reset()
515 {
516 m_index = 0;
517 }
518
519
VersionMatchIterator(const NdbHistory & history,const Version * match)520 NdbHistory::VersionMatchIterator::VersionMatchIterator(const NdbHistory& history,
521 const Version* match) :
522 m_vi(history),
523 m_match(match)
524 {
525 assert(history.m_range.m_start ==
526 match->m_range.m_start);
527 assert(history.m_range.m_len ==
528 match->m_range.m_len);
529 }
530
531 const NdbHistory::Version*
next(VersionMeta & vm)532 NdbHistory::VersionMatchIterator::next(VersionMeta& vm)
533 {
534 const Version* v;
535 while ((v = m_vi.next(vm)) != NULL)
536 {
537 if (m_match->equal(v))
538 {
539 return v;
540 }
541 }
542 return NULL;
543 }
544
545 void
reset()546 NdbHistory::VersionMatchIterator::reset()
547 {
548 m_vi.reset();
549 }
550
551
552 NdbHistory::MatchingEpochRangeIterator::
MatchingEpochRangeIterator(const NdbHistory & history,const Version * match)553 MatchingEpochRangeIterator(const NdbHistory& history,
554 const Version* match):
555 m_vi(history),
556 m_match(match)
557 {
558 assert(history.m_range.m_start ==
559 match->m_range.m_start);
560 assert(history.m_range.m_len ==
561 match->m_range.m_len);
562 }
563
564 bool
next(EpochRange & er)565 NdbHistory::MatchingEpochRangeIterator::next(EpochRange& er)
566 {
567 const Version* v;
568 VersionMeta vm;
569 const Version* matchStart = NULL;
570 VersionMeta matchStartVm;
571
572 while ((v = m_vi.next(vm)) != NULL)
573 {
574 if (m_match->equal(v))
575 {
576 if (matchStart == NULL)
577 {
578 /* Start of matching range */
579 matchStart = v;
580 matchStartVm = vm;
581 }
582 /* else continuing range */
583 }
584 else
585 {
586 if (matchStart != NULL)
587 {
588 /* End of matching range */
589 /* Does it include an epoch boundary? */
590 if (vm.m_latest_epoch ==
591 matchStartVm.m_latest_epoch)
592 {
593 /* No epoch boundary, skip it and continue */
594 matchStart = NULL;
595 }
596 else
597 {
598 /* Boundary */
599 break;
600 }
601 }
602 }
603 }
604
605 if (matchStart)
606 {
607 /* Have a match */
608 er.m_start = matchStartVm.m_latest_epoch;
609
610 if (v != NULL)
611 {
612 er.m_end = vm.m_latest_epoch;
613 }
614 else
615 {
616 /* Latest is a kind of implicit epoch boundary */
617 er.m_end = EpochRange::MAX_EPOCH;
618 }
619
620 return true;
621 }
622
623 return false;
624 }
625
626 void
reset()627 NdbHistory::MatchingEpochRangeIterator::reset()
628 {
629 m_vi.reset();
630 }
631
632
633 const char*
getGranularityName(const Granularity gr)634 NdbHistory::getGranularityName(const Granularity gr)
635 {
636 switch (gr)
637 {
638 case GR_LATEST_ONLY: return "GR_LATEST_ONLY";
639 case GR_LATEST_GCI : return "GR_LATEST_GCI";
640 case GR_LATEST_GCI_EPOCH : return "GR_LATEST_GCI_EPOCH";
641 case GR_ALL : return "GR_ALL";
642 default:
643 ndbout_c("Unknown granularity : %u", gr);
644 abort();
645 return NULL;
646 }
647 }
648
649 void
dump(const bool full) const650 NdbHistory::dump(const bool full) const
651 {
652 ndbout_c("NdbHistory %p", this);
653 ndbout_c(" Granularity : %s", getGranularityName(m_granularity));
654 ndbout_c(" Range start %u len %u",
655 m_range.m_start,
656 m_range.m_len);
657 ndbout_c(" Num versions stored %u ", m_storedVersions.size());
658 const Uint32 lastVersion = m_storedVersions.size() - 1;
659 ndbout_c(" Commit epoch range %u/%u -> %u/%u",
660 Uint32(m_storedVersions[0].m_meta.m_latest_epoch >> 32),
661 Uint32(m_storedVersions[0].m_meta.m_latest_epoch & 0xffffffff),
662 Uint32(m_storedVersions[lastVersion].m_meta.m_latest_epoch >> 32),
663 Uint32(m_storedVersions[lastVersion].m_meta.m_latest_epoch & 0xffffffff));
664
665 if (full)
666 {
667 ndbout_c("Contained versions first->last : ");
668 VersionIterator vi(*this);
669 VersionMeta vm;
670 const Version* v;
671
672 while ((v = vi.next(vm)))
673 {
674 vm.dump();
675 v->dump(false, " ");
676 }
677 ndbout_c("End of versions");
678 }
679 }
680
681 void
dumpClosestMatch(const Version * target) const682 NdbHistory::dumpClosestMatch(const Version* target) const
683 {
684 /* Dump some useful info about an attempted match */
685 const Version* closestMatch = NULL;
686 NdbHistory::VersionMeta closestMatchMeta;
687
688 if ((closestMatch = findFirstClosestMatch(target,
689 closestMatchMeta)))
690 {
691 ndbout_c("Closest version in history :");
692 closestMatchMeta.dump();
693 closestMatch->dump();
694
695 // TODO : Dump a version diff, with good diff format
696 }
697 else
698 {
699 ndbout_c("Failed to find a close match in history");
700 }
701 }
702