1 /*
2    Copyright (c) 2011, 2018, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 #include <algorithm>
25 
26 #include <ndb_global.h>
27 #include <Ndb.hpp>
28 #include <NdbTransaction.hpp>
29 #include <NdbRecAttr.hpp>
30 #include <NdbOut.hpp>
31 #include <NdbEnv.h>
32 #include <Bitmask.hpp>
33 #include <NdbSqlUtil.hpp>
34 #include <NdbRecord.hpp>
35 #include <NdbEventOperation.hpp>
36 #include <NdbSleep.h>
37 #include "NdbIndexStatImpl.hpp"
38 #include "m_ctype.h"
39 
40 static const char* const g_headtable_name = NDB_INDEX_STAT_HEAD_TABLE;
41 static const char* const g_sampletable_name = NDB_INDEX_STAT_SAMPLE_TABLE;
42 static const char* const g_sampleindex1_name = NDB_INDEX_STAT_SAMPLE_INDEX1;
43 
44 static const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
45 static const int ERR_TupleNotFound[] = { 626, 0 };
46 
NdbIndexStatImpl(NdbIndexStat & facade)47 NdbIndexStatImpl::NdbIndexStatImpl(NdbIndexStat& facade) :
48   NdbIndexStat(*this),
49   m_facade(&facade),
50   m_keyData(m_keySpec, false, 2),
51   m_valueData(m_valueSpec, false, 2)
52 {
53   init();
54   m_query_mutex = NdbMutex_Create();
55   assert(m_query_mutex != 0);
56   m_eventOp = 0;
57   m_mem_handler = &c_mem_default_handler;
58 }
59 
60 void
init()61 NdbIndexStatImpl::init()
62 {
63   m_indexSet = false;
64   m_indexId = 0;
65   m_indexVersion = 0;
66   m_tableId = 0;
67   m_keyAttrs = 0;
68   m_valueAttrs = 0;
69   // buffers
70   m_keySpecBuf = 0;
71   m_valueSpecBuf = 0;
72   m_keyDataBuf = 0;
73   m_valueDataBuf = 0;
74   // cache
75   m_cacheBuild = 0;
76   m_cacheQuery = 0;
77   m_cacheClean = 0;
78   // head
79   init_head(m_facadeHead);
80 }
81 
~NdbIndexStatImpl()82 NdbIndexStatImpl::~NdbIndexStatImpl()
83 {
84   reset_index();
85   if (m_query_mutex != 0)
86   {
87     NdbMutex_Destroy(m_query_mutex);
88     m_query_mutex = 0;
89   }
90 }
91 
92 // sys tables meta
93 
Sys(NdbIndexStatImpl * impl,Ndb * ndb)94 NdbIndexStatImpl::Sys::Sys(NdbIndexStatImpl* impl, Ndb* ndb) :
95   m_impl(impl),
96   m_ndb(ndb)
97 {
98   m_dic = m_ndb->getDictionary();
99   m_headtable = 0;
100   m_sampletable = 0;
101   m_sampleindex1 = 0;
102   m_obj_cnt = 0;
103 }
104 
~Sys()105 NdbIndexStatImpl::Sys::~Sys()
106 {
107   m_impl->sys_release(*this);
108 }
109 
110 void
sys_release(Sys & sys)111 NdbIndexStatImpl::sys_release(Sys& sys)
112 {
113   // close schema trans if any exists
114   NdbDictionary::Dictionary* const dic = sys.m_dic;
115   (void)dic->endSchemaTrans(NdbDictionary::Dictionary::SchemaTransAbort);
116 
117   if (sys.m_headtable != 0)
118   {
119     sys.m_dic->removeTableGlobal(*sys.m_headtable, false);
120     sys.m_headtable = 0;
121   }
122   if (sys.m_sampletable != 0)
123   {
124     sys.m_dic->removeTableGlobal(*sys.m_sampletable, false);
125     sys.m_sampletable = 0;
126   }
127   if (sys.m_sampleindex1 != 0)
128   {
129     sys.m_dic->removeIndexGlobal(*sys.m_sampleindex1, false);
130     sys.m_sampleindex1 = 0;
131   }
132 }
133 
134 int
make_headtable(NdbDictionary::Table & tab)135 NdbIndexStatImpl::make_headtable(NdbDictionary::Table& tab)
136 {
137   tab.setName(g_headtable_name);
138   tab.setLogging(true);
139   int ret;
140   // Creating a table in NDB using a compiled in frm blob
141   // which is already compressed and has got proper version 1 header
142   ret = tab.setFrm(g_ndb_index_stat_head_frm_data,
143                    g_ndb_index_stat_head_frm_len);
144   if (ret != 0)
145   {
146     setError(ret, __LINE__);
147     return -1;
148   }
149   // key must be first
150   {
151     NdbDictionary::Column col("index_id");
152     col.setType(NdbDictionary::Column::Unsigned);
153     col.setPrimaryKey(true);
154     tab.addColumn(col);
155   }
156   {
157     NdbDictionary::Column col("index_version");
158     col.setType(NdbDictionary::Column::Unsigned);
159     col.setPrimaryKey(true);
160     tab.addColumn(col);
161   }
162   // table
163   {
164     NdbDictionary::Column col("table_id");
165     col.setType(NdbDictionary::Column::Unsigned);
166     col.setNullable(false);
167     tab.addColumn(col);
168   }
169   {
170     NdbDictionary::Column col("frag_count");
171     col.setType(NdbDictionary::Column::Unsigned);
172     col.setNullable(false);
173     tab.addColumn(col);
174   }
175   // current sample
176   {
177     NdbDictionary::Column col("value_format");
178     col.setType(NdbDictionary::Column::Unsigned);
179     col.setNullable(false);
180     tab.addColumn(col);
181   }
182   {
183     NdbDictionary::Column col("sample_version");
184     col.setType(NdbDictionary::Column::Unsigned);
185     col.setNullable(false);
186     tab.addColumn(col);
187   }
188   {
189     NdbDictionary::Column col("load_time");
190     col.setType(NdbDictionary::Column::Unsigned);
191     col.setNullable(false);
192     tab.addColumn(col);
193   }
194   {
195     NdbDictionary::Column col("sample_count");
196     col.setType(NdbDictionary::Column::Unsigned);
197     col.setNullable(false);
198     tab.addColumn(col);
199   }
200   {
201     NdbDictionary::Column col("key_bytes");
202     col.setType(NdbDictionary::Column::Unsigned);
203     col.setNullable(false);
204     tab.addColumn(col);
205   }
206   NdbError error;
207   if (tab.validate(error) == -1) {
208     setError(error.code, __LINE__);
209     return -1;
210   }
211   return 0;
212 }
213 
214 int
make_sampletable(NdbDictionary::Table & tab)215 NdbIndexStatImpl::make_sampletable(NdbDictionary::Table& tab)
216 {
217   tab.setName(g_sampletable_name);
218   tab.setLogging(true);
219   int ret;
220   // Creating a table in NDB using a compiled in frm blob
221   // which is already compressed and has got proper version 1 header
222   ret = tab.setFrm(g_ndb_index_stat_sample_frm_data,
223                    g_ndb_index_stat_sample_frm_len);
224   if (ret != 0)
225   {
226     setError(ret, __LINE__);
227     return -1;
228   }
229   // key must be first
230   {
231     NdbDictionary::Column col("index_id");
232     col.setType(NdbDictionary::Column::Unsigned);
233     col.setPrimaryKey(true);
234     tab.addColumn(col);
235   }
236   {
237     NdbDictionary::Column col("index_version");
238     col.setType(NdbDictionary::Column::Unsigned);
239     col.setPrimaryKey(true);
240     tab.addColumn(col);
241   }
242   {
243     NdbDictionary::Column col("sample_version");
244     col.setType(NdbDictionary::Column::Unsigned);
245     col.setPrimaryKey(true);
246     tab.addColumn(col);
247   }
248   {
249     NdbDictionary::Column col("stat_key");
250     col.setType(NdbDictionary::Column::Longvarbinary);
251     col.setPrimaryKey(true);
252     col.setLength(MaxKeyBytes);
253     tab.addColumn(col);
254   }
255   // value
256   {
257     NdbDictionary::Column col("stat_value");
258     col.setType(NdbDictionary::Column::Longvarbinary);
259     col.setNullable(false);
260     col.setLength(MaxValueCBytes);
261     tab.addColumn(col);
262   }
263   NdbError error;
264   if (tab.validate(error) == -1) {
265     setError(error.code, __LINE__);
266     return -1;
267   }
268   return 0;
269 }
270 
271 int
make_sampleindex1(NdbDictionary::Index & ind)272 NdbIndexStatImpl::make_sampleindex1(NdbDictionary::Index& ind)
273 {
274   ind.setTable(g_sampletable_name);
275   ind.setName(g_sampleindex1_name);
276   ind.setType(NdbDictionary::Index::OrderedIndex);
277   ind.setLogging(false);
278   ind.addColumnName("index_id");
279   ind.addColumnName("index_version");
280   ind.addColumnName("sample_version");
281   return 0;
282 }
283 
284 int
check_table(const NdbDictionary::Table & tab1,const NdbDictionary::Table & tab2)285 NdbIndexStatImpl::check_table(const NdbDictionary::Table& tab1,
286                               const NdbDictionary::Table& tab2)
287 {
288   if (tab1.getNoOfColumns() != tab2.getNoOfColumns())
289     return -1;
290   const uint n = tab1.getNoOfColumns();
291   for (uint i = 0; i < n; i++)
292   {
293     const NdbDictionary::Column* col1 = tab1.getColumn(i);
294     const NdbDictionary::Column* col2 = tab2.getColumn(i);
295     require(col1 != 0 && col2 != 0);
296     if (!col1->equal(*col2))
297       return -1;
298   }
299   return 0;
300 }
301 
302 int
check_index(const NdbDictionary::Index & ind1,const NdbDictionary::Index & ind2)303 NdbIndexStatImpl::check_index(const NdbDictionary::Index& ind1,
304                               const NdbDictionary::Index& ind2)
305 {
306   if (ind1.getNoOfColumns() != ind2.getNoOfColumns())
307     return -1;
308   const uint n = ind1.getNoOfColumns();
309   for (uint i = 0; i < n; i++)
310   {
311     const NdbDictionary::Column* col1 = ind1.getColumn(i);
312     const NdbDictionary::Column* col2 = ind2.getColumn(i);
313     require(col1 != 0 && col2 != 0);
314     // getColumnNo() does not work on non-retrieved
315     if (!col1->equal(*col2))
316       return -1;
317   }
318   return 0;
319 }
320 
321 int
get_systables(Sys & sys)322 NdbIndexStatImpl::get_systables(Sys& sys)
323 {
324   Ndb* ndb = sys.m_ndb;
325   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
326   const int NoSuchTable = 723;
327   const int NoSuchIndex = 4243;
328 
329   sys.m_headtable = dic->getTableGlobal(g_headtable_name);
330   if (sys.m_headtable == 0)
331   {
332     int code = dic->getNdbError().code;
333     if (code != NoSuchTable) {
334       setError(code, __LINE__);
335       return -1;
336     }
337   }
338   else
339   {
340     NdbDictionary::Table tab;
341     make_headtable(tab);
342     if (check_table(*sys.m_headtable, tab) == -1)
343     {
344       setError(BadSysTables, __LINE__);
345       return -1;
346     }
347     sys.m_obj_cnt++;
348   }
349 
350   sys.m_sampletable = dic->getTableGlobal(g_sampletable_name);
351   if (sys.m_sampletable == 0)
352   {
353     int code = dic->getNdbError().code;
354     if (code != NoSuchTable) {
355       setError(code, __LINE__);
356       return -1;
357     }
358   }
359   else
360   {
361     NdbDictionary::Table tab;
362     make_sampletable(tab);
363     if (check_table(*sys.m_sampletable, tab) == -1)
364     {
365       setError(BadSysTables, __LINE__);
366       return -1;
367     }
368     sys.m_obj_cnt++;
369   }
370 
371   if (sys.m_sampletable != 0)
372   {
373     sys.m_sampleindex1 = dic->getIndexGlobal(g_sampleindex1_name, *sys.m_sampletable);
374     if (sys.m_sampleindex1 == 0)
375     {
376       int code = dic->getNdbError().code;
377       if (code != NoSuchIndex) {
378         setError(code, __LINE__);
379         return -1;
380       }
381     }
382     else
383     {
384       NdbDictionary::Index ind;
385       make_sampleindex1(ind);
386       if (check_index(*sys.m_sampleindex1, ind) == -1)
387       {
388         setError(BadSysTables, __LINE__);
389         return -1;
390       }
391       sys.m_obj_cnt++;
392     }
393   }
394 
395   return 0;
396 }
397 
398 int
create_systables(Ndb * ndb)399 NdbIndexStatImpl::create_systables(Ndb* ndb)
400 {
401   Sys sys(this, ndb);
402 
403   NdbDictionary::Dictionary* const dic = sys.m_dic;
404 
405   if (dic->beginSchemaTrans() == -1)
406   {
407     setError(dic->getNdbError().code, __LINE__);
408     return -1;
409   }
410 
411   if (get_systables(sys) == -1)
412     return -1;
413 
414   if (sys.m_obj_cnt == Sys::ObjCnt)
415   {
416     setError(HaveSysTables, __LINE__);
417     return -1;
418   }
419 
420   if (sys.m_obj_cnt != 0)
421   {
422     setError(BadSysTables, __LINE__);
423     return -1;
424   }
425 
426   {
427     NdbDictionary::Table tab;
428     if (make_headtable(tab) == -1)
429       return -1;
430     if (dic->createTable(tab) == -1)
431     {
432       setError(dic->getNdbError().code, __LINE__);
433       return -1;
434     }
435 
436     sys.m_headtable = dic->getTableGlobal(tab.getName());
437     if (sys.m_headtable == 0)
438     {
439       setError(dic->getNdbError().code, __LINE__);
440       return -1;
441     }
442   }
443 
444   {
445     NdbDictionary::Table tab;
446     if (make_sampletable(tab) == -1)
447       return -1;
448 
449 #ifdef VM_TRACE
450 #ifdef NDB_USE_GET_ENV
451     // test of schema trans
452     {
453       const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_SYS_CREATE", (char*)0, 0);
454       if (p != 0 && strchr("1Y", p[0]) != 0)
455       {
456         setError(9999, __LINE__);
457         return -1;
458       }
459     }
460 #endif
461 #endif
462 
463     if (dic->createTable(tab) == -1)
464     {
465       setError(dic->getNdbError().code, __LINE__);
466       return -1;
467     }
468 
469     sys.m_sampletable = dic->getTableGlobal(tab.getName());
470     if (sys.m_sampletable == 0)
471     {
472       setError(dic->getNdbError().code, __LINE__);
473       return -1;
474     }
475   }
476 
477   {
478     NdbDictionary::Index ind;
479     if (make_sampleindex1(ind) == -1)
480       return -1;
481     if (dic->createIndex(ind, *sys.m_sampletable) == -1)
482     {
483       setError(dic->getNdbError().code, __LINE__);
484       return -1;
485     }
486 
487     sys.m_sampleindex1 = dic->getIndexGlobal(ind.getName(), sys.m_sampletable->getName());
488     if (sys.m_sampleindex1 == 0)
489     {
490       setError(dic->getNdbError().code, __LINE__);
491       return -1;
492     }
493   }
494 
495   if (dic->endSchemaTrans() == -1)
496   {
497     setError(dic->getNdbError().code, __LINE__);
498     return -1;
499   }
500 
501   return 0;
502 }
503 
504 int
drop_systables(Ndb * ndb)505 NdbIndexStatImpl::drop_systables(Ndb* ndb)
506 {
507   Sys sys(this, ndb);
508 
509   NdbDictionary::Dictionary* const dic = sys.m_dic;
510 
511   if (dic->beginSchemaTrans() == -1)
512   {
513     setError(dic->getNdbError().code, __LINE__);
514     return -1;
515   }
516 
517   if (get_systables(sys) == -1 &&
518       m_error.code != BadSysTables)
519     return -1;
520 
521   if (sys.m_headtable != 0)
522   {
523     if (dic->dropTableGlobal(*sys.m_headtable) == -1)
524     {
525       setError(dic->getNdbError().code, __LINE__);
526       return -1;
527     }
528   }
529 
530   if (sys.m_sampletable != 0)
531   {
532 
533 #ifdef VM_TRACE
534 #ifdef NDB_USE_GET_ENV
535     // test of schema trans
536     {
537       const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_SYS_DROP", (char*)0, 0);
538       if (p != 0 && strchr("1Y", p[0]) != 0)
539       {
540         setError(9999, __LINE__);
541         return -1;
542       }
543     }
544 #endif
545 #endif
546 
547     if (dic->dropTableGlobal(*sys.m_sampletable) == -1)
548     {
549       setError(dic->getNdbError().code, __LINE__);
550       return -1;
551     }
552   }
553 
554   if (dic->endSchemaTrans() == -1)
555   {
556     setError(dic->getNdbError().code, __LINE__);
557     return -1;
558   }
559 
560   return 0;
561 }
562 
563 int
check_systables(Sys & sys)564 NdbIndexStatImpl::check_systables(Sys& sys)
565 {
566   if (get_systables(sys) == -1)
567     return -1;
568 
569   if (sys.m_obj_cnt == 0)
570   {
571     setError(NoSysTables, __LINE__);
572     return -1;
573   }
574 
575   if (sys.m_obj_cnt != Sys::ObjCnt)
576   {
577     setError(BadSysTables, __LINE__);
578     return -1;
579   }
580 
581   return 0;
582 }
583 
584 int
check_systables(Ndb * ndb)585 NdbIndexStatImpl::check_systables(Ndb* ndb)
586 {
587   Sys sys(this, ndb);
588 
589   if (check_systables(sys) == -1)
590     return -1;
591 
592   return 0;
593 }
594 
595 // operation context
596 
Con(NdbIndexStatImpl * impl,Head & head,Ndb * ndb)597 NdbIndexStatImpl::Con::Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb) :
598   m_impl(impl),
599   m_head(head),
600   m_ndb(ndb),
601   m_start()
602 {
603   head.m_indexId = m_impl->m_indexId;
604   head.m_indexVersion = m_impl->m_indexVersion;
605   m_dic = m_ndb->getDictionary();
606   m_headtable = 0;
607   m_sampletable = 0;
608   m_sampleindex1 = 0;
609   m_tx = 0;
610   m_op = 0;
611   m_scanop = 0;
612   m_cacheBuild = 0;
613   m_cachePos = 0;
614   m_cacheKeyOffset = 0;
615   m_cacheValueOffset = 0;
616 }
617 
~Con()618 NdbIndexStatImpl::Con::~Con()
619 {
620   if (m_cacheBuild != 0)
621   {
622     m_impl->free_cache(m_cacheBuild);
623     m_cacheBuild = 0;
624   }
625   if (m_tx != 0)
626   {
627     m_ndb->closeTransaction(m_tx);
628     m_tx = 0;
629   }
630   m_impl->sys_release(*this);
631 }
632 
633 int
startTransaction()634 NdbIndexStatImpl::Con::startTransaction()
635 {
636   assert(m_headtable != 0 && m_ndb != 0 && m_tx == 0);
637   Uint32 key[2] = {
638     m_head.m_indexId,
639     m_head.m_indexVersion
640   };
641   m_tx = m_ndb->startTransaction(m_headtable, (const char*)key, sizeof(key));
642   if (m_tx == 0)
643     return -1;
644   return 0;
645 }
646 
647 int
execute(bool commit)648 NdbIndexStatImpl::Con::execute(bool commit)
649 {
650   assert(m_tx != 0);
651   if (commit)
652   {
653     if (m_tx->execute(NdbTransaction::Commit) == -1)
654       return -1;
655     m_ndb->closeTransaction(m_tx);
656     m_tx = 0;
657   }
658   else
659   {
660     if (m_tx->execute(NdbTransaction::NoCommit) == -1)
661       return -1;
662   }
663   return 0;
664 }
665 
666 int
getNdbOperation()667 NdbIndexStatImpl::Con::getNdbOperation()
668 {
669   assert(m_headtable != 0);
670   assert(m_tx != 0 && m_op == 0);
671   m_op = m_tx->getNdbOperation(m_headtable);
672   if (m_op == 0)
673     return -1;
674   return 0;
675 }
676 
677 int
getNdbIndexScanOperation()678 NdbIndexStatImpl::Con::getNdbIndexScanOperation()
679 {
680   assert(m_sampletable != 0 && m_sampleindex1 != 0);
681   assert( m_tx != 0 && m_scanop == 0);
682   m_scanop = m_tx->getNdbIndexScanOperation(m_sampleindex1, m_sampletable);
683   if (m_scanop == 0)
684     return -1;
685   return 0;
686 }
687 
688 void
set_time()689 NdbIndexStatImpl::Con::set_time()
690 {
691   m_start = NdbTick_getCurrentTicks();
692 }
693 
694 Uint64
get_time()695 NdbIndexStatImpl::Con::get_time()
696 {
697   const NDB_TICKS stop = NdbTick_getCurrentTicks();
698   Uint64 us = NdbTick_Elapsed(m_start, stop).microSec();
699   return us;
700 }
701 
702 // index
703 
704 int
set_index(const NdbDictionary::Index & index,const NdbDictionary::Table & table)705 NdbIndexStatImpl::set_index(const NdbDictionary::Index& index,
706                             const NdbDictionary::Table& table)
707 {
708   if (m_indexSet)
709   {
710     setError(UsageError, __LINE__);
711     return -1;
712   }
713   m_indexId = index.getObjectId();
714   m_indexVersion = index.getObjectVersion();
715   m_tableId = table.getObjectId();
716   m_keyAttrs = index.getNoOfColumns();
717   m_valueAttrs = 1 + m_keyAttrs;
718   if (m_keyAttrs == 0)
719   {
720     setError(InternalError, __LINE__);
721     return -1;
722   }
723   if (m_keyAttrs > MaxKeyCount)
724   {
725     setError(InternalError, __LINE__);
726     return -1;
727   }
728 
729   // spec buffers
730   m_keySpecBuf = new NdbPack::Type [m_keyAttrs];
731   m_valueSpecBuf = new NdbPack::Type [m_valueAttrs];
732   if (m_keySpecBuf == 0 || m_valueSpecBuf == 0)
733   {
734     setError(NoMemError, __LINE__);
735     return -1;
736   }
737   m_keySpec.set_buf(m_keySpecBuf, m_keyAttrs);
738   m_valueSpec.set_buf(m_valueSpecBuf, m_valueAttrs);
739 
740   // index key spec
741   {
742     for (uint i = 0; i < m_keyAttrs; i++)
743     {
744       const NdbDictionary::Column* icol = index.getColumn(i);
745       if (icol == 0)
746       {
747         setError(UsageError, __LINE__);
748         return -1;
749       }
750       NdbPack::Type type (
751         icol->getType(),
752         icol->getSizeInBytes(),
753         icol->getNullable(),
754         icol->getCharset() != 0 ? icol->getCharset()->number : 0
755       );
756       if (m_keySpec.add(type) == -1)
757       {
758         setError(UsageError, __LINE__, m_keySpec.get_error_code());
759         return -1;
760       }
761     }
762   }
763   // stat values spec
764   {
765     NdbPack::Type type(NDB_TYPE_UNSIGNED, 4, false, 0);
766     // rir + rpk
767     if (m_valueSpec.add(type, m_valueAttrs) == -1)
768     {
769       setError(InternalError, __LINE__, m_valueSpec.get_error_code());
770       return -1;
771     }
772   }
773 
774   // data buffers (rounded to word)
775   m_keyDataBuf = new Uint8 [m_keyData.get_max_len4()];
776   m_valueDataBuf = new Uint8 [m_valueData.get_max_len4()];
777   if (m_keyDataBuf == 0 || m_valueDataBuf == 0)
778   {
779     setError(NoMemError, __LINE__);
780     return -1;
781   }
782   m_keyData.set_buf(m_keyDataBuf, m_keyData.get_max_len());
783   m_valueData.set_buf(m_valueDataBuf, m_valueData.get_max_len());
784 
785   m_indexSet = true;
786   return 0;
787 }
788 
789 void
reset_index()790 NdbIndexStatImpl::reset_index()
791 {
792   free_cache();
793   m_keySpec.reset();
794   m_valueSpec.reset();
795   delete [] m_keySpecBuf;
796   delete [] m_valueSpecBuf;
797   delete [] m_keyDataBuf;
798   delete [] m_valueDataBuf;
799   init();
800 }
801 
802 // head
803 
804 void
init_head(Head & head)805 NdbIndexStatImpl::init_head(Head& head)
806 {
807   head.m_found = -1;
808   head.m_eventType = -1;
809   head.m_indexId = 0;
810   head.m_indexVersion = 0;
811   head.m_tableId = 0;
812   head.m_fragCount = 0;
813   head.m_valueFormat = 0;
814   head.m_sampleVersion = 0;
815   head.m_loadTime = 0;
816   head.m_sampleCount = 0;
817   head.m_keyBytes = 0;
818 }
819 
820 // sys tables data
821 
822 int
sys_init(Con & con)823 NdbIndexStatImpl::sys_init(Con& con)
824 {
825   Ndb* ndb = con.m_ndb;
826   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
827   sys_release(con);
828 
829   con.m_headtable = dic->getTableGlobal(g_headtable_name);
830   if (con.m_headtable == 0)
831   {
832     setError(con, __LINE__);
833     mapError(ERR_NoSuchObject, NoSysTables);
834     return -1;
835   }
836   con.m_sampletable = dic->getTableGlobal(g_sampletable_name);
837   if (con.m_sampletable == 0)
838   {
839     setError(con, __LINE__);
840     mapError(ERR_NoSuchObject, NoSysTables);
841     return -1;
842   }
843   con.m_sampleindex1 = dic->getIndexGlobal(g_sampleindex1_name, *con.m_sampletable);
844   if (con.m_sampleindex1 == 0)
845   {
846     setError(con, __LINE__);
847     mapError(ERR_NoSuchObject, NoSysTables);
848     return -1;
849   }
850   return 0;
851 }
852 
853 void
sys_release(Con & con)854 NdbIndexStatImpl::sys_release(Con& con)
855 {
856   if (con.m_headtable != 0)
857   {
858     con.m_dic->removeTableGlobal(*con.m_headtable, false);
859     con.m_headtable = 0;
860   }
861   if (con.m_sampletable != 0)
862   {
863     con.m_dic->removeTableGlobal(*con.m_sampletable, false);
864     con.m_sampletable = 0;
865   }
866   if (con.m_sampleindex1 != 0)
867   {
868     con.m_dic->removeIndexGlobal(*con.m_sampleindex1, false);
869     con.m_sampleindex1 = 0;
870   }
871 }
872 
873 int
sys_read_head(Con & con,bool commit)874 NdbIndexStatImpl::sys_read_head(Con& con, bool commit)
875 {
876   Head& head = con.m_head;
877   head.m_sampleVersion = 0;
878   head.m_found = false;
879 
880   if (con.getNdbOperation() == -1)
881   {
882     setError(con, __LINE__);
883     return -1;
884   }
885   if (con.m_op->readTuple(NdbOperation::LM_Read) == -1)
886   {
887     setError(con, __LINE__);
888     return -1;
889   }
890   if (sys_head_setkey(con) == -1)
891     return -1;
892   if (sys_head_getvalue(con) == -1)
893     return -1;
894   if (con.m_op->setAbortOption(NdbOperation::AbortOnError) == -1)
895   {
896     setError(con, __LINE__);
897     return -1;
898   }
899   if (con.execute(commit) == -1)
900   {
901     setError(con, __LINE__);
902     mapError(ERR_TupleNotFound, NoIndexStats);
903     return -1;
904   }
905   head.m_found = true;
906   if (head.m_sampleVersion == 0)
907   {
908     setError(NoIndexStats, __LINE__);
909     return -1;
910   }
911   return 0;
912 }
913 
914 int
sys_head_setkey(Con & con)915 NdbIndexStatImpl::sys_head_setkey(Con& con)
916 {
917   Head& head = con.m_head;
918   NdbOperation* op = con.m_op;
919   if (op->equal("index_id", (char*)&head.m_indexId) == -1)
920   {
921     setError(con, __LINE__);
922     return -1;
923   }
924   if (op->equal("index_version", (char*)&head.m_indexVersion) == -1)
925   {
926     setError(con, __LINE__);
927     return -1;
928   }
929   return 0;
930 }
931 
932 int
sys_head_getvalue(Con & con)933 NdbIndexStatImpl::sys_head_getvalue(Con& con)
934 {
935   Head& head = con.m_head;
936   NdbOperation* op = con.m_op;
937   if (op->getValue("table_id", (char*)&head.m_tableId) == 0)
938   {
939     setError(con, __LINE__);
940     return -1;
941   }
942   if (op->getValue("frag_count", (char*)&head.m_fragCount) == 0)
943   {
944     setError(con, __LINE__);
945     return -1;
946   }
947   if (op->getValue("value_format", (char*)&head.m_valueFormat) == 0)
948   {
949     setError(con, __LINE__);
950     return -1;
951   }
952   if (op->getValue("sample_version", (char*)&head.m_sampleVersion) == 0)
953   {
954     setError(con, __LINE__);
955     return -1;
956   }
957   if (op->getValue("load_time", (char*)&head.m_loadTime) == 0)
958   {
959     setError(con, __LINE__);
960     return -1;
961   }
962   if (op->getValue("sample_count", (char*)&head.m_sampleCount) == 0)
963   {
964     setError(con, __LINE__);
965     return -1;
966   }
967   if (op->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
968   {
969     setError(con, __LINE__);
970     return -1;
971   }
972   return 0;
973 }
974 
975 int
sys_sample_setkey(Con & con)976 NdbIndexStatImpl::sys_sample_setkey(Con& con)
977 {
978   Head& head = con.m_head;
979   NdbIndexScanOperation* op = con.m_scanop;
980   if (op->equal("index_id", (char*)&head.m_indexId) == -1)
981   {
982     setError(con, __LINE__);
983     return -1;
984   }
985   if (op->equal("index_version", (char*)&head.m_indexVersion) == -1)
986   {
987     setError(con, __LINE__);
988     return -1;
989   }
990   if (op->equal("sample_version", (char*)&head.m_sampleVersion) == -1)
991   {
992     setError(con, __LINE__);
993     return -1;
994   }
995   if (op->equal("stat_key", (char*)m_keyData.get_full_buf()) == -1)
996   {
997     setError(con, __LINE__);
998     return -1;
999   }
1000   return 0;
1001 }
1002 
1003 int
sys_sample_getvalue(Con & con)1004 NdbIndexStatImpl::sys_sample_getvalue(Con& con)
1005 {
1006   NdbIndexScanOperation* op = con.m_scanop;
1007   if (op->getValue("stat_key", (char*)m_keyData.get_full_buf()) == 0)
1008   {
1009     setError(con, __LINE__);
1010     return -1;
1011   }
1012   if (op->getValue("stat_value", (char*)m_valueData.get_full_buf()) == 0)
1013   {
1014     setError(con, __LINE__);
1015     return -1;
1016   }
1017   return 0;
1018 }
1019 
1020 int
sys_sample_setbound(Con & con,int sv_bound)1021 NdbIndexStatImpl::sys_sample_setbound(Con& con, int sv_bound)
1022 {
1023   Head& head = con.m_head;
1024   NdbIndexScanOperation* op = con.m_scanop;
1025   const NdbIndexScanOperation::BoundType eq_bound =
1026     NdbIndexScanOperation::BoundEQ;
1027 
1028   if (op->setBound("index_id", eq_bound, &head.m_indexId) == -1)
1029   {
1030     setError(con, __LINE__);
1031     return -1;
1032   }
1033   if (op->setBound("index_version", eq_bound, &head.m_indexVersion) == -1)
1034   {
1035     setError(con, __LINE__);
1036     return -1;
1037   }
1038   if (sv_bound != -1)
1039   {
1040     if (op->setBound("sample_version", sv_bound, &head.m_sampleVersion) == -1)
1041     {
1042       setError(con, __LINE__);
1043       return -1;
1044     }
1045   }
1046   return 0;
1047 }
1048 
1049 // update, delete
1050 
1051 int
update_stat(Ndb * ndb,Head & head)1052 NdbIndexStatImpl::update_stat(Ndb* ndb, Head& head)
1053 {
1054   Con con(this, head, ndb);
1055   if (con.m_dic->updateIndexStat(m_indexId, m_indexVersion, m_tableId) == -1)
1056   {
1057     setError(con, __LINE__);
1058     mapError(ERR_NoSuchObject, NoSysTables);
1059     return -1;
1060   }
1061   return 0;
1062 }
1063 
1064 int
delete_stat(Ndb * ndb,Head & head)1065 NdbIndexStatImpl::delete_stat(Ndb* ndb, Head& head)
1066 {
1067   Con con(this, head, ndb);
1068   if (con.m_dic->deleteIndexStat(m_indexId, m_indexVersion, m_tableId) == -1)
1069   {
1070     setError(con, __LINE__);
1071     mapError(ERR_NoSuchObject, NoSysTables);
1072     return -1;
1073   }
1074   return 0;
1075 }
1076 
1077 // read
1078 
1079 int
read_head(Ndb * ndb,Head & head)1080 NdbIndexStatImpl::read_head(Ndb* ndb, Head& head)
1081 {
1082   Con con(this, head, ndb);
1083   if (!m_indexSet)
1084   {
1085     setError(UsageError, __LINE__);
1086     return -1;
1087   }
1088   if (sys_init(con) == -1)
1089     return -1;
1090   if (con.startTransaction() == -1)
1091   {
1092     setError(con, __LINE__);
1093     return -1;
1094   }
1095   if (sys_read_head(con, true) == -1)
1096     return -1;
1097   return 0;
1098 }
1099 
1100 int
read_stat(Ndb * ndb,Head & head)1101 NdbIndexStatImpl::read_stat(Ndb* ndb, Head& head)
1102 {
1103   Con con(this, head, ndb);
1104   con.set_time();
1105 
1106   if (read_start(con) == -1)
1107     return -1;
1108   if (save_start(con) == -1)
1109     return -1;
1110   while (1)
1111   {
1112     int ret = read_next(con);
1113     if (ret == -1)
1114       return -1;
1115     if (ret != 0)
1116       break;
1117     if (save_next(con) == -1)
1118       return -1;
1119   }
1120   if (read_commit(con) == -1)
1121     return -1;
1122 
1123   Uint64 save_time = con.get_time();
1124   con.set_time();
1125 
1126   if (save_commit(con) == -1)
1127     return -1;
1128   Uint64 sort_time = con.get_time();
1129 
1130   const Cache& c = *m_cacheBuild;
1131   c.m_save_time = save_time;
1132   c.m_sort_time = sort_time;
1133   return 0;
1134 }
1135 
1136 int
read_start(Con & con)1137 NdbIndexStatImpl::read_start(Con& con)
1138 {
1139   //UNUSED Head& head = con.m_head;
1140   if (!m_indexSet)
1141   {
1142     setError(UsageError, __LINE__);
1143     return -1;
1144   }
1145   if (sys_init(con) == -1)
1146     return -1;
1147   if (con.startTransaction() == -1)
1148   {
1149     setError(con, __LINE__);
1150     return -1;
1151   }
1152   if (sys_read_head(con, false) == -1)
1153     return -1;
1154   if (con.getNdbIndexScanOperation() == -1)
1155   {
1156     setError(con, __LINE__);
1157     return -1;
1158   }
1159   if (con.m_scanop->readTuples(NdbOperation::LM_CommittedRead, 0) == -1)
1160   {
1161     setError(con, __LINE__);
1162     return -1;
1163   }
1164   if (sys_sample_setbound(con, NdbIndexScanOperation::BoundEQ) == -1)
1165     return -1;
1166   if (sys_sample_getvalue(con) == -1)
1167     return -1;
1168   if (con.execute(false) == -1)
1169   {
1170     setError(con, __LINE__);
1171     return -1;
1172   }
1173   return 0;
1174 }
1175 
1176 int
read_next(Con & con)1177 NdbIndexStatImpl::read_next(Con& con)
1178 {
1179   m_keyData.reset();
1180   m_valueData.reset();
1181   int ret = con.m_scanop->nextResult();
1182   if (ret != 0)
1183   {
1184     if (ret == -1)
1185       setError(con, __LINE__);
1186     return ret;
1187   }
1188 
1189   /*
1190    * Key and value are raw data and little-endian.  Create the complete
1191    * NdbPack::Data instance and convert it to native-endian.
1192    */
1193   const NdbPack::Endian::Value from_endian = NdbPack::Endian::Little;
1194   const NdbPack::Endian::Value to_endian = NdbPack::Endian::Native;
1195 
1196   if (m_keyData.desc_all(m_keyAttrs, from_endian) == -1)
1197   {
1198     setError(InternalError, __LINE__, m_keyData.get_error_code());
1199     return -1;
1200   }
1201   if (m_keyData.convert(to_endian) == -1)
1202   {
1203     setError(InternalError, __LINE__, m_keyData.get_error_code());
1204     return -1;
1205   }
1206   if (m_valueData.desc_all(m_valueAttrs, from_endian) == -1)
1207   {
1208     setError(InternalError, __LINE__, m_valueData.get_error_code());
1209     return -1;
1210   }
1211   if (m_valueData.convert(to_endian) == -1)
1212   {
1213     setError(InternalError, __LINE__, m_valueData.get_error_code());
1214     return -1;
1215   }
1216   return 0;
1217 }
1218 
1219 int
read_commit(Con & con)1220 NdbIndexStatImpl::read_commit(Con& con)
1221 {
1222   if (con.execute(true) == -1)
1223   {
1224     setError(con, __LINE__);
1225     return -1;
1226   }
1227   return 0;
1228 }
1229 
1230 // save
1231 
1232 int
save_start(Con & con)1233 NdbIndexStatImpl::save_start(Con& con)
1234 {
1235   if (m_cacheBuild != 0)
1236   {
1237     free_cache(m_cacheBuild);
1238     m_cacheBuild = 0;
1239   }
1240   con.m_cacheBuild = new Cache;
1241   if (con.m_cacheBuild == 0)
1242   {
1243     setError(NoMemError, __LINE__);
1244     return -1;
1245   }
1246   new (con.m_cacheBuild) Cache;
1247   if (cache_init(con) == -1)
1248     return -1;
1249   return 0;
1250 }
1251 
1252 int
save_next(Con & con)1253 NdbIndexStatImpl::save_next(Con& con)
1254 {
1255   if (cache_insert(con) == -1)
1256     return -1;
1257   return 0;
1258 }
1259 
1260 int
save_commit(Con & con)1261 NdbIndexStatImpl::save_commit(Con& con)
1262 {
1263   if (cache_commit(con) == -1)
1264     return -1;
1265   m_cacheBuild = con.m_cacheBuild;
1266   con.m_cacheBuild = 0;
1267   return 0;
1268 }
1269 
1270 // cache inline
1271 
1272 inline uint
get_keyaddr(uint pos) const1273 NdbIndexStatImpl::Cache::get_keyaddr(uint pos) const
1274 {
1275   assert(pos < m_sampleCount);
1276   const uint offset = pos * m_addrLen;
1277   assert(offset + m_addrLen <= m_addrBytes);
1278   const Uint8* src = &m_addrArray[offset];
1279   uint addr = 0;
1280   switch (m_addrLen) {
1281   case 4:
1282     addr += src[3] << 24;
1283     // Fall through
1284   case 3:
1285     addr += src[2] << 16;
1286     // Fall through
1287   case 2:
1288     addr += src[1] << 8;
1289     // Fall through
1290   case 1:
1291     addr += src[0] << 0;
1292     break;
1293   default:
1294     assert(false);
1295   }
1296   return addr;
1297 }
1298 
1299 inline void
set_keyaddr(uint pos,uint addr)1300 NdbIndexStatImpl::Cache::set_keyaddr(uint pos, uint addr)
1301 {
1302   assert(pos < m_sampleCount);
1303   const uint offset = pos * m_addrLen;
1304   assert(offset + m_addrLen <= m_addrBytes);
1305   Uint8* dst = &m_addrArray[offset];
1306   switch (m_addrLen) {
1307   case 4:
1308     dst[3] = (addr >> 24) & 0xFF;
1309     // Fall through
1310   case 3:
1311     dst[2] = (addr >> 16) & 0xFF;
1312     // Fall through
1313   case 2:
1314     dst[1] = (addr >> 8) & 0xFF;
1315     // Fall through
1316   case 1:
1317     dst[0] = (addr >> 0) & 0xFF;
1318     break;
1319   default:
1320     assert(false);
1321   }
1322   assert(get_keyaddr(pos) == addr);
1323 }
1324 
1325 inline const Uint8*
get_keyptr(uint addr) const1326 NdbIndexStatImpl::Cache::get_keyptr(uint addr) const
1327 {
1328   assert(addr < m_keyBytes);
1329   return &m_keyArray[addr];
1330 }
1331 
1332 inline Uint8*
get_keyptr(uint addr)1333 NdbIndexStatImpl::Cache::get_keyptr(uint addr)
1334 {
1335   assert(addr < m_keyBytes);
1336   return &m_keyArray[addr];
1337 }
1338 
1339 inline const Uint8*
get_valueptr(uint pos) const1340 NdbIndexStatImpl::Cache::get_valueptr(uint pos) const
1341 {
1342   assert(pos < m_sampleCount);
1343   return &m_valueArray[pos * m_valueLen];
1344 }
1345 
1346 inline Uint8*
get_valueptr(uint pos)1347 NdbIndexStatImpl::Cache::get_valueptr(uint pos)
1348 {
1349   assert(pos < m_sampleCount);
1350   return &m_valueArray[pos * m_valueLen];
1351 }
1352 
1353 inline void
swap_entry(uint pos1,uint pos2)1354 NdbIndexStatImpl::Cache::swap_entry(uint pos1, uint pos2)
1355 {
1356   uint hold_addr;
1357   Uint8 hold_value[MaxValueBytes];
1358 
1359   hold_addr = get_keyaddr(pos1);
1360   memcpy(hold_value, get_valueptr(pos1), m_valueLen);
1361   set_keyaddr(pos1, get_keyaddr(pos2));
1362   memcpy(get_valueptr(pos1), get_valueptr(pos2), m_valueLen);
1363   set_keyaddr(pos2, hold_addr);
1364   memcpy(get_valueptr(pos2), hold_value, m_valueLen);
1365 }
1366 
1367 inline double
get_rir1(uint pos) const1368 NdbIndexStatImpl::Cache::get_rir1(uint pos) const
1369 {
1370   const Uint8* ptr = get_valueptr(pos);
1371   Uint32 n;
1372   memcpy(&n, &ptr[0], 4);
1373   double x = (double)n;
1374   return x;
1375 }
1376 
1377 inline double
get_rir1(uint pos1,uint pos2) const1378 NdbIndexStatImpl::Cache::get_rir1(uint pos1, uint pos2) const
1379 {
1380   assert(pos2 > pos1);
1381   return get_rir1(pos2) - get_rir1(pos1);
1382 }
1383 
1384 inline double
get_rir(uint pos) const1385 NdbIndexStatImpl::Cache::get_rir(uint pos) const
1386 {
1387   double x = (double)m_fragCount * get_rir1(pos);
1388   return x;
1389 }
1390 
1391 inline double
get_rir(uint pos1,uint pos2) const1392 NdbIndexStatImpl::Cache::get_rir(uint pos1, uint pos2) const
1393 {
1394   assert(pos2 > pos1);
1395   return get_rir(pos2) - get_rir(pos1);
1396 }
1397 
1398 inline double
get_unq1(uint pos,uint k) const1399 NdbIndexStatImpl::Cache::get_unq1(uint pos, uint k) const
1400 {
1401   assert(k < m_keyAttrs);
1402   const Uint8* ptr = get_valueptr(pos);
1403   Uint32 n;
1404   memcpy(&n, &ptr[4 + k * 4], 4);
1405   double x = (double)n;
1406   return x;
1407 }
1408 
1409 inline double
get_unq1(uint pos1,uint pos2,uint k) const1410 NdbIndexStatImpl::Cache::get_unq1(uint pos1, uint pos2, uint k) const
1411 {
1412   assert(pos2 > pos1);
1413   return get_unq1(pos2, k) - get_unq1(pos1, k);
1414 }
1415 
1416 static inline double
get_unqfactor(uint p,double r,double u)1417 get_unqfactor(uint p, double r, double u)
1418 {
1419   double ONE = (double)1.0;
1420   double d = (double)p;
1421   double f = ONE + (d - ONE) * ::pow(u / r, d - ONE);
1422   return f;
1423 }
1424 
1425 inline double
get_unq(uint pos,uint k) const1426 NdbIndexStatImpl::Cache::get_unq(uint pos, uint k) const
1427 {
1428   uint p = m_fragCount;
1429   double r = get_rir1(pos);
1430   double u = get_unq1(pos, k);
1431   double f = get_unqfactor(p, r, u);
1432   double x = f * u;
1433   return x;
1434 }
1435 
1436 inline double
get_unq(uint pos1,uint pos2,uint k) const1437 NdbIndexStatImpl::Cache::get_unq(uint pos1, uint pos2, uint k) const
1438 {
1439   uint p = m_fragCount;
1440   double r = get_rir1(pos1, pos2);
1441   double u = get_unq1(pos1, pos2, k);
1442   double f = get_unqfactor(p, r, u);
1443   double x = f * u;
1444   return x;
1445 }
1446 
1447 inline double
get_rpk(uint pos,uint k) const1448 NdbIndexStatImpl::Cache::get_rpk(uint pos, uint k) const
1449 {
1450   return get_rir(pos) / get_unq(pos, k);
1451 }
1452 
1453 inline double
get_rpk(uint pos1,uint pos2,uint k) const1454 NdbIndexStatImpl::Cache::get_rpk(uint pos1, uint pos2, uint k) const
1455 {
1456   assert(pos2 > pos1);
1457   return get_rir(pos1, pos2) / get_unq(pos1, pos2, k);
1458 }
1459 
1460 // cache
1461 
Cache()1462 NdbIndexStatImpl::Cache::Cache()
1463 {
1464   m_valid = false;
1465   m_keyAttrs = 0;
1466   m_valueAttrs = 0;
1467   m_fragCount = 0;
1468   m_sampleVersion = 0;
1469   m_sampleCount = 0;
1470   m_keyBytes = 0;
1471   m_valueLen = 0;
1472   m_valueBytes = 0;
1473   m_addrLen = 0;
1474   m_addrBytes = 0;
1475   m_addrArray = 0;
1476   m_keyArray = 0;
1477   m_valueArray = 0;
1478   m_nextClean = 0;
1479   // performance
1480   m_save_time = 0;
1481   m_sort_time = 0;
1482   // in use by query_stat
1483   m_ref_count = 0;
1484 }
1485 
1486 int
cache_init(Con & con)1487 NdbIndexStatImpl::cache_init(Con& con)
1488 {
1489   Cache& c = *con.m_cacheBuild;
1490   Head& head = con.m_head;
1491   Mem* mem = m_mem_handler;
1492 
1493   if (m_keyAttrs == 0)
1494   {
1495     setError(InternalError, __LINE__);
1496     return -1;
1497   }
1498   c.m_keyAttrs = m_keyAttrs;
1499   c.m_valueAttrs = m_valueAttrs;
1500   c.m_fragCount = head.m_fragCount;
1501   c.m_sampleCount = head.m_sampleCount;
1502   c.m_keyBytes = head.m_keyBytes;
1503   c.m_valueLen = 4 + c.m_keyAttrs * 4;
1504   c.m_valueBytes = c.m_sampleCount * c.m_valueLen;
1505   c.m_addrLen =
1506     c.m_keyBytes < (1 << 8) ? 1 :
1507     c.m_keyBytes < (1 << 16) ? 2 :
1508     c.m_keyBytes < (1 << 24) ? 3 : 4;
1509   c.m_addrBytes = c.m_sampleCount * c.m_addrLen;
1510 
1511   // wl4124_todo omit addrArray if keys have fixed size
1512   c.m_addrArray = (Uint8*)mem->mem_alloc(c.m_addrBytes);
1513   if (c.m_addrArray == 0)
1514   {
1515     setError(NoMemError, __LINE__);
1516     return -1;
1517   }
1518   c.m_keyArray = (Uint8*)mem->mem_alloc(c.m_keyBytes);
1519   if (c.m_keyArray == 0)
1520   {
1521     setError(NoMemError, __LINE__);
1522     return -1;
1523   }
1524   c.m_valueArray = (Uint8*)mem->mem_alloc(c.m_valueBytes);
1525   if (c.m_valueArray == 0)
1526   {
1527     setError(NoMemError, __LINE__);
1528     return -1;
1529   }
1530   return 0;
1531 }
1532 
1533 int
cache_insert(Con & con)1534 NdbIndexStatImpl::cache_insert(Con& con)
1535 {
1536   Cache& c = *con.m_cacheBuild;
1537 
1538   const uint nextPos = con.m_cachePos + 1;
1539   if (nextPos > c.m_sampleCount)
1540   {
1541     setError(InternalError, __LINE__);
1542     return -1;
1543   }
1544   assert(m_keyData.is_full());
1545   const uint keyLen = m_keyData.get_data_len();
1546   const uint nextKeyOffset = con.m_cacheKeyOffset + keyLen;
1547   if (nextKeyOffset > c.m_keyBytes)
1548   {
1549     setError(InternalError, __LINE__);
1550     return -1;
1551   }
1552   if (m_valueData.get_data_len() != c.m_valueLen)
1553   {
1554     setError(InternalError, __LINE__);
1555     return -1;
1556   }
1557   const uint nextValueOffset = con.m_cacheValueOffset + c.m_valueLen;
1558   if (nextValueOffset > c.m_valueBytes)
1559   {
1560     setError(InternalError, __LINE__);
1561     return -1;
1562   }
1563 
1564   c.set_keyaddr(con.m_cachePos, con.m_cacheKeyOffset);
1565   con.m_cachePos = nextPos;
1566 
1567   Uint8* cacheKeyPtr = &c.m_keyArray[con.m_cacheKeyOffset];
1568   const Uint8* keyPtr = (const Uint8*)m_keyData.get_data_buf();
1569   memcpy(cacheKeyPtr, keyPtr, keyLen);
1570   con.m_cacheKeyOffset = nextKeyOffset;
1571 
1572   Uint8* cacheValuePtr = &c.m_valueArray[con.m_cacheValueOffset];
1573   const Uint8* valuePtr = (const Uint8*)m_valueData.get_data_buf();
1574   memcpy(cacheValuePtr, valuePtr, c.m_valueLen);
1575   con.m_cacheValueOffset = nextValueOffset;
1576 
1577   // verify sanity
1578   {
1579     const Uint8* rir_ptr = &cacheValuePtr[0];
1580     Uint32 rir;
1581     memcpy(&rir, rir_ptr, 4);
1582     if (!(rir != 0))
1583     {
1584       setError(InvalidCache, __LINE__);
1585       return -1;
1586     }
1587     Uint32 unq_prev = 0;
1588     for (uint k = 0; k < c.m_keyAttrs; k++)
1589     {
1590       Uint8* unq_ptr = &cacheValuePtr[4 + k * 4];
1591       Uint32 unq;
1592       memcpy(&unq, unq_ptr, 4);
1593       if (!(unq != 0))
1594       {
1595         setError(InvalidCache, __LINE__);
1596         return -1;
1597       }
1598       if (!(rir >= unq))
1599       {
1600         setError(InvalidCache, __LINE__);
1601         return -1;
1602       }
1603       if (!(unq >= unq_prev))
1604       {
1605         setError(InvalidCache, __LINE__);
1606         return -1;
1607       }
1608       unq_prev = unq;
1609     }
1610   }
1611   return 0;
1612 }
1613 
1614 int
cache_commit(Con & con)1615 NdbIndexStatImpl::cache_commit(Con& con)
1616 {
1617   Cache& c = *con.m_cacheBuild;
1618   Head& head = con.m_head;
1619   if (con.m_cachePos != c.m_sampleCount)
1620   {
1621     setError(InternalError, __LINE__);
1622     return -1;
1623   }
1624   if (con.m_cacheKeyOffset != c.m_keyBytes)
1625   {
1626     setError(InternalError, __LINE__);
1627     return -1;
1628   }
1629   if (con.m_cacheValueOffset != c.m_valueBytes)
1630   {
1631     setError(InternalError, __LINE__);
1632     return -1;
1633   }
1634   c.m_sampleVersion = head.m_sampleVersion;
1635   if (cache_sort(c) == -1)
1636     return -1;
1637   if (cache_verify(c) == -1)
1638     return -1;
1639   c.m_valid = true;
1640   return 0;
1641 }
1642 
1643 int
cache_cmpaddr(const Cache & c,uint addr1,uint addr2) const1644 NdbIndexStatImpl::cache_cmpaddr(const Cache& c, uint addr1, uint addr2) const
1645 {
1646   const Uint8* key1 = c.get_keyptr(addr1);
1647   const Uint8* key2 = c.get_keyptr(addr2);
1648 
1649   NdbPack::DataC keyData1(m_keySpec, false);
1650   NdbPack::DataC keyData2(m_keySpec, false);
1651   keyData1.set_buf(key1, c.m_keyBytes - addr1, c.m_keyAttrs);
1652   keyData2.set_buf(key2, c.m_keyBytes - addr2, c.m_keyAttrs);
1653 
1654   Uint32 num_eq;
1655   int res = keyData1.cmp(keyData2, c.m_keyAttrs, num_eq);
1656   assert(addr1 == addr2 || res != 0);
1657   return res;
1658 }
1659 
1660 int
cache_cmppos(const Cache & c,uint pos1,uint pos2) const1661 NdbIndexStatImpl::cache_cmppos(const Cache& c, uint pos1, uint pos2) const
1662 {
1663   uint addr1 = c.get_keyaddr(pos1);
1664   uint addr2 = c.get_keyaddr(pos2);
1665   return cache_cmpaddr(c, addr1, addr2);
1666 }
1667 
1668 /*
1669  * Sort addr and value arrays via key values.  The samples were inserted
1670  * in key order and were read back via index scan so they may be nearly
1671  * ordered at first.  This is quicksort worst case so we do not use it.
1672  */
1673 int
cache_sort(Cache & c)1674 NdbIndexStatImpl::cache_sort(Cache& c)
1675 {
1676   if (c.m_sampleCount > 1)
1677     cache_hsort(c);
1678   return 0;
1679 }
1680 
1681 // insertion sort - expensive
1682 void
cache_isort(Cache & c)1683 NdbIndexStatImpl::cache_isort(Cache& c)
1684 {
1685   int n = c.m_sampleCount;
1686   for (int i = 1; i < n; i++)
1687   {
1688     for (int j = i - 1; j >= 0; j--)
1689     {
1690       int res = cache_cmppos(c, j, j + 1);
1691       if (res < 0)
1692         break;
1693       c.swap_entry(j, j + 1);
1694     }
1695   }
1696 }
1697 
1698 // heapsort
1699 void
cache_hsort(Cache & c)1700 NdbIndexStatImpl::cache_hsort(Cache& c)
1701 {
1702   int count = c.m_sampleCount;
1703   int i;
1704 
1705   // highest entry which can have children
1706   i = count / 2;
1707 
1708   // make into heap (binary tree where child < parent)
1709   while (i >= 0)
1710   {
1711     cache_hsort_sift(c, i, count);
1712     i--;
1713   }
1714 
1715   // verify is too expensive to enable under VM_TRACE
1716 
1717 #ifdef ndb_index_stat_hsort_verify
1718   cache_hsort_verify(c, count);
1719 #endif
1720 
1721   // sort
1722   i = count - 1;
1723   while (i > 0)
1724   {
1725     // move current max to proper position
1726     c.swap_entry(0, i);
1727 
1728     // restore heap property for the rest
1729     cache_hsort_sift(c, 0, i);
1730 #ifdef ndb_index_stat_hsort_verify
1731     cache_hsort_verify(c, i);
1732 #endif
1733     i--;
1734   }
1735 }
1736 
1737 void
cache_hsort_sift(Cache & c,int i,int count)1738 NdbIndexStatImpl::cache_hsort_sift(Cache& c, int i, int count)
1739 {
1740   int parent = i;
1741 
1742   while (1)
1743   {
1744     // left child if any
1745     int child = parent * 2 + 1;
1746     if (! (child < count))
1747       break;
1748 
1749     // replace by right child if bigger
1750     if (child + 1 < count && cache_cmppos(c, child, child + 1) < 0)
1751       child = child + 1;
1752 
1753     // done if both children are less than parent
1754     if (cache_cmppos(c, child, parent) < 0)
1755       break;
1756 
1757     c.swap_entry(parent, child);
1758     parent = child;
1759   }
1760 }
1761 
1762 #ifdef ndb_index_stat_hsort_verify
1763 // verify heap property
1764 void
cache_hsort_verify(Cache & c,int count)1765 NdbIndexStatImpl::cache_hsort_verify(Cache& c, int count)
1766 {
1767   for (int i = 0; i < count; i++)
1768   {
1769     int parent = i;
1770     int child1 = 2 * i + 1;
1771     int child2 = 2 * i + 2;
1772     if (child1 < count)
1773     {
1774       assert(cache_cmppos(c, child1, parent) < 0);
1775     }
1776     if (child2 < count)
1777     {
1778       assert(cache_cmppos(c, child2, parent) < 0);
1779     }
1780   }
1781 }
1782 #endif
1783 
1784 int
cache_verify(const Cache & c)1785 NdbIndexStatImpl::cache_verify(const Cache& c)
1786 {
1787   for (uint pos1 = 0; pos1 < c.m_sampleCount; pos1++)
1788   {
1789     const uint addr1 = c.get_keyaddr(pos1);
1790     const Uint8* key1 = c.get_keyptr(addr1);
1791     NdbPack::DataC keyData1(m_keySpec, false);
1792     keyData1.set_buf(key1, c.m_keyBytes - addr1, c.m_keyAttrs);
1793     uint pos2 = pos1 + 1;
1794     if (pos2 < c.m_sampleCount)
1795     {
1796       const uint addr2 = c.get_keyaddr(pos2);
1797       const Uint8* key2 = c.get_keyptr(addr2);
1798       NdbPack::DataC keyData2(m_keySpec, false);
1799       keyData2.set_buf(key2, c.m_keyBytes - addr2, c.m_keyAttrs);
1800       Uint32 num_eq;
1801       int res = keyData1.cmp(keyData2, c.m_keyAttrs, num_eq);
1802       if (!(res < 0))
1803       {
1804         setError(InvalidCache, __LINE__);
1805         return -1;
1806       }
1807       const Uint8* ptr1 = c.get_valueptr(pos1);
1808       const Uint8* ptr2 = c.get_valueptr(pos2);
1809       Uint32 rir1;
1810       Uint32 rir2;
1811       memcpy(&rir1, &ptr1[0], 4);
1812       memcpy(&rir2, &ptr2[0], 4);
1813       if (!(rir1 < rir2))
1814       {
1815         setError(InvalidCache, __LINE__);
1816         return -1;
1817       }
1818       for (uint k = 0; k < c.m_keyAttrs; k++)
1819       {
1820         Uint32 unq1;
1821         Uint32 unq2;
1822         memcpy(&unq1, &ptr1[4 + k * 4], 4);
1823         memcpy(&unq2, &ptr2[4 + k * 4], 4);
1824         if (!(unq1 <= unq2))
1825         {
1826           setError(InvalidCache, __LINE__);
1827           return -1;
1828         }
1829         if (k == c.m_keyAttrs - 1 && !(unq1 < unq2))
1830         {
1831           setError(InvalidCache, __LINE__);
1832           return -1;
1833         }
1834       }
1835     }
1836   }
1837   return 0;
1838 }
1839 
1840 void
move_cache()1841 NdbIndexStatImpl::move_cache()
1842 {
1843   Cache* cacheTmp = m_cacheQuery;
1844 
1845   NdbMutex_Lock(m_query_mutex);
1846   m_cacheQuery = m_cacheBuild;
1847   NdbMutex_Unlock(m_query_mutex);
1848   m_cacheBuild = 0;
1849 
1850   if (cacheTmp != 0)
1851   {
1852     cacheTmp->m_nextClean = m_cacheClean;
1853     m_cacheClean = cacheTmp;
1854   }
1855 }
1856 
1857 void
clean_cache()1858 NdbIndexStatImpl::clean_cache()
1859 {
1860   while (m_cacheClean != 0)
1861   {
1862     NdbIndexStatImpl::Cache* tmp = m_cacheClean;
1863     m_cacheClean = tmp->m_nextClean;
1864     free_cache(tmp);
1865   }
1866 }
1867 
1868 void
free_cache(Cache * c)1869 NdbIndexStatImpl::free_cache(Cache* c)
1870 {
1871   Mem* mem = m_mem_handler;
1872   mem->mem_free(c->m_addrArray);
1873   mem->mem_free(c->m_keyArray);
1874   mem->mem_free(c->m_valueArray);
1875   delete c;
1876 }
1877 
1878 void
free_cache()1879 NdbIndexStatImpl::free_cache()
1880 {
1881   // twice to move all to clean list
1882   move_cache();
1883   move_cache();
1884   clean_cache();
1885 }
1886 
1887 // cache dump
1888 
CacheIter(const NdbIndexStatImpl & impl)1889 NdbIndexStatImpl::CacheIter::CacheIter(const NdbIndexStatImpl& impl) :
1890   m_keyData(impl.m_keySpec, false),
1891   m_valueData(impl.m_valueSpec, false)
1892 {
1893   m_keyCount = impl.m_keyAttrs;
1894   m_sampleCount = 0;
1895   m_sampleIndex = 0;
1896 }
1897 
1898 int
dump_cache_start(CacheIter & iter)1899 NdbIndexStatImpl::dump_cache_start(CacheIter& iter)
1900 {
1901   if (m_cacheQuery == 0)
1902   {
1903     setError(UsageError, __LINE__);
1904     return -1;
1905   }
1906   const Cache& c = *m_cacheQuery;
1907   new (&iter) CacheIter(*this);
1908   iter.m_sampleCount = c.m_sampleCount;
1909   iter.m_sampleIndex = ~(Uint32)0;
1910   return 0;
1911 }
1912 
1913 bool
dump_cache_next(CacheIter & iter)1914 NdbIndexStatImpl::dump_cache_next(CacheIter& iter)
1915 {
1916   if (iter.m_sampleIndex == ~(Uint32)0)
1917     iter.m_sampleIndex = 0;
1918   else
1919     iter.m_sampleIndex++;
1920   if (iter.m_sampleIndex >= iter.m_sampleCount)
1921     return false;
1922   const Cache& c = *m_cacheQuery;
1923   const uint pos = iter.m_sampleIndex;
1924   const uint addr = c.get_keyaddr(pos);
1925   const Uint8* key = c.get_keyptr(addr);
1926   const Uint8* value = c.get_valueptr(pos);
1927   iter.m_keyData.set_buf(key, c.m_keyBytes - addr, c.m_keyAttrs);
1928   iter.m_valueData.set_buf(value, c.m_valueLen, c.m_valueAttrs);
1929   return true;
1930 }
1931 
1932 // bound
1933 
1934 int
finalize_bound(Bound & bound)1935 NdbIndexStatImpl::finalize_bound(Bound& bound)
1936 {
1937   assert(bound.m_type == 0 || bound.m_type == 1);
1938   int side = 0;
1939   if (bound.m_data.get_cnt() == 0)
1940   {
1941     if (bound.m_strict != -1)
1942     {
1943       setError(UsageError, __LINE__);
1944       return -1;
1945     }
1946   }
1947   else
1948   {
1949     if (bound.m_strict == -1)
1950     {
1951       setError(UsageError, __LINE__);
1952       return -1;
1953     }
1954     if (bound.m_type == 0)
1955       side = bound.m_strict ? +1 : -1;
1956     else
1957       side = bound.m_strict ? -1 : +1;
1958   }
1959   if (bound.m_bound.finalize(side) == -1)
1960   {
1961     setError(UsageError, __LINE__);
1962     return -1;
1963   }
1964   return 0;
1965 }
1966 
1967 // range
1968 
1969 int
convert_range(Range & range,const NdbRecord * key_record,const NdbIndexScanOperation::IndexBound * ib)1970 NdbIndexStatImpl::convert_range(Range& range,
1971                                 const NdbRecord* key_record,
1972                                 const NdbIndexScanOperation::IndexBound* ib)
1973 {
1974   if (ib == 0)
1975     return 0;
1976   if (ib->low_key_count == 0 && ib->high_key_count == 0)
1977     return 0;
1978   for (uint j = 0; j <= 1; j++)
1979   {
1980     Bound& bound = j == 0 ? range.m_bound1 : range.m_bound2;
1981     bound.m_bound.reset();
1982     const char* key = j == 0 ? ib->low_key : ib->high_key;
1983     const uint key_count = j == 0 ? ib->low_key_count : ib->high_key_count;
1984     const bool inclusive = j == 0 ? ib->low_inclusive : ib->high_inclusive;
1985     Uint32 len_out;
1986     for (uint i = 0; i < key_count; i++)
1987     {
1988       const uint i2 = key_record->key_indexes[i];
1989       require(i2 < key_record->noOfColumns);
1990       const NdbRecord::Attr& attr = key_record->columns[i2];
1991       if (!attr.is_null(key))
1992       {
1993         const char* data = key + attr.offset;
1994         char buf[256];
1995         if (attr.flags & NdbRecord::IsMysqldShrinkVarchar)
1996         {
1997           Uint32 len;
1998           if (!attr.shrink_varchar(key, len, buf))
1999           {
2000             setError(InternalError, __LINE__);
2001             return -1;
2002           }
2003           data = buf;
2004         }
2005         if (bound.m_data.add(data, &len_out) == -1)
2006         {
2007           setError(InternalError, __LINE__, bound.m_data.get_error_code());
2008           return -1;
2009         }
2010       }
2011       else
2012       {
2013         if (bound.m_data.add_null(&len_out) == -1)
2014         {
2015           setError(InternalError, __LINE__, bound.m_data.get_error_code());
2016           return -1;
2017         }
2018       }
2019     }
2020     if (key_count > 0)
2021       bound.m_strict = !inclusive;
2022     if (finalize_bound(bound) == -1)
2023     {
2024       setError(InternalError, __LINE__);
2025       return -1;
2026     }
2027   }
2028 
2029 #ifdef VM_TRACE
2030 #ifdef NDB_USE_GET_ENV
2031   {
2032     const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_RANGE_ERROR", (char*)0, 0);
2033     if (p != 0 && strchr("1Y", p[0]) != 0)
2034     {
2035       if (rand() % 10 == 0)
2036       {
2037         setError(InternalError, __LINE__, NdbIndexStat::InternalError);
2038         return -1;
2039       }
2040     }
2041   }
2042 #endif
2043 #endif
2044 
2045   return 0;
2046 }
2047 
2048 // query
2049 
2050 // normalize values to >= 1.0
2051 void
query_normalize(const Cache & c,StatValue & value)2052 NdbIndexStatImpl::query_normalize(const Cache& c, StatValue& value)
2053 {
2054   if (!value.m_empty)
2055   {
2056     if (value.m_rir < 1.0)
2057       value.m_rir = 1.0;
2058     for (uint k = 0; k < c.m_keyAttrs; k++)
2059     {
2060       if (value.m_unq[k] < 1.0)
2061         value.m_unq[k] = 1.0;
2062     }
2063   }
2064   else
2065   {
2066     value.m_rir = 1.0;
2067     for (uint k = 0; k < c.m_keyAttrs; k++)
2068       value.m_unq[k] = 1.0;
2069   }
2070 }
2071 
2072 int
query_stat(const Range & range,Stat & stat)2073 NdbIndexStatImpl::query_stat(const Range& range, Stat& stat)
2074 {
2075   NdbMutex_Lock(m_query_mutex);
2076   if (unlikely(m_cacheQuery == 0))
2077   {
2078     NdbMutex_Unlock(m_query_mutex);
2079     setError(UsageError, __LINE__);
2080     return -1;
2081   }
2082   const Cache& c = *m_cacheQuery;
2083   if (unlikely(!c.m_valid))
2084   {
2085     NdbMutex_Unlock(m_query_mutex);
2086     setError(InvalidCache, __LINE__);
2087     return -1;
2088   }
2089   c.m_ref_count++;
2090   NdbMutex_Unlock(m_query_mutex);
2091 
2092 #ifdef VM_TRACE
2093 #ifdef NDB_USE_GET_ENV
2094   {
2095     const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_SLOW_QUERY", (char*)0, 0);
2096     if (p != 0 && strchr("1Y", p[0]) != 0)
2097     {
2098       int ms = 1 + rand() % 20;
2099       NdbSleep_MilliSleep(ms);
2100     }
2101   }
2102 #endif
2103 #endif
2104 
2105   // clients run these in parallel
2106   query_interpolate(c, range, stat);
2107   query_normalize(c, stat.m_value);
2108 
2109   NdbMutex_Lock(m_query_mutex);
2110   assert(c.m_ref_count != 0);
2111   c.m_ref_count--;
2112   NdbMutex_Unlock(m_query_mutex);
2113   return 0;
2114 }
2115 
2116 void
query_interpolate(const Cache & c,const Range & range,Stat & stat)2117 NdbIndexStatImpl::query_interpolate(const Cache& c,
2118                                     const Range& range,
2119                                     Stat& stat)
2120 {
2121   const uint keyAttrs = c.m_keyAttrs;
2122   StatValue& value = stat.m_value;
2123   value.m_empty = false;
2124   stat.m_rule[0] = "-";
2125   stat.m_rule[1] = "-";
2126   stat.m_rule[2] = "-";
2127 
2128   if (c.m_sampleCount == 0)
2129   {
2130     stat.m_rule[0] = "r1.1";
2131     value.m_empty = true;
2132     return;
2133   }
2134   const uint posMIN = 0;
2135   const uint posMAX = c.m_sampleCount - 1;
2136 
2137   const Bound& bound1 = range.m_bound1;
2138   const Bound& bound2 = range.m_bound2;
2139   if (bound1.m_data.is_empty() && bound2.m_data.is_empty())
2140   {
2141     stat.m_rule[0] = "r1.2";
2142     value.m_rir = c.get_rir(posMAX);
2143     for (uint k = 0; k < keyAttrs; k++)
2144       value.m_unq[k] = c.get_unq(posMAX, k);
2145     return;
2146   }
2147 
2148   StatBound& stat1 = stat.m_stat1;
2149   StatBound& stat2 = stat.m_stat2;
2150   if (!bound1.m_data.is_empty())
2151   {
2152     query_interpolate(c, bound1, stat1);
2153     query_normalize(c, stat1.m_value);
2154     stat.m_rule[1] = stat1.m_rule;
2155   }
2156   if (!bound2.m_data.is_empty())
2157   {
2158     query_interpolate(c, bound2, stat2);
2159     query_normalize(c, stat2.m_value);
2160     stat.m_rule[2] = stat2.m_rule;
2161   }
2162 
2163   const StatValue& value1 = stat1.m_value;
2164   const StatValue& value2 = stat2.m_value;
2165   const uint posL1 = stat1.m_pos - 1; // invalid if posH1 == posMIN
2166   const uint posH1 = stat1.m_pos;
2167   const uint posL2 = stat2.m_pos - 1; // invalid if posH2 == posMIN
2168   const uint posH2 = stat2.m_pos;
2169   const uint cnt1 = bound1.m_data.get_cnt();
2170   const uint cnt2 = bound2.m_data.get_cnt();
2171   const uint mincnt = std::min(cnt1, cnt2);
2172   Uint32 numEq = 0; // of bound1,bound2
2173 
2174   if (bound1.m_data.is_empty())
2175   {
2176     stat.m_rule[0] = "r1.3";
2177     value.m_rir = value2.m_rir;
2178     for (uint k = 0; k < keyAttrs; k++)
2179       value.m_unq[k] = value2.m_unq[k];
2180     return;
2181   }
2182   if (bound2.m_data.is_empty())
2183   {
2184     stat.m_rule[0] = "r1.4";
2185     value.m_rir = c.get_rir(posMAX) - value1.m_rir;
2186     for (uint k = 0; k < keyAttrs; k++)
2187       value.m_unq[k] = c.get_unq(posMAX, k) - value1.m_unq[k];
2188     return;
2189   }
2190   if (posH1 > posH2)
2191   {
2192     stat.m_rule[0] = "r1.5";
2193     value.m_empty = true;
2194     return;
2195   }
2196   // also returns number of equal initial components
2197   if (bound1.m_bound.cmp(bound2.m_bound, mincnt, numEq) >= 0)
2198   {
2199     stat.m_rule[0] = "r1.6";
2200     value.m_empty = true;
2201     return;
2202   }
2203   if (posH1 == posMIN)
2204   {
2205     stat.m_rule[0] = "r1.7";
2206     value.m_rir = value2.m_rir - value1.m_rir;
2207     for (uint k = 0; k < keyAttrs; k++)
2208       value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2209     return;
2210   }
2211   if (posH2 == posMAX + 1)
2212   {
2213     stat.m_rule[0] = "r1.8";
2214     value.m_rir = value2.m_rir - value1.m_rir;
2215     for (uint k = 0; k <= keyAttrs; k++)
2216       value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2217     return;
2218   }
2219   if (posL1 == posL2)
2220   {
2221     assert(posH1 == posH2);
2222     if (cnt1 == keyAttrs &&
2223         cnt2 == keyAttrs &&
2224         numEq == keyAttrs) {
2225       stat.m_rule[0] = "r2.1";
2226       assert(bound1.m_bound.get_side() == -1 &&
2227              bound2.m_bound.get_side() == +1);
2228       assert(stat1.m_numEqL < keyAttrs && stat2.m_numEqH < keyAttrs);
2229       value.m_rir = c.get_rpk(posL1, posH1, keyAttrs - 1);
2230       for (uint k = 0; k < keyAttrs; k++)
2231         value.m_unq[k] = value.m_rir / c.get_rpk(posL1, posH1, k);
2232       return;
2233     }
2234     if (numEq != 0)
2235     {
2236       stat.m_rule[0] = "r2.2";
2237       // skip for now
2238     }
2239     if (true)
2240     {
2241       stat.m_rule[0] = "r2.3";
2242       const double w = 0.5;
2243       value.m_rir = w * c.get_rir(posL1, posH1);
2244       for (uint k = 0; k < keyAttrs; k++)
2245         value.m_unq[k] = w * c.get_unq(posL1, posH1, k);
2246       return;
2247     }
2248   }
2249   if (posH1 == posL2)
2250   {
2251     if (cnt1 == keyAttrs &&
2252         cnt2 == keyAttrs &&
2253         numEq == keyAttrs) {
2254       stat.m_rule[0] = "r3.1";
2255       assert(bound1.m_bound.get_side() == -1 &&
2256              bound2.m_bound.get_side() == +1);
2257       assert(stat1.m_numEqH == keyAttrs && stat2.m_numEqL == keyAttrs);
2258       value.m_rir = value2.m_rir - value1.m_rir;
2259       for (uint k = 0; k < keyAttrs; k++)
2260         value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2261       return;
2262     }
2263     if (numEq != 0)
2264     {
2265       stat.m_rule[0] = "r3.2";
2266       // skip for now
2267     }
2268     if (true)
2269     {
2270       stat.m_rule[0] = "r3.3";
2271       const double w = 0.5;
2272       value.m_rir = w * c.get_rir(posL1, posH1);
2273       for (uint k = 0; k < keyAttrs; k++)
2274         value.m_unq[k] = w * c.get_unq(posL1, posH1, k);
2275       return;
2276     }
2277   }
2278   if (true)
2279   {
2280     stat.m_rule[0] = "r4";
2281     value.m_rir = value2.m_rir - value1.m_rir;
2282     for (uint k = 0; k < keyAttrs; k++)
2283       value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2284     return;
2285   }
2286 }
2287 
2288 void
query_interpolate(const Cache & c,const Bound & bound,StatBound & stat)2289 NdbIndexStatImpl::query_interpolate(const Cache& c,
2290                                     const Bound& bound,
2291                                     StatBound& stat)
2292 {
2293   const uint keyAttrs = c.m_keyAttrs;
2294   StatValue& value = stat.m_value;
2295   value.m_empty = false;
2296   stat.m_rule = "-";
2297 
2298   query_search(c, bound, stat);
2299 
2300   const uint posMIN = 0;
2301   const uint posMAX = c.m_sampleCount - 1;
2302   const uint posL = stat.m_pos - 1; // invalid if posH == posMIN
2303   const uint posH = stat.m_pos;
2304   const uint cnt = bound.m_data.get_cnt();
2305   const int side = bound.m_bound.get_side();
2306 
2307   if (posH == posMIN)
2308   {
2309     if (cnt == keyAttrs &&
2310         cnt == stat.m_numEqH) {
2311       stat.m_rule = "b1.1";
2312       assert(side == -1);
2313       value.m_rir = c.get_rir(posMIN) - c.get_rpk(posMIN, keyAttrs - 1);
2314       for (uint k = 0; k < keyAttrs; k++)
2315         value.m_unq[k] = c.get_unq(posMIN, k) - 1;
2316       return;
2317     }
2318     if (true)
2319     {
2320       stat.m_rule = "b1.2";
2321       value.m_empty = true;
2322       return;
2323     }
2324   }
2325   if (posH == posMAX + 1)
2326   {
2327     stat.m_rule = "b2";
2328     value.m_rir = c.get_rir(posMAX);
2329     for (uint k = 0; k < keyAttrs; k++)
2330       value.m_unq[k] = c.get_unq(posMAX, k);
2331     return;
2332   }
2333   if (cnt == keyAttrs &&
2334       cnt == stat.m_numEqL) {
2335     stat.m_rule = "b3.1";
2336     assert(side == +1);
2337     value.m_rir = c.get_rir(posL);
2338     for (uint k = 0; k < keyAttrs; k++)
2339       value.m_unq[k] = c.get_unq(posL, k);
2340     return;
2341   }
2342   if (cnt == keyAttrs &&
2343       cnt == stat.m_numEqH &&
2344       side == +1) {
2345     stat.m_rule = "b3.2";
2346     value.m_rir = c.get_rir(posH);
2347     for (uint k = 0; k < keyAttrs; k++)
2348       value.m_unq[k] = c.get_unq(posH, k);
2349     return;
2350   }
2351   if (cnt == keyAttrs &&
2352       cnt == stat.m_numEqH &&
2353       side == -1) {
2354     stat.m_rule = "b3.3";
2355     const double u = c.get_unq(posL, posH, keyAttrs - 1);
2356     const double wL = 1.0 / u;
2357     const double wH = 1.0 - wL;
2358     value.m_rir = wL * c.get_rir(posL) + wH * c.get_rir(posH);
2359     for (uint k = 0; k < keyAttrs; k++)
2360       value.m_unq[k] = wL * c.get_unq(posL, k) + wH * c.get_unq(posH, k);
2361     return;
2362   }
2363   if (true)
2364   {
2365     stat.m_rule = "b4";
2366     const double wL = 0.5;
2367     const double wH = 0.5;
2368     value.m_rir = wL * c.get_rir(posL) + wH * c.get_rir(posH);
2369     for (uint k = 0; k < keyAttrs; k++)
2370       value.m_unq[k] = wL * c.get_unq(posL, k) + wH * c.get_unq(posH, k);
2371     return;
2372   }
2373 }
2374 
2375 void
query_search(const Cache & c,const Bound & bound,StatBound & stat)2376 NdbIndexStatImpl::query_search(const Cache& c,
2377                                const Bound& bound,
2378                                StatBound& stat)
2379 {
2380   assert(c.m_sampleCount > 0);
2381   assert(!bound.m_data.is_empty());
2382   Uint32 numEq;
2383 
2384   int lo = -1;
2385   int hi = c.m_sampleCount;
2386   while (hi - lo > 1)
2387   {
2388     int j = (hi + lo) / 2;
2389     assert(lo < j && j < hi);
2390     int res = query_keycmp(c, bound, j, numEq);
2391     if (res < 0)
2392       lo = j;
2393     else if (res > 0)
2394       hi = j;
2395     else
2396     {
2397       assert(false);
2398       return;
2399     }
2400   }
2401   assert(hi - lo == 1);
2402   stat.m_pos = hi;
2403 
2404   if (stat.m_pos > 0)
2405   {
2406     (void)query_keycmp(c, bound, stat.m_pos - 1, stat.m_numEqL);
2407   }
2408   if (stat.m_pos < c.m_sampleCount)
2409   {
2410     (void)query_keycmp(c, bound, stat.m_pos, stat.m_numEqH);
2411   }
2412 }
2413 
2414 // return <0/>0 for key before/after bound
2415 int
query_keycmp(const Cache & c,const Bound & bound,uint pos,Uint32 & numEq)2416 NdbIndexStatImpl::query_keycmp(const Cache& c,
2417                                const Bound& bound,
2418                                uint pos, Uint32& numEq)
2419 {
2420   const uint addr = c.get_keyaddr(pos);
2421   const Uint8* key = c.get_keyptr(addr);
2422   NdbPack::DataC keyData(m_keySpec, false);
2423   keyData.set_buf(key, c.m_keyBytes - addr, c.m_keyAttrs);
2424   // reverse result for key vs bound
2425   Uint32 cnt = bound.m_bound.get_data().get_cnt();
2426   int res = (-1) * bound.m_bound.cmp(keyData, cnt, numEq);
2427   return res;
2428 }
2429 
2430 // events and polling
2431 
2432 int
create_sysevents(Ndb * ndb)2433 NdbIndexStatImpl::create_sysevents(Ndb* ndb)
2434 {
2435   Sys sys(this, ndb);
2436   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2437 
2438   if (check_systables(sys) == -1)
2439     return -1;
2440   const NdbDictionary::Table* tab = sys.m_headtable;
2441   require(tab != 0);
2442 
2443   const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2444   NdbDictionary::Event ev(evname, *tab);
2445   ev.addTableEvent(NdbDictionary::Event::TE_INSERT);
2446   ev.addTableEvent(NdbDictionary::Event::TE_DELETE);
2447   ev.addTableEvent(NdbDictionary::Event::TE_UPDATE);
2448   for (int i = 0; i < tab->getNoOfColumns(); i++)
2449     ev.addEventColumn(i);
2450   ev.setReport(NdbDictionary::Event::ER_UPDATED);
2451 
2452   if (dic->createEvent(ev) == -1)
2453   {
2454     setError(dic->getNdbError().code, __LINE__);
2455     return -1;
2456   }
2457   return 0;
2458 }
2459 
2460 int
drop_sysevents(Ndb * ndb)2461 NdbIndexStatImpl::drop_sysevents(Ndb* ndb)
2462 {
2463   Sys sys(this, ndb);
2464   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2465 
2466   if (check_systables(sys) == -1)
2467     return -1;
2468 
2469   const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2470   if (dic->dropEvent(evname) == -1)
2471   {
2472     int code = dic->getNdbError().code;
2473     if (code != 4710)
2474     {
2475       setError(dic->getNdbError().code, __LINE__);
2476       return -1;
2477     }
2478   }
2479   return 0;
2480 }
2481 
2482 int
check_sysevents(Ndb * ndb)2483 NdbIndexStatImpl::check_sysevents(Ndb* ndb)
2484 {
2485   Sys sys(this, ndb);
2486   NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2487 
2488   if (check_systables(sys) == -1)
2489     return -1;
2490 
2491   const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2492   const NdbDictionary::Event* ev = dic->getEvent(evname);
2493   if (ev == 0)
2494   {
2495     setError(dic->getNdbError().code, __LINE__);
2496     return -1;
2497   }
2498   delete ev; // getEvent() creates new instance
2499   return 0;
2500 }
2501 
2502 int
create_listener(Ndb * ndb)2503 NdbIndexStatImpl::create_listener(Ndb* ndb)
2504 {
2505   if (m_eventOp != 0)
2506   {
2507     setError(UsageError, __LINE__);
2508     return -1;
2509   }
2510   const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2511   m_eventOp = ndb->createEventOperation(evname);
2512   if (m_eventOp == 0)
2513   {
2514     setError(ndb->getNdbError().code, __LINE__);
2515     return -1;
2516   }
2517 
2518   // all columns are non-nullable
2519   Head& head = m_facadeHead;
2520   if (m_eventOp->getValue("index_id", (char*)&head.m_indexId) == 0 ||
2521       m_eventOp->getValue("index_version", (char*)&head.m_indexVersion) == 0 ||
2522       m_eventOp->getValue("table_id", (char*)&head.m_tableId) == 0 ||
2523       m_eventOp->getValue("frag_count", (char*)&head.m_fragCount) == 0 ||
2524       m_eventOp->getValue("value_format", (char*)&head.m_valueFormat) == 0 ||
2525       m_eventOp->getValue("sample_version", (char*)&head.m_sampleVersion) == 0 ||
2526       m_eventOp->getValue("load_time", (char*)&head.m_loadTime) == 0 ||
2527       m_eventOp->getValue("sample_count", (char*)&head.m_sampleCount) == 0 ||
2528       m_eventOp->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
2529   {
2530     setError(m_eventOp->getNdbError().code, __LINE__);
2531     return -1;
2532   }
2533   // wl4124_todo why this
2534   static Head xxx;
2535   if (m_eventOp->getPreValue("index_id", (char*)&xxx.m_indexId) == 0 ||
2536       m_eventOp->getPreValue("index_version", (char*)&xxx.m_indexVersion) == 0 ||
2537       m_eventOp->getPreValue("table_id", (char*)&xxx.m_tableId) == 0 ||
2538       m_eventOp->getPreValue("frag_count", (char*)&xxx.m_fragCount) == 0 ||
2539       m_eventOp->getPreValue("value_format", (char*)&xxx.m_valueFormat) == 0 ||
2540       m_eventOp->getPreValue("sample_version", (char*)&xxx.m_sampleVersion) == 0 ||
2541       m_eventOp->getPreValue("load_time", (char*)&xxx.m_loadTime) == 0 ||
2542       m_eventOp->getPreValue("sample_count", (char*)&xxx.m_sampleCount) == 0 ||
2543       m_eventOp->getPreValue("key_bytes", (char*)&xxx.m_keyBytes) == 0)
2544   {
2545     setError(m_eventOp->getNdbError().code, __LINE__);
2546     return -1;
2547   }
2548   return 0;
2549 }
2550 
2551 int
execute_listener(Ndb * ndb)2552 NdbIndexStatImpl::execute_listener(Ndb* ndb)
2553 {
2554   if (m_eventOp == 0)
2555   {
2556     setError(UsageError, __LINE__);
2557     return -1;
2558   }
2559   if (m_eventOp->execute() == -1)
2560   {
2561     setError(m_eventOp->getNdbError().code, __LINE__);
2562     return -1;
2563   }
2564   return 0;
2565 }
2566 
2567 int
poll_listener(Ndb * ndb,int max_wait_ms)2568 NdbIndexStatImpl::poll_listener(Ndb* ndb, int max_wait_ms)
2569 {
2570   int ret;
2571   if ((ret = ndb->pollEvents(max_wait_ms)) < 0)
2572   {
2573     setError(ndb->getNdbError().code, __LINE__);
2574     return -1;
2575   }
2576   return (ret == 0 ? 0 : 1);
2577 }
2578 
2579 int
next_listener(Ndb * ndb)2580 NdbIndexStatImpl::next_listener(Ndb* ndb)
2581 {
2582   NdbEventOperation* op = ndb->nextEvent();
2583   if (op == 0)
2584     return 0;
2585 
2586   Head& head = m_facadeHead;
2587   head.m_eventType = (int)op->getEventType();
2588   return 1;
2589 }
2590 
2591 int
drop_listener(Ndb * ndb)2592 NdbIndexStatImpl::drop_listener(Ndb* ndb)
2593 {
2594   if (m_eventOp != 0)
2595   {
2596     // NOTE! dropEventoperation always return 0
2597     int ret;
2598     (void)ret; //USED
2599     ret = ndb->dropEventOperation(m_eventOp);
2600     assert(ret == 0);
2601     m_eventOp = 0;
2602   }
2603   return 0;
2604 }
2605 
2606 // mem alloc - default impl
2607 
MemDefault()2608 NdbIndexStatImpl::MemDefault::MemDefault()
2609 {
2610 }
2611 
~MemDefault()2612 NdbIndexStatImpl::MemDefault::~MemDefault()
2613 {
2614 }
2615 
2616 void*
mem_alloc(UintPtr size)2617 NdbIndexStatImpl::MemDefault::mem_alloc(UintPtr size)
2618 {
2619   void* ptr = malloc(size);
2620   return ptr;
2621 }
2622 
2623 void
mem_free(void * ptr)2624 NdbIndexStatImpl::MemDefault::mem_free(void* ptr)
2625 {
2626   if (ptr != 0)
2627     free(ptr);
2628 }
2629 
2630 // error
2631 
2632 void
setError(int code,int line,int extra)2633 NdbIndexStatImpl::setError(int code, int line, int extra)
2634 {
2635   if (code == 0)
2636     code = InternalError;
2637   m_error.code = code;
2638   m_error.line = line;
2639   m_error.extra = extra;
2640 #ifdef VM_TRACE
2641 #ifdef NDB_USE_GET_ENV
2642   const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_ON_ERROR", (char*)0, 0);
2643   if (p != 0 && strchr("1Y", p[0]) != 0)
2644     abort();
2645 #endif
2646 #endif
2647 }
2648 
2649 void
setError(const Con & con,int line)2650 NdbIndexStatImpl::setError(const Con& con, int line)
2651 {
2652   int code = 0;
2653   if (code == 0 && con.m_op != 0)
2654   {
2655     code = con.m_op->getNdbError().code;
2656   }
2657   if (code == 0 && con.m_scanop != 0)
2658   {
2659     code = con.m_scanop->getNdbError().code;
2660   }
2661   if (code == 0 && con.m_tx != 0)
2662   {
2663     code = con.m_tx->getNdbError().code;
2664   }
2665   if (code == 0 && con.m_dic != 0)
2666   {
2667     code = con.m_dic->getNdbError().code;
2668   }
2669   if (code == 0 && con.m_ndb != 0)
2670   {
2671     code = con.m_ndb->getNdbError().code;
2672   }
2673   setError(code, line);
2674 }
2675 
2676 void
mapError(const int * map,int code)2677 NdbIndexStatImpl::mapError(const int* map, int code)
2678 {
2679   while (*map != 0)
2680   {
2681     if (m_error.code == *map) {
2682       m_error.code = code;
2683       break;
2684     }
2685     map++;
2686   }
2687 }
2688