1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22 
23 #include <ndb_global.h>
24 #include <Ndb.hpp>
25 #include <NdbTransaction.hpp>
26 #include <NdbRecAttr.hpp>
27 #include <NdbOut.hpp>
28 #include <NdbEnv.h>
29 #include <Bitmask.hpp>
30 #include <NdbSqlUtil.hpp>
31 #include <NdbRecord.hpp>
32 #include <NdbEventOperation.hpp>
33 #include <NdbSleep.h>
34 #include "NdbIndexStatImpl.hpp"
35 
36 #undef min
37 #undef max
38 #define min(a, b) ((a) <= (b) ? (a) : (b))
39 #define max(a, b) ((a) >= (b) ? (a) : (b))
40 
41 static const char* const g_headtable_name = NDB_INDEX_STAT_HEAD_TABLE;
42 static const char* const g_sampletable_name = NDB_INDEX_STAT_SAMPLE_TABLE;
43 static const char* const g_sampleindex1_name = NDB_INDEX_STAT_SAMPLE_INDEX1;
44 
45 static const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
46 static const int ERR_TupleNotFound[] = { 626, 0 };
47 
NdbIndexStatImpl(NdbIndexStat & facade)48 NdbIndexStatImpl::NdbIndexStatImpl(NdbIndexStat& facade) :
49   NdbIndexStat(*this),
50   m_facade(&facade),
51   m_keyData(m_keySpec, false, 2),
52   m_valueData(m_valueSpec, false, 2)
53 {
54   init();
55   m_query_mutex = NdbMutex_Create();
56   assert(m_query_mutex != 0);
57   m_eventOp = 0;
58   m_mem_handler = &c_mem_default_handler;
59 }
60 
61 void
init()62 NdbIndexStatImpl::init()
63 {
64   m_indexSet = false;
65   m_indexId = 0;
66   m_indexVersion = 0;
67   m_tableId = 0;
68   m_keyAttrs = 0;
69   m_valueAttrs = 0;
70   // buffers
71   m_keySpecBuf = 0;
72   m_valueSpecBuf = 0;
73   m_keyDataBuf = 0;
74   m_valueDataBuf = 0;
75   // cache
76   m_cacheBuild = 0;
77   m_cacheQuery = 0;
78   m_cacheClean = 0;
79   // head
80   init_head(m_facadeHead);
81 }
82 
~NdbIndexStatImpl()83 NdbIndexStatImpl::~NdbIndexStatImpl()
84 {
85   reset_index();
86   if (m_query_mutex != 0)
87   {
88     NdbMutex_Destroy(m_query_mutex);
89     m_query_mutex = 0;
90   }
91 }
92 
93 // sys tables meta
94 
Sys(NdbIndexStatImpl * impl,Ndb * ndb)95 NdbIndexStatImpl::Sys::Sys(NdbIndexStatImpl* impl, Ndb* ndb) :
96   m_impl(impl),
97   m_ndb(ndb)
98 {
99   m_dic = m_ndb->getDictionary();
100   m_headtable = 0;
101   m_sampletable = 0;
102   m_sampleindex1 = 0;
103   m_obj_cnt = 0;
104 }
105 
~Sys()106 NdbIndexStatImpl::Sys::~Sys()
107 {
108   m_impl->sys_release(*this);
109 }
110 
111 void
sys_release(Sys & sys)112 NdbIndexStatImpl::sys_release(Sys& sys)
113 {
114   // close schema trans if any exists
115   NdbDictionary::Dictionary* const dic = sys.m_dic;
116   (void)dic->endSchemaTrans(NdbDictionary::Dictionary::SchemaTransAbort);
117 
118   if (sys.m_headtable != 0)
119   {
120     sys.m_dic->removeTableGlobal(*sys.m_headtable, false);
121     sys.m_headtable = 0;
122   }
123   if (sys.m_sampletable != 0)
124   {
125     sys.m_dic->removeTableGlobal(*sys.m_sampletable, false);
126     sys.m_sampletable = 0;
127   }
128   if (sys.m_sampleindex1 != 0)
129   {
130     sys.m_dic->removeIndexGlobal(*sys.m_sampleindex1, false);
131     sys.m_sampleindex1 = 0;
132   }
133 }
134 
135 int
make_headtable(NdbDictionary::Table & tab)136 NdbIndexStatImpl::make_headtable(NdbDictionary::Table& tab)
137 {
138   tab.setName(g_headtable_name);
139   tab.setLogging(true);
140   int ret;
141   ret = tab.setFrm(g_ndb_index_stat_head_frm_data,
142                    g_ndb_index_stat_head_frm_len);
143   if (ret != 0)
144   {
145     setError(ret, __LINE__);
146     return -1;
147   }
148   // key must be first
149   {
150     NdbDictionary::Column col("index_id");
151     col.setType(NdbDictionary::Column::Unsigned);
152     col.setPrimaryKey(true);
153     tab.addColumn(col);
154   }
155   {
156     NdbDictionary::Column col("index_version");
157     col.setType(NdbDictionary::Column::Unsigned);
158     col.setPrimaryKey(true);
159     tab.addColumn(col);
160   }
161   // table
162   {
163     NdbDictionary::Column col("table_id");
164     col.setType(NdbDictionary::Column::Unsigned);
165     col.setNullable(false);
166     tab.addColumn(col);
167   }
168   {
169     NdbDictionary::Column col("frag_count");
170     col.setType(NdbDictionary::Column::Unsigned);
171     col.setNullable(false);
172     tab.addColumn(col);
173   }
174   // current sample
175   {
176     NdbDictionary::Column col("value_format");
177     col.setType(NdbDictionary::Column::Unsigned);
178     col.setNullable(false);
179     tab.addColumn(col);
180   }
181   {
182     NdbDictionary::Column col("sample_version");
183     col.setType(NdbDictionary::Column::Unsigned);
184     col.setNullable(false);
185     tab.addColumn(col);
186   }
187   {
188     NdbDictionary::Column col("load_time");
189     col.setType(NdbDictionary::Column::Unsigned);
190     col.setNullable(false);
191     tab.addColumn(col);
192   }
193   {
194     NdbDictionary::Column col("sample_count");
195     col.setType(NdbDictionary::Column::Unsigned);
196     col.setNullable(false);
197     tab.addColumn(col);
198   }
199   {
200     NdbDictionary::Column col("key_bytes");
201     col.setType(NdbDictionary::Column::Unsigned);
202     col.setNullable(false);
203     tab.addColumn(col);
204   }
205   NdbError error;
206   if (tab.validate(error) == -1) {
207     setError(error.code, __LINE__);
208     return -1;
209   }
210   return 0;
211 }
212 
213 int
make_sampletable(NdbDictionary::Table & tab)214 NdbIndexStatImpl::make_sampletable(NdbDictionary::Table& tab)
215 {
216   tab.setName(g_sampletable_name);
217   tab.setLogging(true);
218   int ret;
219   ret = tab.setFrm(g_ndb_index_stat_sample_frm_data,
220                    g_ndb_index_stat_sample_frm_len);
221   if (ret != 0)
222   {
223     setError(ret, __LINE__);
224     return -1;
225   }
226   // key must be first
227   {
228     NdbDictionary::Column col("index_id");
229     col.setType(NdbDictionary::Column::Unsigned);
230     col.setPrimaryKey(true);
231     tab.addColumn(col);
232   }
233   {
234     NdbDictionary::Column col("index_version");
235     col.setType(NdbDictionary::Column::Unsigned);
236     col.setPrimaryKey(true);
237     tab.addColumn(col);
238   }
239   {
240     NdbDictionary::Column col("sample_version");
241     col.setType(NdbDictionary::Column::Unsigned);
242     col.setPrimaryKey(true);
243     tab.addColumn(col);
244   }
245   {
246     NdbDictionary::Column col("stat_key");
247     col.setType(NdbDictionary::Column::Longvarbinary);
248     col.setPrimaryKey(true);
249     col.setLength(MaxKeyBytes);
250     tab.addColumn(col);
251   }
252   // value
253   {
254     NdbDictionary::Column col("stat_value");
255     col.setType(NdbDictionary::Column::Longvarbinary);
256     col.setNullable(false);
257     col.setLength(MaxValueCBytes);
258     tab.addColumn(col);
259   }
260   NdbError error;
261   if (tab.validate(error) == -1) {
262     setError(error.code, __LINE__);
263     return -1;
264   }
265   return 0;
266 }
267 
268 int
make_sampleindex1(NdbDictionary::Index & ind)269 NdbIndexStatImpl::make_sampleindex1(NdbDictionary::Index& ind)
270 {
271   ind.setTable(g_sampletable_name);
272   ind.setName(g_sampleindex1_name);
273   ind.setType(NdbDictionary::Index::OrderedIndex);
274   ind.setLogging(false);
275   ind.addColumnName("index_id");
276   ind.addColumnName("index_version");
277   ind.addColumnName("sample_version");
278   return 0;
279 }
280 
281 int
check_table(const NdbDictionary::Table & tab1,const NdbDictionary::Table & tab2)282 NdbIndexStatImpl::check_table(const NdbDictionary::Table& tab1,
283                               const NdbDictionary::Table& tab2)
284 {
285   if (tab1.getNoOfColumns() != tab2.getNoOfColumns())
286     return -1;
287   const uint n = tab1.getNoOfColumns();
288   for (uint i = 0; i < n; i++)
289   {
290     const NdbDictionary::Column* col1 = tab1.getColumn(i);
291     const NdbDictionary::Column* col2 = tab2.getColumn(i);
292     require(col1 != 0 && col2 != 0);
293     if (!col1->equal(*col2))
294       return -1;
295   }
296   return 0;
297 }
298 
299 int
check_index(const NdbDictionary::Index & ind1,const NdbDictionary::Index & ind2)300 NdbIndexStatImpl::check_index(const NdbDictionary::Index& ind1,
301                               const NdbDictionary::Index& ind2)
302 {
303   if (ind1.getNoOfColumns() != ind2.getNoOfColumns())
304     return -1;
305   const uint n = ind1.getNoOfColumns();
306   for (uint i = 0; i < n; i++)
307   {
308     const NdbDictionary::Column* col1 = ind1.getColumn(i);
309     const NdbDictionary::Column* col2 = ind2.getColumn(i);
310     require(col1 != 0 && col2 != 0);
311     // getColumnNo() does not work on non-retrieved
312     if (!col1->equal(*col2))
313       return -1;
314   }
315   return 0;
316 }
317 
318 int
get_systables(Sys & sys)319 NdbIndexStatImpl::get_systables(Sys& sys)
320 {
321   Ndb* ndb = sys.m_ndb;
322   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
323   const int NoSuchTable = 723;
324   const int NoSuchIndex = 4243;
325 
326   sys.m_headtable = dic->getTableGlobal(g_headtable_name);
327   if (sys.m_headtable == 0)
328   {
329     int code = dic->getNdbError().code;
330     if (code != NoSuchTable) {
331       setError(code, __LINE__);
332       return -1;
333     }
334   }
335   else
336   {
337     NdbDictionary::Table tab;
338     make_headtable(tab);
339     if (check_table(*sys.m_headtable, tab) == -1)
340     {
341       setError(BadSysTables, __LINE__);
342       return -1;
343     }
344     sys.m_obj_cnt++;
345   }
346 
347   sys.m_sampletable = dic->getTableGlobal(g_sampletable_name);
348   if (sys.m_sampletable == 0)
349   {
350     int code = dic->getNdbError().code;
351     if (code != NoSuchTable) {
352       setError(code, __LINE__);
353       return -1;
354     }
355   }
356   else
357   {
358     NdbDictionary::Table tab;
359     make_sampletable(tab);
360     if (check_table(*sys.m_sampletable, tab) == -1)
361     {
362       setError(BadSysTables, __LINE__);
363       return -1;
364     }
365     sys.m_obj_cnt++;
366   }
367 
368   if (sys.m_sampletable != 0)
369   {
370     sys.m_sampleindex1 = dic->getIndexGlobal(g_sampleindex1_name, *sys.m_sampletable);
371     if (sys.m_sampleindex1 == 0)
372     {
373       int code = dic->getNdbError().code;
374       if (code != NoSuchIndex) {
375         setError(code, __LINE__);
376         return -1;
377       }
378     }
379     else
380     {
381       NdbDictionary::Index ind;
382       make_sampleindex1(ind);
383       if (check_index(*sys.m_sampleindex1, ind) == -1)
384       {
385         setError(BadSysTables, __LINE__);
386         return -1;
387       }
388       sys.m_obj_cnt++;
389     }
390   }
391 
392   return 0;
393 }
394 
395 int
create_systables(Ndb * ndb)396 NdbIndexStatImpl::create_systables(Ndb* ndb)
397 {
398   Sys sys(this, ndb);
399 
400   NdbDictionary::Dictionary* const dic = sys.m_dic;
401 
402   if (dic->beginSchemaTrans() == -1)
403   {
404     setError(dic->getNdbError().code, __LINE__);
405     return -1;
406   }
407 
408   if (get_systables(sys) == -1)
409     return -1;
410 
411   if (sys.m_obj_cnt == Sys::ObjCnt)
412   {
413     setError(HaveSysTables, __LINE__);
414     return -1;
415   }
416 
417   if (sys.m_obj_cnt != 0)
418   {
419     setError(BadSysTables, __LINE__);
420     return -1;
421   }
422 
423   {
424     NdbDictionary::Table tab;
425     if (make_headtable(tab) == -1)
426       return -1;
427     if (dic->createTable(tab) == -1)
428     {
429       setError(dic->getNdbError().code, __LINE__);
430       return -1;
431     }
432 
433     sys.m_headtable = dic->getTableGlobal(tab.getName());
434     if (sys.m_headtable == 0)
435     {
436       setError(dic->getNdbError().code, __LINE__);
437       return -1;
438     }
439   }
440 
441   {
442     NdbDictionary::Table tab;
443     if (make_sampletable(tab) == -1)
444       return -1;
445 
446 #ifdef VM_TRACE
447 #ifdef NDB_USE_GET_ENV
448     // test of schema trans
449     {
450       const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_SYS_CREATE", (char*)0, 0);
451       if (p != 0 && strchr("1Y", p[0]) != 0)
452       {
453         setError(9999, __LINE__);
454         return -1;
455       }
456     }
457 #endif
458 #endif
459 
460     if (dic->createTable(tab) == -1)
461     {
462       setError(dic->getNdbError().code, __LINE__);
463       return -1;
464     }
465 
466     sys.m_sampletable = dic->getTableGlobal(tab.getName());
467     if (sys.m_sampletable == 0)
468     {
469       setError(dic->getNdbError().code, __LINE__);
470       return -1;
471     }
472   }
473 
474   {
475     NdbDictionary::Index ind;
476     if (make_sampleindex1(ind) == -1)
477       return -1;
478     if (dic->createIndex(ind, *sys.m_sampletable) == -1)
479     {
480       setError(dic->getNdbError().code, __LINE__);
481       return -1;
482     }
483 
484     sys.m_sampleindex1 = dic->getIndexGlobal(ind.getName(), sys.m_sampletable->getName());
485     if (sys.m_sampleindex1 == 0)
486     {
487       setError(dic->getNdbError().code, __LINE__);
488       return -1;
489     }
490   }
491 
492   if (dic->endSchemaTrans() == -1)
493   {
494     setError(dic->getNdbError().code, __LINE__);
495     return -1;
496   }
497 
498   return 0;
499 }
500 
501 int
drop_systables(Ndb * ndb)502 NdbIndexStatImpl::drop_systables(Ndb* ndb)
503 {
504   Sys sys(this, ndb);
505 
506   NdbDictionary::Dictionary* const dic = sys.m_dic;
507 
508   if (dic->beginSchemaTrans() == -1)
509   {
510     setError(dic->getNdbError().code, __LINE__);
511     return -1;
512   }
513 
514   if (get_systables(sys) == -1 &&
515       m_error.code != BadSysTables)
516     return -1;
517 
518   if (sys.m_headtable != 0)
519   {
520     if (dic->dropTableGlobal(*sys.m_headtable) == -1)
521     {
522       setError(dic->getNdbError().code, __LINE__);
523       return -1;
524     }
525   }
526 
527   if (sys.m_sampletable != 0)
528   {
529 
530 #ifdef VM_TRACE
531 #ifdef NDB_USE_GET_ENV
532     // test of schema trans
533     {
534       const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_SYS_DROP", (char*)0, 0);
535       if (p != 0 && strchr("1Y", p[0]) != 0)
536       {
537         setError(9999, __LINE__);
538         return -1;
539       }
540     }
541 #endif
542 #endif
543 
544     if (dic->dropTableGlobal(*sys.m_sampletable) == -1)
545     {
546       setError(dic->getNdbError().code, __LINE__);
547       return -1;
548     }
549   }
550 
551   if (dic->endSchemaTrans() == -1)
552   {
553     setError(dic->getNdbError().code, __LINE__);
554     return -1;
555   }
556 
557   return 0;
558 }
559 
560 int
check_systables(Sys & sys)561 NdbIndexStatImpl::check_systables(Sys& sys)
562 {
563   if (get_systables(sys) == -1)
564     return -1;
565 
566   if (sys.m_obj_cnt == 0)
567   {
568     setError(NoSysTables, __LINE__);
569     return -1;
570   }
571 
572   if (sys.m_obj_cnt != Sys::ObjCnt)
573   {
574     setError(BadSysTables, __LINE__);
575     return -1;
576   }
577 
578   return 0;
579 }
580 
581 int
check_systables(Ndb * ndb)582 NdbIndexStatImpl::check_systables(Ndb* ndb)
583 {
584   Sys sys(this, ndb);
585 
586   if (check_systables(sys) == -1)
587     return -1;
588 
589   return 0;
590 }
591 
592 // operation context
593 
Con(NdbIndexStatImpl * impl,Head & head,Ndb * ndb)594 NdbIndexStatImpl::Con::Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb) :
595   m_impl(impl),
596   m_head(head),
597   m_ndb(ndb),
598   m_start()
599 {
600   head.m_indexId = m_impl->m_indexId;
601   head.m_indexVersion = m_impl->m_indexVersion;
602   m_dic = m_ndb->getDictionary();
603   m_headtable = 0;
604   m_sampletable = 0;
605   m_sampleindex1 = 0;
606   m_tx = 0;
607   m_op = 0;
608   m_scanop = 0;
609   m_cacheBuild = 0;
610   m_cachePos = 0;
611   m_cacheKeyOffset = 0;
612   m_cacheValueOffset = 0;
613 }
614 
~Con()615 NdbIndexStatImpl::Con::~Con()
616 {
617   if (m_cacheBuild != 0)
618   {
619     m_impl->free_cache(m_cacheBuild);
620     m_cacheBuild = 0;
621   }
622   if (m_tx != 0)
623   {
624     m_ndb->closeTransaction(m_tx);
625     m_tx = 0;
626   }
627   m_impl->sys_release(*this);
628 }
629 
630 int
startTransaction()631 NdbIndexStatImpl::Con::startTransaction()
632 {
633   assert(m_headtable != 0 && m_ndb != 0 && m_tx == 0);
634   Uint32 key[2] = {
635     m_head.m_indexId,
636     m_head.m_indexVersion
637   };
638   m_tx = m_ndb->startTransaction(m_headtable, (const char*)key, sizeof(key));
639   if (m_tx == 0)
640     return -1;
641   return 0;
642 }
643 
644 int
execute(bool commit)645 NdbIndexStatImpl::Con::execute(bool commit)
646 {
647   assert(m_tx != 0);
648   if (commit)
649   {
650     if (m_tx->execute(NdbTransaction::Commit) == -1)
651       return -1;
652     m_ndb->closeTransaction(m_tx);
653     m_tx = 0;
654   }
655   else
656   {
657     if (m_tx->execute(NdbTransaction::NoCommit) == -1)
658       return -1;
659   }
660   return 0;
661 }
662 
663 int
getNdbOperation()664 NdbIndexStatImpl::Con::getNdbOperation()
665 {
666   assert(m_headtable != 0);
667   assert(m_tx != 0 && m_op == 0);
668   m_op = m_tx->getNdbOperation(m_headtable);
669   if (m_op == 0)
670     return -1;
671   return 0;
672 }
673 
674 int
getNdbIndexScanOperation()675 NdbIndexStatImpl::Con::getNdbIndexScanOperation()
676 {
677   assert(m_sampletable != 0 && m_sampleindex1 != 0);
678   assert( m_tx != 0 && m_scanop == 0);
679   m_scanop = m_tx->getNdbIndexScanOperation(m_sampleindex1, m_sampletable);
680   if (m_scanop == 0)
681     return -1;
682   return 0;
683 }
684 
685 void
set_time()686 NdbIndexStatImpl::Con::set_time()
687 {
688   m_start = NdbTick_getCurrentTicks();
689 }
690 
691 Uint64
get_time()692 NdbIndexStatImpl::Con::get_time()
693 {
694   const NDB_TICKS stop = NdbTick_getCurrentTicks();
695   Uint64 us = NdbTick_Elapsed(m_start, stop).microSec();
696   return us;
697 }
698 
699 // index
700 
701 int
set_index(const NdbDictionary::Index & index,const NdbDictionary::Table & table)702 NdbIndexStatImpl::set_index(const NdbDictionary::Index& index,
703                             const NdbDictionary::Table& table)
704 {
705   if (m_indexSet)
706   {
707     setError(UsageError, __LINE__);
708     return -1;
709   }
710   m_indexId = index.getObjectId();
711   m_indexVersion = index.getObjectVersion();
712   m_tableId = table.getObjectId();
713   m_keyAttrs = index.getNoOfColumns();
714   m_valueAttrs = 1 + m_keyAttrs;
715   if (m_keyAttrs == 0)
716   {
717     setError(InternalError, __LINE__);
718     return -1;
719   }
720   if (m_keyAttrs > MaxKeyCount)
721   {
722     setError(InternalError, __LINE__);
723     return -1;
724   }
725 
726   // spec buffers
727   m_keySpecBuf = new NdbPack::Type [m_keyAttrs];
728   m_valueSpecBuf = new NdbPack::Type [m_valueAttrs];
729   if (m_keySpecBuf == 0 || m_valueSpecBuf == 0)
730   {
731     setError(NoMemError, __LINE__);
732     return -1;
733   }
734   m_keySpec.set_buf(m_keySpecBuf, m_keyAttrs);
735   m_valueSpec.set_buf(m_valueSpecBuf, m_valueAttrs);
736 
737   // index key spec
738   {
739     for (uint i = 0; i < m_keyAttrs; i++)
740     {
741       const NdbDictionary::Column* icol = index.getColumn(i);
742       if (icol == 0)
743       {
744         setError(UsageError, __LINE__);
745         return -1;
746       }
747       NdbPack::Type type (
748         icol->getType(),
749         icol->getSizeInBytes(),
750         icol->getNullable(),
751         icol->getCharset() != 0 ? icol->getCharset()->number : 0
752       );
753       if (m_keySpec.add(type) == -1)
754       {
755         setError(UsageError, __LINE__, m_keySpec.get_error_code());
756         return -1;
757       }
758     }
759   }
760   // stat values spec
761   {
762     NdbPack::Type type(NDB_TYPE_UNSIGNED, 4, false, 0);
763     // rir + rpk
764     if (m_valueSpec.add(type, m_valueAttrs) == -1)
765     {
766       setError(InternalError, __LINE__, m_valueSpec.get_error_code());
767       return -1;
768     }
769   }
770 
771   // data buffers (rounded to word)
772   m_keyDataBuf = new Uint8 [m_keyData.get_max_len4()];
773   m_valueDataBuf = new Uint8 [m_valueData.get_max_len4()];
774   if (m_keyDataBuf == 0 || m_valueDataBuf == 0)
775   {
776     setError(NoMemError, __LINE__);
777     return -1;
778   }
779   m_keyData.set_buf(m_keyDataBuf, m_keyData.get_max_len());
780   m_valueData.set_buf(m_valueDataBuf, m_valueData.get_max_len());
781 
782   m_indexSet = true;
783   return 0;
784 }
785 
786 void
reset_index()787 NdbIndexStatImpl::reset_index()
788 {
789   free_cache();
790   m_keySpec.reset();
791   m_valueSpec.reset();
792   delete [] m_keySpecBuf;
793   delete [] m_valueSpecBuf;
794   delete [] m_keyDataBuf;
795   delete [] m_valueDataBuf;
796   init();
797 }
798 
799 // head
800 
801 void
init_head(Head & head)802 NdbIndexStatImpl::init_head(Head& head)
803 {
804   head.m_found = -1;
805   head.m_eventType = -1;
806   head.m_indexId = 0;
807   head.m_indexVersion = 0;
808   head.m_tableId = 0;
809   head.m_fragCount = 0;
810   head.m_valueFormat = 0;
811   head.m_sampleVersion = 0;
812   head.m_loadTime = 0;
813   head.m_sampleCount = 0;
814   head.m_keyBytes = 0;
815 }
816 
817 // sys tables data
818 
819 int
sys_init(Con & con)820 NdbIndexStatImpl::sys_init(Con& con)
821 {
822   Ndb* ndb = con.m_ndb;
823   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
824   sys_release(con);
825 
826   con.m_headtable = dic->getTableGlobal(g_headtable_name);
827   if (con.m_headtable == 0)
828   {
829     setError(con, __LINE__);
830     mapError(ERR_NoSuchObject, NoSysTables);
831     return -1;
832   }
833   con.m_sampletable = dic->getTableGlobal(g_sampletable_name);
834   if (con.m_sampletable == 0)
835   {
836     setError(con, __LINE__);
837     mapError(ERR_NoSuchObject, NoSysTables);
838     return -1;
839   }
840   con.m_sampleindex1 = dic->getIndexGlobal(g_sampleindex1_name, *con.m_sampletable);
841   if (con.m_sampleindex1 == 0)
842   {
843     setError(con, __LINE__);
844     mapError(ERR_NoSuchObject, NoSysTables);
845     return -1;
846   }
847   return 0;
848 }
849 
850 void
sys_release(Con & con)851 NdbIndexStatImpl::sys_release(Con& con)
852 {
853   if (con.m_headtable != 0)
854   {
855     con.m_dic->removeTableGlobal(*con.m_headtable, false);
856     con.m_headtable = 0;
857   }
858   if (con.m_sampletable != 0)
859   {
860     con.m_dic->removeTableGlobal(*con.m_sampletable, false);
861     con.m_sampletable = 0;
862   }
863   if (con.m_sampleindex1 != 0)
864   {
865     con.m_dic->removeIndexGlobal(*con.m_sampleindex1, false);
866     con.m_sampleindex1 = 0;
867   }
868 }
869 
870 int
sys_read_head(Con & con,bool commit)871 NdbIndexStatImpl::sys_read_head(Con& con, bool commit)
872 {
873   Head& head = con.m_head;
874   head.m_sampleVersion = 0;
875   head.m_found = false;
876 
877   if (con.getNdbOperation() == -1)
878   {
879     setError(con, __LINE__);
880     return -1;
881   }
882   if (con.m_op->readTuple(NdbOperation::LM_Read) == -1)
883   {
884     setError(con, __LINE__);
885     return -1;
886   }
887   if (sys_head_setkey(con) == -1)
888     return -1;
889   if (sys_head_getvalue(con) == -1)
890     return -1;
891   if (con.m_op->setAbortOption(NdbOperation::AbortOnError) == -1)
892   {
893     setError(con, __LINE__);
894     return -1;
895   }
896   if (con.execute(commit) == -1)
897   {
898     setError(con, __LINE__);
899     mapError(ERR_TupleNotFound, NoIndexStats);
900     return -1;
901   }
902   head.m_found = true;
903   if (head.m_sampleVersion == 0)
904   {
905     setError(NoIndexStats, __LINE__);
906     return -1;
907   }
908   return 0;
909 }
910 
911 int
sys_head_setkey(Con & con)912 NdbIndexStatImpl::sys_head_setkey(Con& con)
913 {
914   Head& head = con.m_head;
915   NdbOperation* op = con.m_op;
916   if (op->equal("index_id", (char*)&head.m_indexId) == -1)
917   {
918     setError(con, __LINE__);
919     return -1;
920   }
921   if (op->equal("index_version", (char*)&head.m_indexVersion) == -1)
922   {
923     setError(con, __LINE__);
924     return -1;
925   }
926   return 0;
927 }
928 
929 int
sys_head_getvalue(Con & con)930 NdbIndexStatImpl::sys_head_getvalue(Con& con)
931 {
932   Head& head = con.m_head;
933   NdbOperation* op = con.m_op;
934   if (op->getValue("table_id", (char*)&head.m_tableId) == 0)
935   {
936     setError(con, __LINE__);
937     return -1;
938   }
939   if (op->getValue("frag_count", (char*)&head.m_fragCount) == 0)
940   {
941     setError(con, __LINE__);
942     return -1;
943   }
944   if (op->getValue("value_format", (char*)&head.m_valueFormat) == 0)
945   {
946     setError(con, __LINE__);
947     return -1;
948   }
949   if (op->getValue("sample_version", (char*)&head.m_sampleVersion) == 0)
950   {
951     setError(con, __LINE__);
952     return -1;
953   }
954   if (op->getValue("load_time", (char*)&head.m_loadTime) == 0)
955   {
956     setError(con, __LINE__);
957     return -1;
958   }
959   if (op->getValue("sample_count", (char*)&head.m_sampleCount) == 0)
960   {
961     setError(con, __LINE__);
962     return -1;
963   }
964   if (op->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
965   {
966     setError(con, __LINE__);
967     return -1;
968   }
969   return 0;
970 }
971 
972 int
sys_sample_setkey(Con & con)973 NdbIndexStatImpl::sys_sample_setkey(Con& con)
974 {
975   Head& head = con.m_head;
976   NdbIndexScanOperation* op = con.m_scanop;
977   if (op->equal("index_id", (char*)&head.m_indexId) == -1)
978   {
979     setError(con, __LINE__);
980     return -1;
981   }
982   if (op->equal("index_version", (char*)&head.m_indexVersion) == -1)
983   {
984     setError(con, __LINE__);
985     return -1;
986   }
987   if (op->equal("sample_version", (char*)&head.m_sampleVersion) == -1)
988   {
989     setError(con, __LINE__);
990     return -1;
991   }
992   if (op->equal("stat_key", (char*)m_keyData.get_full_buf()) == -1)
993   {
994     setError(con, __LINE__);
995     return -1;
996   }
997   return 0;
998 }
999 
1000 int
sys_sample_getvalue(Con & con)1001 NdbIndexStatImpl::sys_sample_getvalue(Con& con)
1002 {
1003   NdbIndexScanOperation* op = con.m_scanop;
1004   if (op->getValue("stat_key", (char*)m_keyData.get_full_buf()) == 0)
1005   {
1006     setError(con, __LINE__);
1007     return -1;
1008   }
1009   if (op->getValue("stat_value", (char*)m_valueData.get_full_buf()) == 0)
1010   {
1011     setError(con, __LINE__);
1012     return -1;
1013   }
1014   return 0;
1015 }
1016 
1017 int
sys_sample_setbound(Con & con,int sv_bound)1018 NdbIndexStatImpl::sys_sample_setbound(Con& con, int sv_bound)
1019 {
1020   Head& head = con.m_head;
1021   NdbIndexScanOperation* op = con.m_scanop;
1022   const NdbIndexScanOperation::BoundType eq_bound =
1023     NdbIndexScanOperation::BoundEQ;
1024 
1025   if (op->setBound("index_id", eq_bound, &head.m_indexId) == -1)
1026   {
1027     setError(con, __LINE__);
1028     return -1;
1029   }
1030   if (op->setBound("index_version", eq_bound, &head.m_indexVersion) == -1)
1031   {
1032     setError(con, __LINE__);
1033     return -1;
1034   }
1035   if (sv_bound != -1)
1036   {
1037     if (op->setBound("sample_version", sv_bound, &head.m_sampleVersion) == -1)
1038     {
1039       setError(con, __LINE__);
1040       return -1;
1041     }
1042   }
1043   return 0;
1044 }
1045 
1046 // update, delete
1047 
1048 int
update_stat(Ndb * ndb,Head & head)1049 NdbIndexStatImpl::update_stat(Ndb* ndb, Head& head)
1050 {
1051   Con con(this, head, ndb);
1052   if (con.m_dic->updateIndexStat(m_indexId, m_indexVersion, m_tableId) == -1)
1053   {
1054     setError(con, __LINE__);
1055     mapError(ERR_NoSuchObject, NoSysTables);
1056     return -1;
1057   }
1058   return 0;
1059 }
1060 
1061 int
delete_stat(Ndb * ndb,Head & head)1062 NdbIndexStatImpl::delete_stat(Ndb* ndb, Head& head)
1063 {
1064   Con con(this, head, ndb);
1065   if (con.m_dic->deleteIndexStat(m_indexId, m_indexVersion, m_tableId) == -1)
1066   {
1067     setError(con, __LINE__);
1068     mapError(ERR_NoSuchObject, NoSysTables);
1069     return -1;
1070   }
1071   return 0;
1072 }
1073 
1074 // read
1075 
1076 int
read_head(Ndb * ndb,Head & head)1077 NdbIndexStatImpl::read_head(Ndb* ndb, Head& head)
1078 {
1079   Con con(this, head, ndb);
1080   if (!m_indexSet)
1081   {
1082     setError(UsageError, __LINE__);
1083     return -1;
1084   }
1085   if (sys_init(con) == -1)
1086     return -1;
1087   if (con.startTransaction() == -1)
1088   {
1089     setError(con, __LINE__);
1090     return -1;
1091   }
1092   if (sys_read_head(con, true) == -1)
1093     return -1;
1094   return 0;
1095 }
1096 
1097 int
read_stat(Ndb * ndb,Head & head)1098 NdbIndexStatImpl::read_stat(Ndb* ndb, Head& head)
1099 {
1100   Con con(this, head, ndb);
1101   con.set_time();
1102 
1103   if (read_start(con) == -1)
1104     return -1;
1105   if (save_start(con) == -1)
1106     return -1;
1107   while (1)
1108   {
1109     int ret = read_next(con);
1110     if (ret == -1)
1111       return -1;
1112     if (ret != 0)
1113       break;
1114     if (save_next(con) == -1)
1115       return -1;
1116   }
1117   if (read_commit(con) == -1)
1118     return -1;
1119 
1120   Uint64 save_time = con.get_time();
1121   con.set_time();
1122 
1123   if (save_commit(con) == -1)
1124     return -1;
1125   Uint64 sort_time = con.get_time();
1126 
1127   const Cache& c = *m_cacheBuild;
1128   c.m_save_time = save_time;
1129   c.m_sort_time = sort_time;
1130   return 0;
1131 }
1132 
1133 int
read_start(Con & con)1134 NdbIndexStatImpl::read_start(Con& con)
1135 {
1136   //UNUSED Head& head = con.m_head;
1137   if (!m_indexSet)
1138   {
1139     setError(UsageError, __LINE__);
1140     return -1;
1141   }
1142   if (sys_init(con) == -1)
1143     return -1;
1144   if (con.startTransaction() == -1)
1145   {
1146     setError(con, __LINE__);
1147     return -1;
1148   }
1149   if (sys_read_head(con, false) == -1)
1150     return -1;
1151   if (con.getNdbIndexScanOperation() == -1)
1152   {
1153     setError(con, __LINE__);
1154     return -1;
1155   }
1156   if (con.m_scanop->readTuples(NdbOperation::LM_CommittedRead, 0) == -1)
1157   {
1158     setError(con, __LINE__);
1159     return -1;
1160   }
1161   if (sys_sample_setbound(con, NdbIndexScanOperation::BoundEQ) == -1)
1162     return -1;
1163   if (sys_sample_getvalue(con) == -1)
1164     return -1;
1165   if (con.execute(false) == -1)
1166   {
1167     setError(con, __LINE__);
1168     return -1;
1169   }
1170   return 0;
1171 }
1172 
1173 int
read_next(Con & con)1174 NdbIndexStatImpl::read_next(Con& con)
1175 {
1176   m_keyData.reset();
1177   m_valueData.reset();
1178   int ret = con.m_scanop->nextResult();
1179   if (ret != 0)
1180   {
1181     if (ret == -1)
1182       setError(con, __LINE__);
1183     return ret;
1184   }
1185 
1186   /*
1187    * Key and value are raw data and little-endian.  Create the complete
1188    * NdbPack::Data instance and convert it to native-endian.
1189    */
1190   const NdbPack::Endian::Value from_endian = NdbPack::Endian::Little;
1191   const NdbPack::Endian::Value to_endian = NdbPack::Endian::Native;
1192 
1193   if (m_keyData.desc_all(m_keyAttrs, from_endian) == -1)
1194   {
1195     setError(InternalError, __LINE__, m_keyData.get_error_code());
1196     return -1;
1197   }
1198   if (m_keyData.convert(to_endian) == -1)
1199   {
1200     setError(InternalError, __LINE__, m_keyData.get_error_code());
1201     return -1;
1202   }
1203   if (m_valueData.desc_all(m_valueAttrs, from_endian) == -1)
1204   {
1205     setError(InternalError, __LINE__, m_valueData.get_error_code());
1206     return -1;
1207   }
1208   if (m_valueData.convert(to_endian) == -1)
1209   {
1210     setError(InternalError, __LINE__, m_valueData.get_error_code());
1211     return -1;
1212   }
1213   return 0;
1214 }
1215 
1216 int
read_commit(Con & con)1217 NdbIndexStatImpl::read_commit(Con& con)
1218 {
1219   if (con.execute(true) == -1)
1220   {
1221     setError(con, __LINE__);
1222     return -1;
1223   }
1224   return 0;
1225 }
1226 
1227 // save
1228 
1229 int
save_start(Con & con)1230 NdbIndexStatImpl::save_start(Con& con)
1231 {
1232   if (m_cacheBuild != 0)
1233   {
1234     free_cache(m_cacheBuild);
1235     m_cacheBuild = 0;
1236   }
1237   con.m_cacheBuild = new Cache;
1238   if (con.m_cacheBuild == 0)
1239   {
1240     setError(NoMemError, __LINE__);
1241     return -1;
1242   }
1243   new (con.m_cacheBuild) Cache;
1244   if (cache_init(con) == -1)
1245     return -1;
1246   return 0;
1247 }
1248 
1249 int
save_next(Con & con)1250 NdbIndexStatImpl::save_next(Con& con)
1251 {
1252   if (cache_insert(con) == -1)
1253     return -1;
1254   return 0;
1255 }
1256 
1257 int
save_commit(Con & con)1258 NdbIndexStatImpl::save_commit(Con& con)
1259 {
1260   if (cache_commit(con) == -1)
1261     return -1;
1262   m_cacheBuild = con.m_cacheBuild;
1263   con.m_cacheBuild = 0;
1264   return 0;
1265 }
1266 
1267 // cache inline
1268 
1269 inline uint
get_keyaddr(uint pos) const1270 NdbIndexStatImpl::Cache::get_keyaddr(uint pos) const
1271 {
1272   assert(pos < m_sampleCount);
1273   const uint offset = pos * m_addrLen;
1274   assert(offset + m_addrLen <= m_addrBytes);
1275   const Uint8* src = &m_addrArray[offset];
1276   uint addr = 0;
1277   switch (m_addrLen) {
1278   case 4:
1279     addr += src[3] << 24;
1280   case 3:
1281     addr += src[2] << 16;
1282   case 2:
1283     addr += src[1] << 8;
1284   case 1:
1285     addr += src[0] << 0;
1286     break;
1287   default:
1288     assert(false);
1289   }
1290   return addr;
1291 }
1292 
1293 inline void
set_keyaddr(uint pos,uint addr)1294 NdbIndexStatImpl::Cache::set_keyaddr(uint pos, uint addr)
1295 {
1296   assert(pos < m_sampleCount);
1297   const uint offset = pos * m_addrLen;
1298   assert(offset + m_addrLen <= m_addrBytes);
1299   Uint8* dst = &m_addrArray[offset];
1300   switch (m_addrLen) {
1301   case 4:
1302     dst[3] = (addr >> 24) & 0xFF;
1303   case 3:
1304     dst[2] = (addr >> 16) & 0xFF;
1305   case 2:
1306     dst[1] = (addr >> 8) & 0xFF;
1307   case 1:
1308     dst[0] = (addr >> 0) & 0xFF;
1309     break;
1310   default:
1311     assert(false);
1312   }
1313   assert(get_keyaddr(pos) == addr);
1314 }
1315 
1316 inline const Uint8*
get_keyptr(uint addr) const1317 NdbIndexStatImpl::Cache::get_keyptr(uint addr) const
1318 {
1319   assert(addr < m_keyBytes);
1320   return &m_keyArray[addr];
1321 }
1322 
1323 inline Uint8*
get_keyptr(uint addr)1324 NdbIndexStatImpl::Cache::get_keyptr(uint addr)
1325 {
1326   assert(addr < m_keyBytes);
1327   return &m_keyArray[addr];
1328 }
1329 
1330 inline const Uint8*
get_valueptr(uint pos) const1331 NdbIndexStatImpl::Cache::get_valueptr(uint pos) const
1332 {
1333   assert(pos < m_sampleCount);
1334   return &m_valueArray[pos * m_valueLen];
1335 }
1336 
1337 inline Uint8*
get_valueptr(uint pos)1338 NdbIndexStatImpl::Cache::get_valueptr(uint pos)
1339 {
1340   assert(pos < m_sampleCount);
1341   return &m_valueArray[pos * m_valueLen];
1342 }
1343 
1344 inline void
swap_entry(uint pos1,uint pos2)1345 NdbIndexStatImpl::Cache::swap_entry(uint pos1, uint pos2)
1346 {
1347   uint hold_addr;
1348   Uint8 hold_value[MaxValueBytes];
1349 
1350   hold_addr = get_keyaddr(pos1);
1351   memcpy(hold_value, get_valueptr(pos1), m_valueLen);
1352   set_keyaddr(pos1, get_keyaddr(pos2));
1353   memcpy(get_valueptr(pos1), get_valueptr(pos2), m_valueLen);
1354   set_keyaddr(pos2, hold_addr);
1355   memcpy(get_valueptr(pos2), hold_value, m_valueLen);
1356 }
1357 
1358 inline double
get_rir1(uint pos) const1359 NdbIndexStatImpl::Cache::get_rir1(uint pos) const
1360 {
1361   const Uint8* ptr = get_valueptr(pos);
1362   Uint32 n;
1363   memcpy(&n, &ptr[0], 4);
1364   double x = (double)n;
1365   return x;
1366 }
1367 
1368 inline double
get_rir1(uint pos1,uint pos2) const1369 NdbIndexStatImpl::Cache::get_rir1(uint pos1, uint pos2) const
1370 {
1371   assert(pos2 > pos1);
1372   return get_rir1(pos2) - get_rir1(pos1);
1373 }
1374 
1375 inline double
get_rir(uint pos) const1376 NdbIndexStatImpl::Cache::get_rir(uint pos) const
1377 {
1378   double x = (double)m_fragCount * get_rir1(pos);
1379   return x;
1380 }
1381 
1382 inline double
get_rir(uint pos1,uint pos2) const1383 NdbIndexStatImpl::Cache::get_rir(uint pos1, uint pos2) const
1384 {
1385   assert(pos2 > pos1);
1386   return get_rir(pos2) - get_rir(pos1);
1387 }
1388 
1389 inline double
get_unq1(uint pos,uint k) const1390 NdbIndexStatImpl::Cache::get_unq1(uint pos, uint k) const
1391 {
1392   assert(k < m_keyAttrs);
1393   const Uint8* ptr = get_valueptr(pos);
1394   Uint32 n;
1395   memcpy(&n, &ptr[4 + k * 4], 4);
1396   double x = (double)n;
1397   return x;
1398 }
1399 
1400 inline double
get_unq1(uint pos1,uint pos2,uint k) const1401 NdbIndexStatImpl::Cache::get_unq1(uint pos1, uint pos2, uint k) const
1402 {
1403   assert(pos2 > pos1);
1404   return get_unq1(pos2, k) - get_unq1(pos1, k);
1405 }
1406 
1407 static inline double
get_unqfactor(uint p,double r,double u)1408 get_unqfactor(uint p, double r, double u)
1409 {
1410   double ONE = (double)1.0;
1411   double d = (double)p;
1412   double f = ONE + (d - ONE) * ::pow(u / r, d - ONE);
1413   return f;
1414 }
1415 
1416 inline double
get_unq(uint pos,uint k) const1417 NdbIndexStatImpl::Cache::get_unq(uint pos, uint k) const
1418 {
1419   uint p = m_fragCount;
1420   double r = get_rir1(pos);
1421   double u = get_unq1(pos, k);
1422   double f = get_unqfactor(p, r, u);
1423   double x = f * u;
1424   return x;
1425 }
1426 
1427 inline double
get_unq(uint pos1,uint pos2,uint k) const1428 NdbIndexStatImpl::Cache::get_unq(uint pos1, uint pos2, uint k) const
1429 {
1430   uint p = m_fragCount;
1431   double r = get_rir1(pos1, pos2);
1432   double u = get_unq1(pos1, pos2, k);
1433   double f = get_unqfactor(p, r, u);
1434   double x = f * u;
1435   return x;
1436 }
1437 
1438 inline double
get_rpk(uint pos,uint k) const1439 NdbIndexStatImpl::Cache::get_rpk(uint pos, uint k) const
1440 {
1441   return get_rir(pos) / get_unq(pos, k);
1442 }
1443 
1444 inline double
get_rpk(uint pos1,uint pos2,uint k) const1445 NdbIndexStatImpl::Cache::get_rpk(uint pos1, uint pos2, uint k) const
1446 {
1447   assert(pos2 > pos1);
1448   return get_rir(pos1, pos2) / get_unq(pos1, pos2, k);
1449 }
1450 
1451 // cache
1452 
Cache()1453 NdbIndexStatImpl::Cache::Cache()
1454 {
1455   m_valid = false;
1456   m_keyAttrs = 0;
1457   m_valueAttrs = 0;
1458   m_fragCount = 0;
1459   m_sampleVersion = 0;
1460   m_sampleCount = 0;
1461   m_keyBytes = 0;
1462   m_valueLen = 0;
1463   m_valueBytes = 0;
1464   m_addrLen = 0;
1465   m_addrBytes = 0;
1466   m_addrArray = 0;
1467   m_keyArray = 0;
1468   m_valueArray = 0;
1469   m_nextClean = 0;
1470   // performance
1471   m_save_time = 0;
1472   m_sort_time = 0;
1473   // in use by query_stat
1474   m_ref_count = 0;
1475 }
1476 
1477 int
cache_init(Con & con)1478 NdbIndexStatImpl::cache_init(Con& con)
1479 {
1480   Cache& c = *con.m_cacheBuild;
1481   Head& head = con.m_head;
1482   Mem* mem = m_mem_handler;
1483 
1484   if (m_keyAttrs == 0)
1485   {
1486     setError(InternalError, __LINE__);
1487     return -1;
1488   }
1489   c.m_keyAttrs = m_keyAttrs;
1490   c.m_valueAttrs = m_valueAttrs;
1491   c.m_fragCount = head.m_fragCount;
1492   c.m_sampleCount = head.m_sampleCount;
1493   c.m_keyBytes = head.m_keyBytes;
1494   c.m_valueLen = 4 + c.m_keyAttrs * 4;
1495   c.m_valueBytes = c.m_sampleCount * c.m_valueLen;
1496   c.m_addrLen =
1497     c.m_keyBytes < (1 << 8) ? 1 :
1498     c.m_keyBytes < (1 << 16) ? 2 :
1499     c.m_keyBytes < (1 << 24) ? 3 : 4;
1500   c.m_addrBytes = c.m_sampleCount * c.m_addrLen;
1501 
1502   // wl4124_todo omit addrArray if keys have fixed size
1503   c.m_addrArray = (Uint8*)mem->mem_alloc(c.m_addrBytes);
1504   if (c.m_addrArray == 0)
1505   {
1506     setError(NoMemError, __LINE__);
1507     return -1;
1508   }
1509   c.m_keyArray = (Uint8*)mem->mem_alloc(c.m_keyBytes);
1510   if (c.m_keyArray == 0)
1511   {
1512     setError(NoMemError, __LINE__);
1513     return -1;
1514   }
1515   c.m_valueArray = (Uint8*)mem->mem_alloc(c.m_valueBytes);
1516   if (c.m_valueArray == 0)
1517   {
1518     setError(NoMemError, __LINE__);
1519     return -1;
1520   }
1521   return 0;
1522 }
1523 
1524 int
cache_insert(Con & con)1525 NdbIndexStatImpl::cache_insert(Con& con)
1526 {
1527   Cache& c = *con.m_cacheBuild;
1528 
1529   const uint nextPos = con.m_cachePos + 1;
1530   if (nextPos > c.m_sampleCount)
1531   {
1532     setError(InternalError, __LINE__);
1533     return -1;
1534   }
1535   assert(m_keyData.is_full());
1536   const uint keyLen = m_keyData.get_data_len();
1537   const uint nextKeyOffset = con.m_cacheKeyOffset + keyLen;
1538   if (nextKeyOffset > c.m_keyBytes)
1539   {
1540     setError(InternalError, __LINE__);
1541     return -1;
1542   }
1543   if (m_valueData.get_data_len() != c.m_valueLen)
1544   {
1545     setError(InternalError, __LINE__);
1546     return -1;
1547   }
1548   const uint nextValueOffset = con.m_cacheValueOffset + c.m_valueLen;
1549   if (nextValueOffset > c.m_valueBytes)
1550   {
1551     setError(InternalError, __LINE__);
1552     return -1;
1553   }
1554 
1555   c.set_keyaddr(con.m_cachePos, con.m_cacheKeyOffset);
1556   con.m_cachePos = nextPos;
1557 
1558   Uint8* cacheKeyPtr = &c.m_keyArray[con.m_cacheKeyOffset];
1559   const Uint8* keyPtr = (const Uint8*)m_keyData.get_data_buf();
1560   memcpy(cacheKeyPtr, keyPtr, keyLen);
1561   con.m_cacheKeyOffset = nextKeyOffset;
1562 
1563   Uint8* cacheValuePtr = &c.m_valueArray[con.m_cacheValueOffset];
1564   const Uint8* valuePtr = (const Uint8*)m_valueData.get_data_buf();
1565   memcpy(cacheValuePtr, valuePtr, c.m_valueLen);
1566   con.m_cacheValueOffset = nextValueOffset;
1567 
1568   // verify sanity
1569   {
1570     const Uint8* rir_ptr = &cacheValuePtr[0];
1571     Uint32 rir;
1572     memcpy(&rir, rir_ptr, 4);
1573     if (!(rir != 0))
1574     {
1575       setError(InvalidCache, __LINE__);
1576       return -1;
1577     }
1578     Uint32 unq_prev = 0;
1579     for (uint k = 0; k < c.m_keyAttrs; k++)
1580     {
1581       Uint8* unq_ptr = &cacheValuePtr[4 + k * 4];
1582       Uint32 unq;
1583       memcpy(&unq, unq_ptr, 4);
1584       if (!(unq != 0))
1585       {
1586         setError(InvalidCache, __LINE__);
1587         return -1;
1588       }
1589       if (!(rir >= unq))
1590       {
1591         setError(InvalidCache, __LINE__);
1592         return -1;
1593       }
1594       if (!(unq >= unq_prev))
1595       {
1596         setError(InvalidCache, __LINE__);
1597         return -1;
1598       }
1599       unq_prev = unq;
1600     }
1601   }
1602   return 0;
1603 }
1604 
1605 int
cache_commit(Con & con)1606 NdbIndexStatImpl::cache_commit(Con& con)
1607 {
1608   Cache& c = *con.m_cacheBuild;
1609   Head& head = con.m_head;
1610   if (con.m_cachePos != c.m_sampleCount)
1611   {
1612     setError(InternalError, __LINE__);
1613     return -1;
1614   }
1615   if (con.m_cacheKeyOffset != c.m_keyBytes)
1616   {
1617     setError(InternalError, __LINE__);
1618     return -1;
1619   }
1620   if (con.m_cacheValueOffset != c.m_valueBytes)
1621   {
1622     setError(InternalError, __LINE__);
1623     return -1;
1624   }
1625   c.m_sampleVersion = head.m_sampleVersion;
1626   if (cache_sort(c) == -1)
1627     return -1;
1628   if (cache_verify(c) == -1)
1629     return -1;
1630   c.m_valid = true;
1631   return 0;
1632 }
1633 
1634 int
cache_cmpaddr(const Cache & c,uint addr1,uint addr2) const1635 NdbIndexStatImpl::cache_cmpaddr(const Cache& c, uint addr1, uint addr2) const
1636 {
1637   const Uint8* key1 = c.get_keyptr(addr1);
1638   const Uint8* key2 = c.get_keyptr(addr2);
1639 
1640   NdbPack::DataC keyData1(m_keySpec, false);
1641   NdbPack::DataC keyData2(m_keySpec, false);
1642   keyData1.set_buf(key1, c.m_keyBytes - addr1, c.m_keyAttrs);
1643   keyData2.set_buf(key2, c.m_keyBytes - addr2, c.m_keyAttrs);
1644 
1645   Uint32 num_eq;
1646   int res = keyData1.cmp(keyData2, c.m_keyAttrs, num_eq);
1647   assert(addr1 == addr2 || res != 0);
1648   return res;
1649 }
1650 
1651 int
cache_cmppos(const Cache & c,uint pos1,uint pos2) const1652 NdbIndexStatImpl::cache_cmppos(const Cache& c, uint pos1, uint pos2) const
1653 {
1654   uint addr1 = c.get_keyaddr(pos1);
1655   uint addr2 = c.get_keyaddr(pos2);
1656   return cache_cmpaddr(c, addr1, addr2);
1657 }
1658 
1659 /*
1660  * Sort addr and value arrays via key values.  The samples were inserted
1661  * in key order and were read back via index scan so they may be nearly
1662  * ordered at first.  This is quicksort worst case so we do not use it.
1663  */
1664 int
cache_sort(Cache & c)1665 NdbIndexStatImpl::cache_sort(Cache& c)
1666 {
1667   if (c.m_sampleCount > 1)
1668     cache_hsort(c);
1669   return 0;
1670 }
1671 
1672 // insertion sort - expensive
1673 void
cache_isort(Cache & c)1674 NdbIndexStatImpl::cache_isort(Cache& c)
1675 {
1676   int n = c.m_sampleCount;
1677   for (int i = 1; i < n; i++)
1678   {
1679     for (int j = i - 1; j >= 0; j--)
1680     {
1681       int res = cache_cmppos(c, j, j + 1);
1682       if (res < 0)
1683         break;
1684       c.swap_entry(j, j + 1);
1685     }
1686   }
1687 }
1688 
1689 // heapsort
1690 void
cache_hsort(Cache & c)1691 NdbIndexStatImpl::cache_hsort(Cache& c)
1692 {
1693   int count = c.m_sampleCount;
1694   int i;
1695 
1696   // highest entry which can have children
1697   i = count / 2;
1698 
1699   // make into heap (binary tree where child < parent)
1700   while (i >= 0)
1701   {
1702     cache_hsort_sift(c, i, count);
1703     i--;
1704   }
1705 
1706   // verify is too expensive to enable under VM_TRACE
1707 
1708 #ifdef ndb_index_stat_hsort_verify
1709   cache_hsort_verify(c, count);
1710 #endif
1711 
1712   // sort
1713   i = count - 1;
1714   while (i > 0)
1715   {
1716     // move current max to proper position
1717     c.swap_entry(0, i);
1718 
1719     // restore heap property for the rest
1720     cache_hsort_sift(c, 0, i);
1721 #ifdef ndb_index_stat_hsort_verify
1722     cache_hsort_verify(c, i);
1723 #endif
1724     i--;
1725   }
1726 }
1727 
1728 void
cache_hsort_sift(Cache & c,int i,int count)1729 NdbIndexStatImpl::cache_hsort_sift(Cache& c, int i, int count)
1730 {
1731   int parent = i;
1732 
1733   while (1)
1734   {
1735     // left child if any
1736     int child = parent * 2 + 1;
1737     if (! (child < count))
1738       break;
1739 
1740     // replace by right child if bigger
1741     if (child + 1 < count && cache_cmppos(c, child, child + 1) < 0)
1742       child = child + 1;
1743 
1744     // done if both children are less than parent
1745     if (cache_cmppos(c, child, parent) < 0)
1746       break;
1747 
1748     c.swap_entry(parent, child);
1749     parent = child;
1750   }
1751 }
1752 
1753 #ifdef ndb_index_stat_hsort_verify
1754 // verify heap property
1755 void
cache_hsort_verify(Cache & c,int count)1756 NdbIndexStatImpl::cache_hsort_verify(Cache& c, int count)
1757 {
1758   for (int i = 0; i < count; i++)
1759   {
1760     int parent = i;
1761     int child1 = 2 * i + 1;
1762     int child2 = 2 * i + 2;
1763     if (child1 < count)
1764     {
1765       assert(cache_cmppos(c, child1, parent) < 0);
1766     }
1767     if (child2 < count)
1768     {
1769       assert(cache_cmppos(c, child2, parent) < 0);
1770     }
1771   }
1772 }
1773 #endif
1774 
1775 int
cache_verify(const Cache & c)1776 NdbIndexStatImpl::cache_verify(const Cache& c)
1777 {
1778   for (uint pos1 = 0; pos1 < c.m_sampleCount; pos1++)
1779   {
1780     const uint addr1 = c.get_keyaddr(pos1);
1781     const Uint8* key1 = c.get_keyptr(addr1);
1782     NdbPack::DataC keyData1(m_keySpec, false);
1783     keyData1.set_buf(key1, c.m_keyBytes - addr1, c.m_keyAttrs);
1784     uint pos2 = pos1 + 1;
1785     if (pos2 < c.m_sampleCount)
1786     {
1787       const uint addr2 = c.get_keyaddr(pos2);
1788       const Uint8* key2 = c.get_keyptr(addr2);
1789       NdbPack::DataC keyData2(m_keySpec, false);
1790       keyData2.set_buf(key2, c.m_keyBytes - addr2, c.m_keyAttrs);
1791       Uint32 num_eq;
1792       int res = keyData1.cmp(keyData2, c.m_keyAttrs, num_eq);
1793       if (!(res < 0))
1794       {
1795         setError(InvalidCache, __LINE__);
1796         return -1;
1797       }
1798       const Uint8* ptr1 = c.get_valueptr(pos1);
1799       const Uint8* ptr2 = c.get_valueptr(pos2);
1800       Uint32 rir1;
1801       Uint32 rir2;
1802       memcpy(&rir1, &ptr1[0], 4);
1803       memcpy(&rir2, &ptr2[0], 4);
1804       if (!(rir1 < rir2))
1805       {
1806         setError(InvalidCache, __LINE__);
1807         return -1;
1808       }
1809       for (uint k = 0; k < c.m_keyAttrs; k++)
1810       {
1811         Uint32 unq1;
1812         Uint32 unq2;
1813         memcpy(&unq1, &ptr1[4 + k * 4], 4);
1814         memcpy(&unq2, &ptr2[4 + k * 4], 4);
1815         if (!(unq1 <= unq2))
1816         {
1817           setError(InvalidCache, __LINE__);
1818           return -1;
1819         }
1820         if (k == c.m_keyAttrs - 1 && !(unq1 < unq2))
1821         {
1822           setError(InvalidCache, __LINE__);
1823           return -1;
1824         }
1825       }
1826     }
1827   }
1828   return 0;
1829 }
1830 
1831 void
move_cache()1832 NdbIndexStatImpl::move_cache()
1833 {
1834   Cache* cacheTmp = m_cacheQuery;
1835 
1836   NdbMutex_Lock(m_query_mutex);
1837   m_cacheQuery = m_cacheBuild;
1838   NdbMutex_Unlock(m_query_mutex);
1839   m_cacheBuild = 0;
1840 
1841   if (cacheTmp != 0)
1842   {
1843     cacheTmp->m_nextClean = m_cacheClean;
1844     m_cacheClean = cacheTmp;
1845   }
1846 }
1847 
1848 void
clean_cache()1849 NdbIndexStatImpl::clean_cache()
1850 {
1851   while (m_cacheClean != 0)
1852   {
1853     NdbIndexStatImpl::Cache* tmp = m_cacheClean;
1854     m_cacheClean = tmp->m_nextClean;
1855     free_cache(tmp);
1856   }
1857 }
1858 
1859 void
free_cache(Cache * c)1860 NdbIndexStatImpl::free_cache(Cache* c)
1861 {
1862   Mem* mem = m_mem_handler;
1863   mem->mem_free(c->m_addrArray);
1864   mem->mem_free(c->m_keyArray);
1865   mem->mem_free(c->m_valueArray);
1866   delete c;
1867 }
1868 
1869 void
free_cache()1870 NdbIndexStatImpl::free_cache()
1871 {
1872   // twice to move all to clean list
1873   move_cache();
1874   move_cache();
1875   clean_cache();
1876 }
1877 
1878 // cache dump
1879 
CacheIter(const NdbIndexStatImpl & impl)1880 NdbIndexStatImpl::CacheIter::CacheIter(const NdbIndexStatImpl& impl) :
1881   m_keyData(impl.m_keySpec, false),
1882   m_valueData(impl.m_valueSpec, false)
1883 {
1884   m_keyCount = impl.m_keyAttrs;
1885   m_sampleCount = 0;
1886   m_sampleIndex = 0;
1887 }
1888 
1889 int
dump_cache_start(CacheIter & iter)1890 NdbIndexStatImpl::dump_cache_start(CacheIter& iter)
1891 {
1892   if (m_cacheQuery == 0)
1893   {
1894     setError(UsageError, __LINE__);
1895     return -1;
1896   }
1897   const Cache& c = *m_cacheQuery;
1898   new (&iter) CacheIter(*this);
1899   iter.m_sampleCount = c.m_sampleCount;
1900   iter.m_sampleIndex = ~(Uint32)0;
1901   return 0;
1902 }
1903 
1904 bool
dump_cache_next(CacheIter & iter)1905 NdbIndexStatImpl::dump_cache_next(CacheIter& iter)
1906 {
1907   if (iter.m_sampleIndex == ~(Uint32)0)
1908     iter.m_sampleIndex = 0;
1909   else
1910     iter.m_sampleIndex++;
1911   if (iter.m_sampleIndex >= iter.m_sampleCount)
1912     return false;
1913   const Cache& c = *m_cacheQuery;
1914   const uint pos = iter.m_sampleIndex;
1915   const uint addr = c.get_keyaddr(pos);
1916   const Uint8* key = c.get_keyptr(addr);
1917   const Uint8* value = c.get_valueptr(pos);
1918   iter.m_keyData.set_buf(key, c.m_keyBytes - addr, c.m_keyAttrs);
1919   iter.m_valueData.set_buf(value, c.m_valueLen, c.m_valueAttrs);
1920   return true;
1921 }
1922 
1923 // bound
1924 
1925 int
finalize_bound(Bound & bound)1926 NdbIndexStatImpl::finalize_bound(Bound& bound)
1927 {
1928   assert(bound.m_type == 0 || bound.m_type == 1);
1929   int side = 0;
1930   if (bound.m_data.get_cnt() == 0)
1931   {
1932     if (bound.m_strict != -1)
1933     {
1934       setError(UsageError, __LINE__);
1935       return -1;
1936     }
1937   }
1938   else
1939   {
1940     if (bound.m_strict == -1)
1941     {
1942       setError(UsageError, __LINE__);
1943       return -1;
1944     }
1945     if (bound.m_type == 0)
1946       side = bound.m_strict ? +1 : -1;
1947     else
1948       side = bound.m_strict ? -1 : +1;
1949   }
1950   if (bound.m_bound.finalize(side) == -1)
1951   {
1952     setError(UsageError, __LINE__);
1953     return -1;
1954   }
1955   return 0;
1956 }
1957 
1958 // range
1959 
1960 int
convert_range(Range & range,const NdbRecord * key_record,const NdbIndexScanOperation::IndexBound * ib)1961 NdbIndexStatImpl::convert_range(Range& range,
1962                                 const NdbRecord* key_record,
1963                                 const NdbIndexScanOperation::IndexBound* ib)
1964 {
1965   if (ib == 0)
1966     return 0;
1967   if (ib->low_key_count == 0 && ib->high_key_count == 0)
1968     return 0;
1969   for (uint j = 0; j <= 1; j++)
1970   {
1971     Bound& bound = j == 0 ? range.m_bound1 : range.m_bound2;
1972     bound.m_bound.reset();
1973     const char* key = j == 0 ? ib->low_key : ib->high_key;
1974     const uint key_count = j == 0 ? ib->low_key_count : ib->high_key_count;
1975     const bool inclusive = j == 0 ? ib->low_inclusive : ib->high_inclusive;
1976     Uint32 len_out;
1977     for (uint i = 0; i < key_count; i++)
1978     {
1979       const uint i2 = key_record->key_indexes[i];
1980       require(i2 < key_record->noOfColumns);
1981       const NdbRecord::Attr& attr = key_record->columns[i2];
1982       if (!attr.is_null(key))
1983       {
1984         const char* data = key + attr.offset;
1985         char buf[256];
1986         if (attr.flags & NdbRecord::IsMysqldShrinkVarchar)
1987         {
1988           Uint32 len;
1989           if (!attr.shrink_varchar(key, len, buf))
1990           {
1991             setError(InternalError, __LINE__);
1992             return -1;
1993           }
1994           data = buf;
1995         }
1996         if (bound.m_data.add(data, &len_out) == -1)
1997         {
1998           setError(InternalError, __LINE__, bound.m_data.get_error_code());
1999           return -1;
2000         }
2001       }
2002       else
2003       {
2004         if (bound.m_data.add_null(&len_out) == -1)
2005         {
2006           setError(InternalError, __LINE__, bound.m_data.get_error_code());
2007           return -1;
2008         }
2009       }
2010     }
2011     if (key_count > 0)
2012       bound.m_strict = !inclusive;
2013     if (finalize_bound(bound) == -1)
2014     {
2015       setError(InternalError, __LINE__);
2016       return -1;
2017     }
2018   }
2019 
2020 #ifdef VM_TRACE
2021 #ifdef NDB_USE_GET_ENV
2022   {
2023     const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_RANGE_ERROR", (char*)0, 0);
2024     if (p != 0 && strchr("1Y", p[0]) != 0)
2025     {
2026       if (rand() % 10 == 0)
2027       {
2028         setError(InternalError, __LINE__, NdbIndexStat::InternalError);
2029         return -1;
2030       }
2031     }
2032   }
2033 #endif
2034 #endif
2035 
2036   return 0;
2037 }
2038 
2039 // query
2040 
2041 // normalize values to >= 1.0
2042 void
query_normalize(const Cache & c,StatValue & value)2043 NdbIndexStatImpl::query_normalize(const Cache& c, StatValue& value)
2044 {
2045   if (!value.m_empty)
2046   {
2047     if (value.m_rir < 1.0)
2048       value.m_rir = 1.0;
2049     for (uint k = 0; k < c.m_keyAttrs; k++)
2050     {
2051       if (value.m_unq[k] < 1.0)
2052         value.m_unq[k] = 1.0;
2053     }
2054   }
2055   else
2056   {
2057     value.m_rir = 1.0;
2058     for (uint k = 0; k < c.m_keyAttrs; k++)
2059       value.m_unq[k] = 1.0;
2060   }
2061 }
2062 
2063 int
query_stat(const Range & range,Stat & stat)2064 NdbIndexStatImpl::query_stat(const Range& range, Stat& stat)
2065 {
2066   NdbMutex_Lock(m_query_mutex);
2067   if (unlikely(m_cacheQuery == 0))
2068   {
2069     NdbMutex_Unlock(m_query_mutex);
2070     setError(UsageError, __LINE__);
2071     return -1;
2072   }
2073   const Cache& c = *m_cacheQuery;
2074   if (unlikely(!c.m_valid))
2075   {
2076     NdbMutex_Unlock(m_query_mutex);
2077     setError(InvalidCache, __LINE__);
2078     return -1;
2079   }
2080   c.m_ref_count++;
2081   NdbMutex_Unlock(m_query_mutex);
2082 
2083 #ifdef VM_TRACE
2084 #ifdef NDB_USE_GET_ENV
2085   {
2086     const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_SLOW_QUERY", (char*)0, 0);
2087     if (p != 0 && strchr("1Y", p[0]) != 0)
2088     {
2089       int ms = 1 + rand() % 20;
2090       NdbSleep_MilliSleep(ms);
2091     }
2092   }
2093 #endif
2094 #endif
2095 
2096   // clients run these in parallel
2097   query_interpolate(c, range, stat);
2098   query_normalize(c, stat.m_value);
2099 
2100   NdbMutex_Lock(m_query_mutex);
2101   assert(c.m_ref_count != 0);
2102   c.m_ref_count--;
2103   NdbMutex_Unlock(m_query_mutex);
2104   return 0;
2105 }
2106 
2107 void
query_interpolate(const Cache & c,const Range & range,Stat & stat)2108 NdbIndexStatImpl::query_interpolate(const Cache& c,
2109                                     const Range& range,
2110                                     Stat& stat)
2111 {
2112   const uint keyAttrs = c.m_keyAttrs;
2113   StatValue& value = stat.m_value;
2114   value.m_empty = false;
2115   stat.m_rule[0] = "-";
2116   stat.m_rule[1] = "-";
2117   stat.m_rule[2] = "-";
2118 
2119   if (c.m_sampleCount == 0)
2120   {
2121     stat.m_rule[0] = "r1.1";
2122     value.m_empty = true;
2123     return;
2124   }
2125   const uint posMIN = 0;
2126   const uint posMAX = c.m_sampleCount - 1;
2127 
2128   const Bound& bound1 = range.m_bound1;
2129   const Bound& bound2 = range.m_bound2;
2130   if (bound1.m_data.is_empty() && bound2.m_data.is_empty())
2131   {
2132     stat.m_rule[0] = "r1.2";
2133     value.m_rir = c.get_rir(posMAX);
2134     for (uint k = 0; k < keyAttrs; k++)
2135       value.m_unq[k] = c.get_unq(posMAX, k);
2136     return;
2137   }
2138 
2139   StatBound& stat1 = stat.m_stat1;
2140   StatBound& stat2 = stat.m_stat2;
2141   if (!bound1.m_data.is_empty())
2142   {
2143     query_interpolate(c, bound1, stat1);
2144     query_normalize(c, stat1.m_value);
2145     stat.m_rule[1] = stat1.m_rule;
2146   }
2147   if (!bound2.m_data.is_empty())
2148   {
2149     query_interpolate(c, bound2, stat2);
2150     query_normalize(c, stat2.m_value);
2151     stat.m_rule[2] = stat2.m_rule;
2152   }
2153 
2154   const StatValue& value1 = stat1.m_value;
2155   const StatValue& value2 = stat2.m_value;
2156   const uint posL1 = stat1.m_pos - 1; // invalid if posH1 == posMIN
2157   const uint posH1 = stat1.m_pos;
2158   const uint posL2 = stat2.m_pos - 1; // invalid if posH2 == posMIN
2159   const uint posH2 = stat2.m_pos;
2160   const uint cnt1 = bound1.m_data.get_cnt();
2161   const uint cnt2 = bound2.m_data.get_cnt();
2162   const uint mincnt = min(cnt1, cnt2);
2163   Uint32 numEq = 0; // of bound1,bound2
2164 
2165   if (bound1.m_data.is_empty())
2166   {
2167     stat.m_rule[0] = "r1.3";
2168     value.m_rir = value2.m_rir;
2169     for (uint k = 0; k < keyAttrs; k++)
2170       value.m_unq[k] = value2.m_unq[k];
2171     return;
2172   }
2173   if (bound2.m_data.is_empty())
2174   {
2175     stat.m_rule[0] = "r1.4";
2176     value.m_rir = c.get_rir(posMAX) - value1.m_rir;
2177     for (uint k = 0; k < keyAttrs; k++)
2178       value.m_unq[k] = c.get_unq(posMAX, k) - value1.m_unq[k];
2179     return;
2180   }
2181   if (posH1 > posH2)
2182   {
2183     stat.m_rule[0] = "r1.5";
2184     value.m_empty = true;
2185     return;
2186   }
2187   // also returns number of equal initial components
2188   if (bound1.m_bound.cmp(bound2.m_bound, mincnt, numEq) >= 0)
2189   {
2190     stat.m_rule[0] = "r1.6";
2191     value.m_empty = true;
2192     return;
2193   }
2194   if (posH1 == posMIN)
2195   {
2196     stat.m_rule[0] = "r1.7";
2197     value.m_rir = value2.m_rir - value1.m_rir;
2198     for (uint k = 0; k < keyAttrs; k++)
2199       value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2200     return;
2201   }
2202   if (posH2 == posMAX + 1)
2203   {
2204     stat.m_rule[0] = "r1.8";
2205     value.m_rir = value2.m_rir - value1.m_rir;
2206     for (uint k = 0; k <= keyAttrs; k++)
2207       value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2208     return;
2209   }
2210   if (posL1 == posL2)
2211   {
2212     assert(posH1 == posH2);
2213     if (cnt1 == keyAttrs &&
2214         cnt2 == keyAttrs &&
2215         numEq == keyAttrs) {
2216       stat.m_rule[0] = "r2.1";
2217       assert(bound1.m_bound.get_side() == -1 &&
2218              bound2.m_bound.get_side() == +1);
2219       assert(stat1.m_numEqL < keyAttrs && stat2.m_numEqH < keyAttrs);
2220       value.m_rir = c.get_rpk(posL1, posH1, keyAttrs - 1);
2221       for (uint k = 0; k < keyAttrs; k++)
2222         value.m_unq[k] = value.m_rir / c.get_rpk(posL1, posH1, k);
2223       return;
2224     }
2225     if (numEq != 0)
2226     {
2227       stat.m_rule[0] = "r2.2";
2228       // skip for now
2229     }
2230     if (true)
2231     {
2232       stat.m_rule[0] = "r2.3";
2233       const double w = 0.5;
2234       value.m_rir = w * c.get_rir(posL1, posH1);
2235       for (uint k = 0; k < keyAttrs; k++)
2236         value.m_unq[k] = w * c.get_unq(posL1, posH1, k);
2237       return;
2238     }
2239   }
2240   if (posH1 == posL2)
2241   {
2242     if (cnt1 == keyAttrs &&
2243         cnt2 == keyAttrs &&
2244         numEq == keyAttrs) {
2245       stat.m_rule[0] = "r3.1";
2246       assert(bound1.m_bound.get_side() == -1 &&
2247              bound2.m_bound.get_side() == +1);
2248       assert(stat1.m_numEqH == keyAttrs && stat2.m_numEqL == keyAttrs);
2249       value.m_rir = value2.m_rir - value1.m_rir;
2250       for (uint k = 0; k < keyAttrs; k++)
2251         value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2252       return;
2253     }
2254     if (numEq != 0)
2255     {
2256       stat.m_rule[0] = "r3.2";
2257       // skip for now
2258     }
2259     if (true)
2260     {
2261       stat.m_rule[0] = "r3.3";
2262       const double w = 0.5;
2263       value.m_rir = w * c.get_rir(posL1, posH1);
2264       for (uint k = 0; k < keyAttrs; k++)
2265         value.m_unq[k] = w * c.get_unq(posL1, posH1, k);
2266       return;
2267     }
2268   }
2269   if (true)
2270   {
2271     stat.m_rule[0] = "r4";
2272     value.m_rir = value2.m_rir - value1.m_rir;
2273     for (uint k = 0; k < keyAttrs; k++)
2274       value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2275     return;
2276   }
2277 }
2278 
2279 void
query_interpolate(const Cache & c,const Bound & bound,StatBound & stat)2280 NdbIndexStatImpl::query_interpolate(const Cache& c,
2281                                     const Bound& bound,
2282                                     StatBound& stat)
2283 {
2284   const uint keyAttrs = c.m_keyAttrs;
2285   StatValue& value = stat.m_value;
2286   value.m_empty = false;
2287   stat.m_rule = "-";
2288 
2289   query_search(c, bound, stat);
2290 
2291   const uint posMIN = 0;
2292   const uint posMAX = c.m_sampleCount - 1;
2293   const uint posL = stat.m_pos - 1; // invalid if posH == posMIN
2294   const uint posH = stat.m_pos;
2295   const uint cnt = bound.m_data.get_cnt();
2296   const int side = bound.m_bound.get_side();
2297 
2298   if (posH == posMIN)
2299   {
2300     if (cnt == keyAttrs &&
2301         cnt == stat.m_numEqH) {
2302       stat.m_rule = "b1.1";
2303       assert(side == -1);
2304       value.m_rir = c.get_rir(posMIN) - c.get_rpk(posMIN, keyAttrs - 1);
2305       for (uint k = 0; k < keyAttrs; k++)
2306         value.m_unq[k] = c.get_unq(posMIN, k) - 1;
2307       return;
2308     }
2309     if (true)
2310     {
2311       stat.m_rule = "b1.2";
2312       value.m_empty = true;
2313       return;
2314     }
2315   }
2316   if (posH == posMAX + 1)
2317   {
2318     stat.m_rule = "b2";
2319     value.m_rir = c.get_rir(posMAX);
2320     for (uint k = 0; k < keyAttrs; k++)
2321       value.m_unq[k] = c.get_unq(posMAX, k);
2322     return;
2323   }
2324   if (cnt == keyAttrs &&
2325       cnt == stat.m_numEqL) {
2326     stat.m_rule = "b3.1";
2327     assert(side == +1);
2328     value.m_rir = c.get_rir(posL);
2329     for (uint k = 0; k < keyAttrs; k++)
2330       value.m_unq[k] = c.get_unq(posL, k);
2331     return;
2332   }
2333   if (cnt == keyAttrs &&
2334       cnt == stat.m_numEqH &&
2335       side == +1) {
2336     stat.m_rule = "b3.2";
2337     value.m_rir = c.get_rir(posH);
2338     for (uint k = 0; k < keyAttrs; k++)
2339       value.m_unq[k] = c.get_unq(posH, k);
2340     return;
2341   }
2342   if (cnt == keyAttrs &&
2343       cnt == stat.m_numEqH &&
2344       side == -1) {
2345     stat.m_rule = "b3.3";
2346     const double u = c.get_unq(posL, posH, keyAttrs - 1);
2347     const double wL = 1.0 / u;
2348     const double wH = 1.0 - wL;
2349     value.m_rir = wL * c.get_rir(posL) + wH * c.get_rir(posH);
2350     for (uint k = 0; k < keyAttrs; k++)
2351       value.m_unq[k] = wL * c.get_unq(posL, k) + wH * c.get_unq(posH, k);
2352     return;
2353   }
2354   if (true)
2355   {
2356     stat.m_rule = "b4";
2357     const double wL = 0.5;
2358     const double wH = 0.5;
2359     value.m_rir = wL * c.get_rir(posL) + wH * c.get_rir(posH);
2360     for (uint k = 0; k < keyAttrs; k++)
2361       value.m_unq[k] = wL * c.get_unq(posL, k) + wH * c.get_unq(posH, k);
2362     return;
2363   }
2364 }
2365 
2366 void
query_search(const Cache & c,const Bound & bound,StatBound & stat)2367 NdbIndexStatImpl::query_search(const Cache& c,
2368                                const Bound& bound,
2369                                StatBound& stat)
2370 {
2371   assert(c.m_sampleCount > 0);
2372   assert(!bound.m_data.is_empty());
2373   Uint32 numEq;
2374 
2375   int lo = -1;
2376   int hi = c.m_sampleCount;
2377   while (hi - lo > 1)
2378   {
2379     int j = (hi + lo) / 2;
2380     assert(lo < j && j < hi);
2381     int res = query_keycmp(c, bound, j, numEq);
2382     if (res < 0)
2383       lo = j;
2384     else if (res > 0)
2385       hi = j;
2386     else
2387     {
2388       assert(false);
2389       return;
2390     }
2391   }
2392   assert(hi - lo == 1);
2393   stat.m_pos = hi;
2394 
2395   if (stat.m_pos > 0)
2396   {
2397     (void)query_keycmp(c, bound, stat.m_pos - 1, stat.m_numEqL);
2398   }
2399   if (stat.m_pos < c.m_sampleCount)
2400   {
2401     (void)query_keycmp(c, bound, stat.m_pos, stat.m_numEqH);
2402   }
2403 }
2404 
2405 // return <0/>0 for key before/after bound
2406 int
query_keycmp(const Cache & c,const Bound & bound,uint pos,Uint32 & numEq)2407 NdbIndexStatImpl::query_keycmp(const Cache& c,
2408                                const Bound& bound,
2409                                uint pos, Uint32& numEq)
2410 {
2411   const uint addr = c.get_keyaddr(pos);
2412   const Uint8* key = c.get_keyptr(addr);
2413   NdbPack::DataC keyData(m_keySpec, false);
2414   keyData.set_buf(key, c.m_keyBytes - addr, c.m_keyAttrs);
2415   // reverse result for key vs bound
2416   Uint32 cnt = bound.m_bound.get_data().get_cnt();
2417   int res = (-1) * bound.m_bound.cmp(keyData, cnt, numEq);
2418   return res;
2419 }
2420 
2421 // events and polling
2422 
2423 int
create_sysevents(Ndb * ndb)2424 NdbIndexStatImpl::create_sysevents(Ndb* ndb)
2425 {
2426   Sys sys(this, ndb);
2427   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2428 
2429   if (check_systables(sys) == -1)
2430     return -1;
2431   const NdbDictionary::Table* tab = sys.m_headtable;
2432   require(tab != 0);
2433 
2434   const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2435   NdbDictionary::Event ev(evname, *tab);
2436   ev.addTableEvent(NdbDictionary::Event::TE_INSERT);
2437   ev.addTableEvent(NdbDictionary::Event::TE_DELETE);
2438   ev.addTableEvent(NdbDictionary::Event::TE_UPDATE);
2439   for (int i = 0; i < tab->getNoOfColumns(); i++)
2440     ev.addEventColumn(i);
2441   ev.setReport(NdbDictionary::Event::ER_UPDATED);
2442 
2443   if (dic->createEvent(ev) == -1)
2444   {
2445     setError(dic->getNdbError().code, __LINE__);
2446     return -1;
2447   }
2448   return 0;
2449 }
2450 
2451 int
drop_sysevents(Ndb * ndb)2452 NdbIndexStatImpl::drop_sysevents(Ndb* ndb)
2453 {
2454   Sys sys(this, ndb);
2455   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2456 
2457   if (check_systables(sys) == -1)
2458     return -1;
2459 
2460   const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2461   if (dic->dropEvent(evname) == -1)
2462   {
2463     int code = dic->getNdbError().code;
2464     if (code != 4710)
2465     {
2466       setError(dic->getNdbError().code, __LINE__);
2467       return -1;
2468     }
2469   }
2470   return 0;
2471 }
2472 
2473 int
check_sysevents(Ndb * ndb)2474 NdbIndexStatImpl::check_sysevents(Ndb* ndb)
2475 {
2476   Sys sys(this, ndb);
2477   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2478 
2479   if (check_systables(sys) == -1)
2480     return -1;
2481 
2482   const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2483   const NdbDictionary::Event* ev = dic->getEvent(evname);
2484   if (ev == 0)
2485   {
2486     setError(dic->getNdbError().code, __LINE__);
2487     return -1;
2488   }
2489   delete ev; // getEvent() creates new instance
2490   return 0;
2491 }
2492 
2493 int
create_listener(Ndb * ndb)2494 NdbIndexStatImpl::create_listener(Ndb* ndb)
2495 {
2496   if (m_eventOp != 0)
2497   {
2498     setError(UsageError, __LINE__);
2499     return -1;
2500   }
2501   const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2502   m_eventOp = ndb->createEventOperation(evname);
2503   if (m_eventOp == 0)
2504   {
2505     setError(ndb->getNdbError().code, __LINE__);
2506     return -1;
2507   }
2508 
2509   // all columns are non-nullable
2510   Head& head = m_facadeHead;
2511   if (m_eventOp->getValue("index_id", (char*)&head.m_indexId) == 0 ||
2512       m_eventOp->getValue("index_version", (char*)&head.m_indexVersion) == 0 ||
2513       m_eventOp->getValue("table_id", (char*)&head.m_tableId) == 0 ||
2514       m_eventOp->getValue("frag_count", (char*)&head.m_fragCount) == 0 ||
2515       m_eventOp->getValue("value_format", (char*)&head.m_valueFormat) == 0 ||
2516       m_eventOp->getValue("sample_version", (char*)&head.m_sampleVersion) == 0 ||
2517       m_eventOp->getValue("load_time", (char*)&head.m_loadTime) == 0 ||
2518       m_eventOp->getValue("sample_count", (char*)&head.m_sampleCount) == 0 ||
2519       m_eventOp->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
2520   {
2521     setError(m_eventOp->getNdbError().code, __LINE__);
2522     return -1;
2523   }
2524   // wl4124_todo why this
2525   static Head xxx;
2526   if (m_eventOp->getPreValue("index_id", (char*)&xxx.m_indexId) == 0 ||
2527       m_eventOp->getPreValue("index_version", (char*)&xxx.m_indexVersion) == 0 ||
2528       m_eventOp->getPreValue("table_id", (char*)&xxx.m_tableId) == 0 ||
2529       m_eventOp->getPreValue("frag_count", (char*)&xxx.m_fragCount) == 0 ||
2530       m_eventOp->getPreValue("value_format", (char*)&xxx.m_valueFormat) == 0 ||
2531       m_eventOp->getPreValue("sample_version", (char*)&xxx.m_sampleVersion) == 0 ||
2532       m_eventOp->getPreValue("load_time", (char*)&xxx.m_loadTime) == 0 ||
2533       m_eventOp->getPreValue("sample_count", (char*)&xxx.m_sampleCount) == 0 ||
2534       m_eventOp->getPreValue("key_bytes", (char*)&xxx.m_keyBytes) == 0)
2535   {
2536     setError(m_eventOp->getNdbError().code, __LINE__);
2537     return -1;
2538   }
2539   return 0;
2540 }
2541 
2542 int
execute_listener(Ndb * ndb)2543 NdbIndexStatImpl::execute_listener(Ndb* ndb)
2544 {
2545   if (m_eventOp == 0)
2546   {
2547     setError(UsageError, __LINE__);
2548     return -1;
2549   }
2550   if (m_eventOp->execute() == -1)
2551   {
2552     setError(m_eventOp->getNdbError().code, __LINE__);
2553     return -1;
2554   }
2555   return 0;
2556 }
2557 
2558 int
poll_listener(Ndb * ndb,int max_wait_ms)2559 NdbIndexStatImpl::poll_listener(Ndb* ndb, int max_wait_ms)
2560 {
2561   int ret;
2562   if ((ret = ndb->pollEvents(max_wait_ms)) < 0)
2563   {
2564     setError(ndb->getNdbError().code, __LINE__);
2565     return -1;
2566   }
2567   return (ret == 0 ? 0 : 1);
2568 }
2569 
2570 int
next_listener(Ndb * ndb)2571 NdbIndexStatImpl::next_listener(Ndb* ndb)
2572 {
2573   NdbEventOperation* op = ndb->nextEvent();
2574   if (op == 0)
2575     return 0;
2576 
2577   Head& head = m_facadeHead;
2578   head.m_eventType = (int)op->getEventType();
2579   return 1;
2580 }
2581 
2582 int
drop_listener(Ndb * ndb)2583 NdbIndexStatImpl::drop_listener(Ndb* ndb)
2584 {
2585   if (m_eventOp == 0)
2586   {
2587     setError(UsageError, __LINE__);
2588     return -1;
2589   }
2590   if (ndb->dropEventOperation(m_eventOp) != 0)
2591   {
2592     // NOTE! dropEventoperation always return 0
2593     setError(ndb->getNdbError().code, __LINE__);
2594     return -1;
2595   }
2596   m_eventOp = 0;
2597   return 0;
2598 }
2599 
2600 // mem alloc - default impl
2601 
MemDefault()2602 NdbIndexStatImpl::MemDefault::MemDefault()
2603 {
2604 }
2605 
~MemDefault()2606 NdbIndexStatImpl::MemDefault::~MemDefault()
2607 {
2608 }
2609 
2610 void*
mem_alloc(UintPtr size)2611 NdbIndexStatImpl::MemDefault::mem_alloc(UintPtr size)
2612 {
2613   void* ptr = malloc(size);
2614   return ptr;
2615 }
2616 
2617 void
mem_free(void * ptr)2618 NdbIndexStatImpl::MemDefault::mem_free(void* ptr)
2619 {
2620   if (ptr != 0)
2621     free(ptr);
2622 }
2623 
2624 // error
2625 
2626 void
setError(int code,int line,int extra)2627 NdbIndexStatImpl::setError(int code, int line, int extra)
2628 {
2629   if (code == 0)
2630     code = InternalError;
2631   m_error.code = code;
2632   m_error.line = line;
2633   m_error.extra = extra;
2634 #ifdef VM_TRACE
2635 #ifdef NDB_USE_GET_ENV
2636   const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_ON_ERROR", (char*)0, 0);
2637   if (p != 0 && strchr("1Y", p[0]) != 0)
2638     abort();
2639 #endif
2640 #endif
2641 }
2642 
2643 void
setError(const Con & con,int line)2644 NdbIndexStatImpl::setError(const Con& con, int line)
2645 {
2646   int code = 0;
2647   if (code == 0 && con.m_op != 0)
2648   {
2649     code = con.m_op->getNdbError().code;
2650   }
2651   if (code == 0 && con.m_scanop != 0)
2652   {
2653     code = con.m_scanop->getNdbError().code;
2654   }
2655   if (code == 0 && con.m_tx != 0)
2656   {
2657     code = con.m_tx->getNdbError().code;
2658   }
2659   if (code == 0 && con.m_dic != 0)
2660   {
2661     code = con.m_dic->getNdbError().code;
2662   }
2663   if (code == 0 && con.m_ndb != 0)
2664   {
2665     code = con.m_ndb->getNdbError().code;
2666   }
2667   setError(code, line);
2668 }
2669 
2670 void
mapError(const int * map,int code)2671 NdbIndexStatImpl::mapError(const int* map, int code)
2672 {
2673   while (*map != 0)
2674   {
2675     if (m_error.code == *map) {
2676       m_error.code = code;
2677       break;
2678     }
2679     map++;
2680   }
2681 }
2682