1 /*
2    Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 
24 */
25 
26 #include <NdbHistory.hpp>
27 #include <NdbOut.hpp>
28 
WorkerIdentifier()29 WorkerIdentifier:: WorkerIdentifier():
30     m_totalWorkers(0),
31     m_nextWorker(0)
32 {}
33 
34 void
init(const Uint32 totalWorkers)35 WorkerIdentifier::init(const Uint32 totalWorkers)
36 {
37   assert(totalWorkers != 0);
38   lock();
39   {
40     m_totalWorkers = totalWorkers;
41     m_nextWorker = 0;
42   }
43   unlock();
44 }
45 
46 Uint32
getTotalWorkers() const47 WorkerIdentifier::getTotalWorkers() const
48 {
49   return m_totalWorkers;
50 }
51 
52 Uint32
getNextWorkerId()53 WorkerIdentifier::getNextWorkerId()
54 {
55   Uint32 r;
56   lock();
57   {
58     assert(m_nextWorker < m_totalWorkers);
59     r = m_nextWorker++;
60   }
61   unlock();
62   return r;
63 }
64 
65 void
dump() const66 EpochRange::dump() const
67 {
68   ndbout_c("[%u/%u,%u/%u)",
69            hi(m_start), lo(m_start),
70            hi(m_end), lo(m_end));
71 }
72 
73 bool
equal(const RecordState & other) const74 NdbHistory::RecordState::equal(const RecordState& other) const
75 {
76   if (m_state != other.m_state)
77   {
78     return false;
79   }
80   if (m_state == RS_EXISTS &&
81       m_updatesValue != other.m_updatesValue)
82   {
83     return false;
84   }
85   return true;
86 }
87 
Version(RecordRange range)88 NdbHistory::Version::Version(RecordRange range) :
89   m_range(range)
90 {
91   m_states = new RecordState[m_range.m_len];
92   memset(m_states, 0, (sizeof(RecordState) * m_range.m_len));
93 }
94 
Version(const Version * other)95 NdbHistory::Version::Version(const Version* other) :
96   m_range(other->m_range)
97 {
98   m_states = new RecordState[m_range.m_len];
99   memcpy(m_states, other->m_states, (sizeof(RecordState) * m_range.m_len));
100 }
101 
~Version()102 NdbHistory::Version::~Version()
103 {
104   delete [] m_states;
105   m_states = NULL;
106 }
107 
108 void
assign(const Version * other)109 NdbHistory::Version::assign(const Version* other)
110 {
111   assert(m_range.m_start == other->m_range.m_start);
112   assert(m_range.m_len == other->m_range.m_len);
113 
114   memcpy(m_states, other->m_states, (sizeof(RecordState) * m_range.m_len));
115 }
116 
117 void
setRows(const Uint32 start,const Uint32 updatesValue,const Uint32 len)118 NdbHistory::Version::setRows(const Uint32 start,
119                              const Uint32 updatesValue,
120                              const Uint32 len)
121 {
122   setRowsImpl(start,
123               RecordState::RS_EXISTS,
124               updatesValue,
125               len);
126 }
127 
128 void
clearRows(const Uint32 start,const Uint32 len)129 NdbHistory::Version::clearRows(const Uint32 start,
130                                const Uint32 len)
131 {
132   setRowsImpl(start,
133               RecordState::RS_NOT_EXISTS,
134               0,
135               len);
136 }
137 
138 Uint32
diffRowCount(const Version * other) const139 NdbHistory::Version::diffRowCount(const Version* other) const
140 {
141   if ((m_range.m_start == other->m_range.m_start) &&
142       (m_range.m_len == other->m_range.m_len))
143   {
144     Uint32 count = 0;
145     for (Uint32 i=0; i < m_range.m_len; i++)
146     {
147       if (!m_states[i].equal(other->m_states[i]))
148       {
149         /* Note no notion of 'distance' for updatesValue */
150         count++;
151       }
152     }
153     return count;
154   }
155   abort();
156   return ~Uint32(0);
157 }
158 
159 bool
equal(const Version * other) const160 NdbHistory::Version::equal(const Version* other) const
161 {
162   return (diffRowCount(other) == 0);
163 }
164 
165 /* dumpV print helper */
dumpV(const char * indent,const Uint32 start,const Uint32 end,const NdbHistory::RecordState * rs)166 static void dumpV(const char* indent,
167                   const Uint32 start,
168                   const Uint32 end,
169                   const NdbHistory::RecordState* rs)
170 {
171   if (rs->m_state == NdbHistory::RecordState::RS_NOT_EXISTS)
172   {
173     ndbout_c("%s  r %5u -> %5u : -",
174              indent,
175              start,
176              end);
177   }
178   else
179   {
180     ndbout_c("%s  r %5u -> %5u : %u",
181              indent,
182              start,
183              end,
184              rs->m_updatesValue);
185   }
186 }
187 
188 void
dump(bool full,const char * indent) const189 NdbHistory::Version::dump(bool full,
190                        const char* indent) const
191 {
192   ndbout_c("%sRange start %u len %u",
193            indent,
194            m_range.m_start,
195            m_range.m_len);
196 
197   const Uint32 offset = m_range.m_start;
198 
199   Uint32 sameStart = 0;
200 
201   for (Uint32 i=0; i < m_range.m_len; i++)
202   {
203     const RecordState* rs = &m_states[i];
204     if (full)
205     {
206       /* TODO : Put > 1 version per line */
207       dumpV(indent,
208             offset+i,
209             offset+i,
210             rs);
211     }
212     else
213     {
214       if (i > 0)
215       {
216         /* Output contiguous ranges of same value */
217         bool same;
218         const RecordState* prev = &m_states[sameStart];
219 
220         same = ((rs->m_state == prev->m_state) &&
221                 (rs->m_updatesValue == prev->m_updatesValue));
222 
223         if (!same)
224         {
225           /* Output line for previous */
226           dumpV(indent,
227                 offset + sameStart,
228                 offset + i - 1,
229                 prev);
230 
231           sameStart = i;
232         }
233       }
234 
235       const bool last = (i == (m_range.m_len - 1));
236       if (last)
237       {
238         /* Output line for last set */
239         dumpV(indent,
240               offset + sameStart,
241               offset + i,
242               rs);
243       }
244     }
245   }
246 }
247 
248 void
dumpDiff(const Version * other) const249 NdbHistory::Version::dumpDiff(const Version* other) const
250 {
251   assert(m_range.m_start == other->m_range.m_start);
252   assert(m_range.m_len == other->m_range.m_len);
253 
254   const Uint32 offset = m_range.m_start;
255 
256   /* Simple - full diff view attm */
257   for (Uint32 i=0; i < m_range.m_len; i++)
258   {
259     if (m_states[i].equal(other->m_states[i]))
260     {
261       dumpV("      ",
262             offset + i,
263             offset + i,
264             &m_states[i]);
265     }
266     else
267     {
268       dumpV("DIFF A",
269             offset + i,
270             offset + i,
271             &m_states[i]);
272       dumpV("DIFF B",
273             offset + i,
274             offset + i,
275             &other->m_states[i]);
276     }
277   }
278 
279 }
280 
281 void
setRowsImpl(const Uint32 start,const Uint32 rowState,const Uint32 updatesValue,const Uint32 len)282 NdbHistory::Version::setRowsImpl(const Uint32 start,
283                                  const Uint32 rowState,
284                                  const Uint32 updatesValue,
285                                  const Uint32 len)
286 {
287   assert(start >= m_range.m_start);
288   assert((start + len) <= (m_range.m_start + m_range.m_len));
289 
290   const Uint32 offset = start - m_range.m_start;
291   for (Uint32 i=0; i < len; i++)
292   {
293     RecordState* rs = &m_states[offset + i];
294 
295     rs->m_state = rowState;
296     rs->m_updatesValue = updatesValue;
297   }
298 }
299 
300 const char*
getVersionTypeName(const VersionType vt)301 NdbHistory::getVersionTypeName(const VersionType vt)
302 {
303   switch(vt)
304   {
305   case VT_LATEST: return "VT_LATEST";
306   case VT_END_OF_GCI : return "VT_END_OF_GCI";
307   case VT_END_OF_EPOCH : return "VT_END_OF_EPOCH";
308   case VT_OTHER : return "VT_OTHER";
309   default:
310     ndbout_c("Unknown versionType %u", vt);
311     abort();
312     return NULL;
313   }
314 }
315 
316 void
dump() const317 NdbHistory::VersionMeta::dump() const
318 {
319   ndbout_c("  -- VERSION %llu %s %u/%u --",
320            m_number,
321            getVersionTypeName(m_type),
322            Uint32(m_latest_epoch >> 32),
323            Uint32(m_latest_epoch & 0xffffffff));
324 }
325 
326 
NdbHistory(const Granularity granularity,const RecordRange range)327 NdbHistory::NdbHistory(const Granularity granularity,
328                        const RecordRange range):
329   m_granularity(granularity),
330   m_range(range),
331   m_nextNumber(0)
332 {
333   /* Add initial version with nothing present in the range */
334   StoredVersion start;
335   start.m_meta.m_number = m_nextNumber++;
336   start.m_meta.m_type = VT_LATEST;
337   start.m_meta.m_latest_epoch = 0;
338 
339   start.m_version = new Version(m_range);
340   m_storedVersions.push_back(start);
341 }
342 
~NdbHistory()343 NdbHistory::~NdbHistory()
344 {
345   for (Uint32 i=0; i < m_storedVersions.size(); i++)
346   {
347     delete m_storedVersions[i].m_version;
348   }
349 }
350 
351 bool
checkVersionBoundary(const Uint64 epoch,VersionType & lastVersionType) const352 NdbHistory::checkVersionBoundary(const Uint64 epoch,
353                                  VersionType& lastVersionType) const
354 {
355   /* Check epoch compared to last then
356    * decide what to do based on
357    * recording granularity
358    */
359   if (m_granularity == GR_LATEST_ONLY)
360   {
361     /* Latest always represented as one version */
362     return false;
363   }
364 
365   const StoredVersion& lastVersion = m_storedVersions.back();
366   const Uint64 lastEpoch = lastVersion.m_meta.m_latest_epoch;
367   assert(epoch >= lastEpoch);
368   const bool sameEpoch = (epoch == lastEpoch);
369   const bool sameGci = ((epoch >> 32) ==
370                         (lastEpoch >> 32));
371 
372   if (m_granularity == GR_LATEST_GCI  && sameGci)
373   {
374     /* No boundary, same version */
375     return false;
376   }
377   else if (m_granularity == GR_LATEST_GCI_EPOCH && sameEpoch)
378   {
379     /* No boundary, same version */
380     return false;
381   }
382 
383   /**
384    * Some kind of boundary, determine implied
385    * type of last version
386    */
387   if (!sameGci)
388   {
389     /* Gci boundary */
390     lastVersionType = VT_END_OF_GCI;
391   }
392   else if (!sameEpoch)
393   {
394     /* Epoch boundary */
395     lastVersionType = VT_END_OF_EPOCH;
396   }
397   else
398   {
399     lastVersionType = VT_OTHER;
400   }
401 
402   return true;
403 }
404 
405 void
commitVersion(const Version * version,Uint64 commitEpoch)406 NdbHistory::commitVersion(const Version* version,
407                           Uint64 commitEpoch)
408 {
409   assert(m_range.m_start == version->m_range.m_start);
410   assert(m_range.m_len == version->m_range.m_len);
411 
412   VersionType lastVersionType;
413   StoredVersion& lastVersion = m_storedVersions.back();
414 
415   bool newVersion = checkVersionBoundary(commitEpoch,
416                                          lastVersionType);
417 
418   if (newVersion)
419   {
420     /**
421      * Epoch is sufficiently different to current latest,
422      * so 'save' current latest, and create a new copy for
423      * storing this change
424      */
425 
426     /* TODO : Optionally include */
427     if (false)
428     {
429       ndbout_c("New version (%llu) "
430                "prev epoch (%u/%u) new epoch (%u/%u) "
431                "prev epoch type %s",
432                m_nextNumber + 1,
433                Uint32(lastVersion.m_meta.m_latest_epoch >> 32),
434                Uint32(lastVersion.m_meta.m_latest_epoch & 0xffffffff),
435                Uint32(commitEpoch >> 32),
436                Uint32(commitEpoch & 0xffffffff),
437                getVersionTypeName(lastVersion.m_meta.m_type));
438     }
439 
440     /* Set type of last version based on boundary type */
441     lastVersion.m_meta.m_type = lastVersionType;
442 
443     StoredVersion newVersion;
444     newVersion.m_meta.m_number = m_nextNumber++;
445     newVersion.m_meta.m_type = VT_LATEST;
446     newVersion.m_meta.m_latest_epoch = commitEpoch;
447     newVersion.m_version = new Version(version);
448 
449     m_storedVersions.push_back(newVersion);
450   }
451   else
452   {
453     /* Update current latest version */
454     lastVersion.m_version->assign(version);
455     lastVersion.m_meta.m_latest_epoch = commitEpoch;
456   }
457 }
458 
459 const NdbHistory::Version*
getLatestVersion() const460 NdbHistory::getLatestVersion() const
461 {
462   return m_storedVersions.back().m_version;
463 }
464 
465 
466 const NdbHistory::Version*
findFirstClosestMatch(const Version * match,VersionMeta & vm) const467 NdbHistory::findFirstClosestMatch(const Version* match,
468                                   VersionMeta& vm) const
469 {
470   VersionIterator vi(*this);
471   const Version* v;
472   VersionMeta meta;
473   const Version* closest = NULL;
474   Uint32 minDistance = ~Uint32(0);
475 
476   while((v = vi.next(meta)))
477   {
478     const Uint32 distance = match->diffRowCount(v);
479     if (distance < minDistance)
480     {
481       minDistance = distance;
482       closest = v;
483       vm = meta;
484     }
485   }
486 
487   return closest;
488 }
489 
490 
VersionIterator(const NdbHistory & history)491 NdbHistory::VersionIterator::VersionIterator(const NdbHistory& history):
492   m_history(history),
493   m_index(0)
494 {
495 }
496 
497 
498 const NdbHistory::Version*
next(VersionMeta & vm)499 NdbHistory::VersionIterator::next(VersionMeta& vm)
500 {
501   if (m_index < m_history.m_storedVersions.size())
502   {
503     const StoredVersion& sv = m_history.m_storedVersions[m_index];
504 
505     vm = sv.m_meta;
506     m_index++;
507 
508     return sv.m_version;
509   }
510   return NULL;
511 }
512 
513 void
reset()514 NdbHistory::VersionIterator::reset()
515 {
516   m_index = 0;
517 }
518 
519 
VersionMatchIterator(const NdbHistory & history,const Version * match)520 NdbHistory::VersionMatchIterator::VersionMatchIterator(const NdbHistory& history,
521                                                        const Version* match) :
522   m_vi(history),
523   m_match(match)
524 {
525   assert(history.m_range.m_start ==
526          match->m_range.m_start);
527   assert(history.m_range.m_len ==
528          match->m_range.m_len);
529 }
530 
531 const NdbHistory::Version*
next(VersionMeta & vm)532 NdbHistory::VersionMatchIterator::next(VersionMeta& vm)
533 {
534   const Version* v;
535   while ((v = m_vi.next(vm)) != NULL)
536   {
537     if (m_match->equal(v))
538     {
539       return v;
540     }
541   }
542   return NULL;
543 }
544 
545 void
reset()546 NdbHistory::VersionMatchIterator::reset()
547 {
548   m_vi.reset();
549 }
550 
551 
552 NdbHistory::MatchingEpochRangeIterator::
MatchingEpochRangeIterator(const NdbHistory & history,const Version * match)553 MatchingEpochRangeIterator(const NdbHistory& history,
554                            const Version* match):
555   m_vi(history),
556   m_match(match)
557 {
558   assert(history.m_range.m_start ==
559          match->m_range.m_start);
560   assert(history.m_range.m_len ==
561          match->m_range.m_len);
562 }
563 
564 bool
next(EpochRange & er)565 NdbHistory::MatchingEpochRangeIterator::next(EpochRange& er)
566 {
567   const Version* v;
568   VersionMeta vm;
569   const Version* matchStart = NULL;
570   VersionMeta matchStartVm;
571 
572   while ((v = m_vi.next(vm)) != NULL)
573   {
574     if (m_match->equal(v))
575     {
576       if (matchStart == NULL)
577       {
578         /* Start of matching range */
579         matchStart = v;
580         matchStartVm = vm;
581       }
582       /* else continuing range */
583     }
584     else
585     {
586       if (matchStart != NULL)
587       {
588         /* End of matching range */
589         /* Does it include an epoch boundary? */
590         if (vm.m_latest_epoch ==
591             matchStartVm.m_latest_epoch)
592         {
593           /* No epoch boundary, skip it and continue */
594           matchStart = NULL;
595         }
596         else
597         {
598           /* Boundary */
599           break;
600         }
601       }
602     }
603   }
604 
605   if (matchStart)
606   {
607     /* Have a match */
608     er.m_start = matchStartVm.m_latest_epoch;
609 
610     if (v != NULL)
611     {
612       er.m_end = vm.m_latest_epoch;
613     }
614     else
615     {
616       /* Latest is a kind of implicit epoch boundary */
617       er.m_end = EpochRange::MAX_EPOCH;
618     }
619 
620     return true;
621   }
622 
623   return false;
624 }
625 
626 void
reset()627 NdbHistory::MatchingEpochRangeIterator::reset()
628 {
629   m_vi.reset();
630 }
631 
632 
633 const char*
getGranularityName(const Granularity gr)634 NdbHistory::getGranularityName(const Granularity gr)
635 {
636   switch (gr)
637   {
638   case GR_LATEST_ONLY: return "GR_LATEST_ONLY";
639   case GR_LATEST_GCI : return "GR_LATEST_GCI";
640   case GR_LATEST_GCI_EPOCH : return "GR_LATEST_GCI_EPOCH";
641   case GR_ALL : return "GR_ALL";
642   default:
643     ndbout_c("Unknown granularity : %u", gr);
644     abort();
645     return NULL;
646   }
647 }
648 
649 void
dump(const bool full) const650 NdbHistory::dump(const bool full) const
651 {
652   ndbout_c("NdbHistory %p", this);
653   ndbout_c("  Granularity : %s", getGranularityName(m_granularity));
654   ndbout_c("  Range start %u len %u",
655            m_range.m_start,
656            m_range.m_len);
657   ndbout_c("  Num versions stored %u ", m_storedVersions.size());
658   const Uint32 lastVersion = m_storedVersions.size() - 1;
659   ndbout_c("  Commit epoch range %u/%u -> %u/%u",
660            Uint32(m_storedVersions[0].m_meta.m_latest_epoch >> 32),
661            Uint32(m_storedVersions[0].m_meta.m_latest_epoch & 0xffffffff),
662            Uint32(m_storedVersions[lastVersion].m_meta.m_latest_epoch >> 32),
663            Uint32(m_storedVersions[lastVersion].m_meta.m_latest_epoch & 0xffffffff));
664 
665   if (full)
666   {
667     ndbout_c("Contained versions first->last : ");
668     VersionIterator vi(*this);
669     VersionMeta vm;
670     const Version* v;
671 
672     while ((v = vi.next(vm)))
673     {
674       vm.dump();
675       v->dump(false, "     ");
676     }
677     ndbout_c("End of versions");
678   }
679 }
680 
681 void
dumpClosestMatch(const Version * target) const682 NdbHistory::dumpClosestMatch(const Version* target) const
683 {
684   /* Dump some useful info about an attempted match */
685   const Version* closestMatch = NULL;
686   NdbHistory::VersionMeta closestMatchMeta;
687 
688   if ((closestMatch = findFirstClosestMatch(target,
689                                             closestMatchMeta)))
690   {
691     ndbout_c("Closest version in history :");
692     closestMatchMeta.dump();
693     closestMatch->dump();
694 
695     // TODO : Dump a version diff, with good diff format
696   }
697   else
698   {
699     ndbout_c("Failed to find a close match in history");
700   }
701 }
702