1 /*
2    Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 #include <NdbPack.hpp>
27 #include <NdbOut.hpp>
28 #include <NdbEnv.h>
29 
30 // NdbPack::Error
31 
32 int
get_error_code() const33 NdbPack::Error::get_error_code() const
34 {
35   return m_error_code;
36 }
37 
38 int
get_error_line() const39 NdbPack::Error::get_error_line() const
40 {
41   return m_error_line;
42 }
43 
44 void
set_error(int code,int line) const45 NdbPack::Error::set_error(int code, int line) const
46 {
47   m_error_code = code;
48   m_error_line = line;
49 #ifdef VM_TRACE
50   const char* p = NdbEnv_GetEnv("NDB_PACK_ABORT_ON_ERROR", (char*)0, 0);
51   if (p != 0 && strchr("1Y", p[0]) != 0)
52     require(false);
53 #endif
54 }
55 
56 void
set_error(const Error & e2) const57 NdbPack::Error::set_error(const Error& e2) const
58 {
59   set_error(e2.m_error_code, e2.m_error_line);
60 }
61 
62 // NdbPack::Endian
63 
64 void
convert(void * ptr,Uint32 len)65 NdbPack::Endian::convert(void* ptr, Uint32 len)
66 {
67   Uint8* p = (Uint8*)ptr;
68   for (Uint32 i = 0; i < len / 2; i++)
69   {
70     Uint32 j = len - i - 1;
71     Uint8 tmp = p[i];
72     p[i] = p[j];
73     p[j] = tmp;
74   }
75 }
76 
77 // NdbPack::Type
78 
// Static properties of one data type.  Entries are looked up by type id
// in g_ndb_pack_type_info[] by Type::complete() and Data::convert_impl().
struct Ndb_pack_type_info {
  bool m_supported;     // if false Type::complete() fails with TypeNotSupported
  Uint16 m_fixSize;     // if non-zero must have this exact size
  Uint16 m_arrayType;   // 0,1,2 length bytes
  bool m_charType;      // type with character set
  bool m_convert;       // convert endian (reverse byte order)
};
86 
// Type property table indexed by type id.  Columns are, in order:
// m_supported, m_fixSize, m_arrayType, m_charType, m_convert.
// NOTE(review): the rows are presumably in the numeric order of the
// NDB_TYPE_* constants named in the row comments - verify when adding types.
static const Ndb_pack_type_info
g_ndb_pack_type_info[] = {
  { 0, 0, 0, 0, 0 }, // NDB_TYPE_UNDEFINED
  { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYINT
  { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYUNSIGNED
  { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLINT
  { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLUNSIGNED
  { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMINT
  { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMUNSIGNED
  { 1, 4, 0, 0, 1 }, // NDB_TYPE_INT
  { 1, 4, 0, 0, 1 }, // NDB_TYPE_UNSIGNED
  { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGINT
  { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGUNSIGNED
  { 1, 4, 0, 0, 1 }, // NDB_TYPE_FLOAT
  { 1, 8, 0, 0, 1 }, // NDB_TYPE_DOUBLE
  { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMAL
  { 1, 0, 0, 1, 0 }, // NDB_TYPE_CHAR
  { 1, 0, 1, 1, 0 }, // NDB_TYPE_VARCHAR
  { 1, 0, 0, 0, 0 }, // NDB_TYPE_BINARY
  { 1, 0, 1, 0, 0 }, // NDB_TYPE_VARBINARY
  { 1, 8, 0, 0, 0 }, // NDB_TYPE_DATETIME
  { 1, 3, 0, 0, 0 }, // NDB_TYPE_DATE
  { 0, 0, 0, 0, 0 }, // NDB_TYPE_BLOB
  { 0, 0, 0, 1, 0 }, // NDB_TYPE_TEXT
  { 0, 0, 0, 0, 0 }, // NDB_TYPE_BIT
  { 1, 0, 2, 1, 0 }, // NDB_TYPE_LONGVARCHAR
  { 1, 0, 2, 0, 0 }, // NDB_TYPE_LONGVARBINARY
  { 1, 3, 0, 0, 0 }, // NDB_TYPE_TIME
  { 1, 1, 0, 0, 0 }, // NDB_TYPE_YEAR
  { 1, 4, 0, 0, 0 }, // NDB_TYPE_TIMESTAMP
  { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMALUNSIGNED
  { 1, 0, 0, 0, 0 }, // NDB_TYPE_DECIMAL
  { 1, 0, 0, 0, 0 }  // NDB_TYPE_DECIMALUNSIGNED
};

// number of rows in the table; type ids at or above this are rejected
static const int g_ndb_pack_type_info_cnt =
  sizeof(g_ndb_pack_type_info) / sizeof(g_ndb_pack_type_info[0]);
124 
125 int
complete()126 NdbPack::Type::complete()
127 {
128   if (m_typeId == 0)
129   {
130     set_error(TypeNotSet, __LINE__);
131     return -1;
132   }
133   if (m_typeId >= g_ndb_pack_type_info_cnt)
134   {
135     set_error(TypeNotSet, __LINE__);
136     return -1;
137   }
138   const Ndb_pack_type_info& info = g_ndb_pack_type_info[m_typeId];
139   if (!info.m_supported)
140   {
141     set_error(TypeNotSupported, __LINE__);
142     return -1;
143   }
144   if (m_byteSize == 0)
145   {
146     set_error(TypeSizeZero, __LINE__);
147     return -1;
148   }
149   if (info.m_fixSize != 0 && m_byteSize != info.m_fixSize)
150   {
151     set_error(TypeFixSizeInvalid, __LINE__);
152     return -1;
153   }
154   if (!(m_nullable <= 1))
155   {
156     set_error(TypeNullableNotBool, __LINE__);
157     return -1;
158   }
159   if (info.m_charType && m_csNumber == 0)
160   {
161     set_error(CharsetNotSpecified, __LINE__);
162     return -1;
163   }
164   if (info.m_charType && all_charsets[m_csNumber] == 0)
165   {
166     CHARSET_INFO* cs = get_charset(m_csNumber, MYF(0));
167     if (cs == 0)
168     {
169       set_error(CharsetNotFound, __LINE__);
170       return -1;
171     }
172     all_charsets[m_csNumber] = cs; // yes caller must do this
173   }
174   if (!info.m_charType && m_csNumber != 0)
175   {
176     set_error(CharsetNotAllowed, __LINE__);
177     return -1;
178   }
179   m_arrayType = info.m_arrayType;
180   return 0;
181 }
182 
183 // NdbPack::Spec
184 
185 int
add(Type type)186 NdbPack::Spec::add(Type type)
187 {
188   Uint32 cnt = m_cnt;
189   Uint32 nullable_cnt = m_nullableCnt;
190   Uint32 varsize_cnt = m_varsizeCnt;
191   Uint32 max_byte_size = m_maxByteSize;
192   if (type.complete() == -1)
193   {
194     set_error(type);
195     return -1;
196   }
197   type.m_nullbitPos = 0xFFFF;
198   if (type.m_nullable)
199   {
200     type.m_nullbitPos = nullable_cnt;
201     nullable_cnt++;
202   }
203   if (type.m_arrayType != 0)
204   {
205     varsize_cnt++;
206   }
207   max_byte_size += type.m_byteSize;
208   if (cnt >= m_bufMaxCnt)
209   {
210     set_error(SpecBufOverflow, __LINE__);
211     return -1;
212   }
213   m_buf[cnt] = type;
214   cnt++;
215   m_cnt = cnt;
216   m_nullableCnt = nullable_cnt;
217   m_varsizeCnt = varsize_cnt;
218   m_maxByteSize = max_byte_size;
219   return 0;
220 }
221 
222 int
add(Type type,Uint32 cnt)223 NdbPack::Spec::add(Type type, Uint32 cnt)
224 {
225   for (Uint32 i = 0; i < cnt; i++)
226   {
227     if (add(type) == -1)
228       return -1;
229   }
230   return 0;
231 }
232 
233 void
copy(const Spec & s2)234 NdbPack::Spec::copy(const Spec& s2)
235 {
236   assert(m_bufMaxCnt >= s2.m_cnt);
237   reset();
238   m_cnt = s2.m_cnt;
239   m_nullableCnt = s2.m_nullableCnt;
240   m_varsizeCnt = s2.m_varsizeCnt;
241   m_maxByteSize = s2.m_maxByteSize;
242   for (Uint32 i = 0; i < m_cnt; i++)
243   {
244     m_buf[i] = s2.m_buf[i];
245   }
246 }
247 
248 // NdbPack::Iter
249 
250 int
desc(const Uint8 * item)251 NdbPack::Iter::desc(const Uint8* item)
252 {
253   const Uint32 i = m_cnt; // item index
254   assert(i < m_spec.m_cnt);
255   const Type& type = m_spec.m_buf[i];
256   const Uint32 lenBytes = type.m_arrayType;
257   Uint32 bareLen = 0;
258   switch (lenBytes) {
259   case 0:
260     bareLen = type.m_byteSize;
261     break;
262   case 1:
263     bareLen = item[0];
264     break;
265   case 2:
266     bareLen = item[0] + (item[1] << 8);
267     break;
268   default:
269     assert(false);
270     set_error(InternalError, __LINE__);
271     return -1;
272   }
273   const Uint32 itemLen = lenBytes + bareLen;
274   if (itemLen > type.m_byteSize)
275   {
276     set_error(DataValueOverflow, __LINE__);
277     return -1;
278   }
279   m_itemPos += m_itemLen; // skip previous item
280   m_cnt++;
281   m_lenBytes = lenBytes;
282   m_bareLen = bareLen;
283   m_itemLen = itemLen;
284   return 0;
285 }
286 
287 int
desc_null()288 NdbPack::Iter::desc_null()
289 {
290   assert(m_cnt < m_spec.m_cnt);
291   // caller checks if null allowed
292   m_itemPos += m_itemLen; // skip previous item
293   m_cnt++;
294   m_nullCnt++;
295   m_lenBytes = 0;
296   m_bareLen = 0;
297   m_itemLen = 0;
298   return 0;
299 }
300 
301 int
cmp(const Iter & r2,const Uint8 * buf1,const Uint8 * buf2) const302 NdbPack::Iter::cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const
303 {
304   const Iter& r1 = *this;
305   assert(&r1.m_spec == &r2.m_spec);
306   assert(r1.m_cnt == r2.m_cnt && r1.m_cnt > 0);
307   const Uint32 i = r1.m_cnt - 1; // item index
308   int res = 0;
309   const Uint32 n1 = r1.m_itemLen;
310   const Uint32 n2 = r2.m_itemLen;
311   if (n1 != 0)
312   {
313     if (n2 != 0)
314     {
315       const Type& type = r1.m_spec.m_buf[i];
316       const NdbSqlUtil::Type& sqlType = getSqlType(type.m_typeId);
317       const Uint8* p1 = &buf1[r1.m_itemPos];
318       const Uint8* p2 = &buf2[r2.m_itemPos];
319       CHARSET_INFO* cs = all_charsets[type.m_csNumber];
320       res = (*sqlType.m_cmp)(cs, p1, n1, p2, n2);
321     }
322     else
323     {
324       res = +1;
325     }
326   }
327   else
328   {
329     if (n2 != 0)
330       res = -1;
331   }
332   return res;
333 }
334 
335 // NdbPack::DataC
336 
337 int
desc(Iter & r) const338 NdbPack::DataC::desc(Iter& r) const
339 {
340   const Uint32 i = r.m_cnt; // item index
341   assert(i < m_cnt);
342   const Type& type = m_spec.m_buf[i];
343   if (type.m_nullable || m_allNullable)
344   {
345     Uint32 nullbitPos = 0;
346     if (!m_allNullable)
347       nullbitPos = type.m_nullbitPos;
348     else
349       nullbitPos = i;
350     const Uint32 byte_pos = nullbitPos / 8;
351     const Uint32 bit_pos = nullbitPos % 8;
352     const Uint8 bit_mask = (1 << bit_pos);
353     const Uint8& the_byte = m_buf[byte_pos];
354     if ((the_byte & bit_mask) != 0)
355     {
356       if (r.desc_null() == -1)
357       {
358         set_error(r);
359         return -1;
360       }
361       return 0;
362     }
363   }
364   const Uint32 pos = r.m_itemPos + r.m_itemLen;
365   const Uint8* item = &m_buf[pos];
366   if (r.desc(item) == -1)
367   {
368     set_error(r);
369     return -1;
370   }
371   return 0;
372 }
373 
374 int
cmp(const DataC & d2,Uint32 cnt,Uint32 & num_eq) const375 NdbPack::DataC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
376 {
377   const DataC& d1 = *this;
378   assert(cnt <= d1.m_cnt);
379   assert(cnt <= d2.m_cnt);
380   Iter r1(d1);
381   Iter r2(d2);
382   int res = 0;
383   Uint32 i; // remember last
384   for (i = 0; i < cnt; i++)
385   {
386     d1.desc(r1);
387     d2.desc(r2);
388     res = r1.cmp(r2, d1.m_buf, d2.m_buf);
389     if (res != 0)
390       break;
391   }
392   num_eq = i;
393   return res;
394 }
395 
396 // NdbPack::Data
397 
398 int
add(const void * data,Uint32 * len_out)399 NdbPack::Data::add(const void* data, Uint32* len_out)
400 {
401   assert(data != 0);
402   const Uint8* item = (const Uint8*)data;
403   const Uint32 i = m_cnt; // item index
404   if (i >= m_spec.m_cnt)
405   {
406     set_error(DataCntOverflow, __LINE__);
407     return -1;
408   }
409   Iter& r = m_iter;
410   assert(r.m_cnt == i);
411   const Uint32 fullLen = m_varBytes + r.m_itemPos + r.m_itemLen;
412   if (r.desc(item) == -1)
413   {
414     set_error(r);
415     return -1;
416   }
417   if (fullLen + r.m_itemLen > m_bufMaxLen)
418   {
419     set_error(DataBufOverflow, __LINE__);
420     return -1;
421   }
422   memcpy(&m_buf[fullLen], item, r.m_itemLen);
423   *len_out = r.m_itemLen;
424   m_cnt++;
425   return 0;
426 }
427 
428 int
add(const void * data,Uint32 cnt,Uint32 * len_out)429 NdbPack::Data::add(const void* data, Uint32 cnt, Uint32* len_out)
430 {
431   const Uint8* data_ptr = (const Uint8*)data;
432   Uint32 len_tot = 0;
433   for (Uint32 i = 0; i < cnt; i++)
434   {
435     Uint32 len;
436     if (add(data_ptr, &len) == -1)
437       return -1;
438     if (data != 0)
439       data_ptr += len;
440     len_tot += len;
441   }
442   *len_out = len_tot;
443   return 0;
444 }
445 
446 int
add_null(Uint32 * len_out)447 NdbPack::Data::add_null(Uint32* len_out)
448 {
449   const Uint32 i = m_cnt; // item index
450   if (i >= m_spec.m_cnt)
451   {
452     set_error(DataCntOverflow, __LINE__);
453     return -1;
454   }
455   Iter& r = m_iter;
456   assert(r.m_cnt == i);
457   if (r.desc_null() == -1)
458   {
459     set_error(r);
460     return -1;
461   }
462   Uint32 nullbitPos = 0;
463   if (!m_allNullable)
464   {
465     const Type& type = m_spec.m_buf[i];
466     if (!type.m_nullable)
467     {
468       set_error(DataNotNullable, __LINE__);
469       return -1;
470     }
471     nullbitPos = type.m_nullbitPos;
472   }
473   else
474   {
475     nullbitPos = i;
476   }
477   const Uint32 byte_pos = nullbitPos / 8;
478   const Uint32 bit_pos = nullbitPos % 8;
479   const Uint8 bit_mask = (1 << bit_pos);
480   Uint8& the_byte = m_buf[m_varBytes + byte_pos];
481   assert((the_byte & bit_mask) == 0);
482   the_byte |= bit_mask;
483   *len_out = r.m_itemLen;
484   m_cnt++;
485   return 0;
486 }
487 
488 int
add_null(Uint32 cnt,Uint32 * len_out)489 NdbPack::Data::add_null(Uint32 cnt, Uint32* len_out)
490 {
491   Uint32 len_tot = 0;
492   for (Uint32 i = 0; i < cnt; i++)
493   {
494     Uint32 len;
495     if (add_null(&len) == -1)
496       return -1;
497     len_tot += len;
498   }
499   *len_out = len_tot;
500   return 0;
501 }
502 
503 int
add_poai(const Uint32 * poai,Uint32 * len_out)504 NdbPack::Data::add_poai(const Uint32* poai, Uint32* len_out)
505 {
506   const AttributeHeader ah = *(const AttributeHeader*)&poai[0];
507   if (!ah.isNULL())
508   {
509     if (add(&poai[1], len_out) == -1)
510       return -1;
511   }
512   else
513   {
514     if (add_null(len_out) == -1)
515       return -1;
516   }
517   if (ah.getByteSize() != *len_out)
518   {
519     set_error(InvalidAttrInfo, __LINE__);
520     return -1;
521   }
522   return 0;
523 }
524 
525 int
add_poai(const Uint32 * poai,Uint32 cnt,Uint32 * len_out)526 NdbPack::Data::add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out)
527 {
528   Uint32 len_tot = 0;
529   for (Uint32 i = 0; i < cnt; i++)
530   {
531     Uint32 len;
532     if (add_poai(poai, &len) == -1)
533       return -1;
534     len_tot += len;
535     poai += 1 + (len + 3) / 4;
536   }
537   *len_out = len_tot;
538   return 0;
539 }
540 
541 int
finalize_impl()542 NdbPack::Data::finalize_impl()
543 {
544   const Uint32 dataLen = m_iter.m_itemPos + m_iter.m_itemLen;
545   switch (m_varBytes) {
546   // case 0: inlined
547   case 1:
548     if (dataLen <= 0xFF)
549     {
550       m_buf[0] = dataLen;
551       return 0;
552     }
553     break;
554   case 2:
555     if (dataLen <= 0xFFFF)
556     {
557       m_buf[0] = (dataLen & 0xFF);
558       m_buf[1] = (dataLen >> 8);
559       return 0;
560     }
561     break;
562   default:
563     break;
564   }
565   set_error(InternalError, __LINE__);
566   return -1;
567 }
568 
569 int
desc_all(Uint32 cnt,Endian::Value from_endian)570 NdbPack::Data::desc_all(Uint32 cnt, Endian::Value from_endian)
571 {
572   if (from_endian == NdbPack::Endian::Native)
573     from_endian = NdbPack::Endian::get_endian();
574   m_endian = from_endian;
575   assert(m_cnt == 0); // reset() would destroy nullmask
576   for (Uint32 i = 0; i < cnt; i++)
577   {
578     m_cnt++;
579     if (desc(m_iter) == -1)
580       return -1;
581   }
582   if (finalize() == -1)
583     return -1;
584   return 0;
585 }
586 
587 int
copy(const DataC & d2)588 NdbPack::Data::copy(const DataC& d2)
589 {
590   reset();
591   Iter r2(d2);
592   const Uint32 cnt2 = d2.m_cnt;
593   for (Uint32 i = 0; i < cnt2; i++)
594   {
595     if (d2.desc(r2) == -1)
596       return -1;
597     Uint32 len_out = ~(Uint32)0;
598     if (r2.m_itemLen != 0)
599     {
600       if (add(&d2.m_buf[r2.m_itemPos], &len_out) == -1)
601           return -1;
602       assert(len_out == r2.m_itemLen);
603     }
604     else
605     {
606       if (add_null(&len_out) == -1)
607         return -1;
608       assert(len_out ==0);
609     }
610   }
611   if (finalize() == -1)
612     return -1;
613   return 0;
614 }
615 
616 int
convert_impl(Endian::Value to_endian)617 NdbPack::Data::convert_impl(Endian::Value to_endian)
618 {
619   const Spec& spec = m_spec;
620   Iter r(*this);
621   for (Uint32 i = 0; i < m_cnt; i++)
622   {
623     if (DataC::desc(r) == -1)
624     {
625       set_error(r);
626       return -1;
627     }
628     const Type& type = spec.m_buf[i];
629     const Uint32 typeId = type.m_typeId;
630     const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
631     if (info.m_convert)
632     {
633       Uint8* ptr = &m_buf[m_varBytes + r.m_itemPos];
634       Uint32 len = r.m_itemLen;
635       Endian::convert(ptr, len);
636     }
637   }
638   return 0;
639 }
640 
641 // NdbPack::BoundC
642 
643 int
finalize(int side)644 NdbPack::BoundC::finalize(int side)
645 {
646   if (m_data.m_cnt == 0 && side != 0)
647   {
648     set_error(BoundEmptySide, __LINE__);
649     return -1;
650   }
651   if (m_data.m_cnt != 0 && side != -1 && side != +1)
652   {
653     set_error(BoundNonemptySide, __LINE__);
654     return -1;
655   }
656   m_side = side;
657   return 0;
658 }
659 
660 int
cmp(const BoundC & b2,Uint32 cnt,Uint32 & num_eq) const661 NdbPack::BoundC::cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const
662 {
663   const BoundC& b1 = *this;
664   const DataC& d1 = b1.m_data;
665   const DataC& d2 = b2.m_data;
666   int res = d1.cmp(d2, cnt, num_eq);
667   if (res == 0)
668   {
669     if (cnt < d1.m_cnt && cnt < d2.m_cnt)
670       ;
671     else if (d1.m_cnt < d2.m_cnt)
672       res = (+1) * b1.m_side;
673     else if (d1.m_cnt > d2.m_cnt)
674       res = (-1) * b2.m_side;
675     else if (b1.m_side < b2.m_side)
676       res = -1;
677     else if (b1.m_side > b2.m_side)
678       res = +1;
679   }
680   return res;
681 }
682 
683 // NdbPack::Bound
684 
685 // print
686 
Print(char * buf,Uint32 bufsz)687 NdbPack::Print::Print(char* buf, Uint32 bufsz) :
688   m_buf(buf), m_bufsz(bufsz), m_sz(0) {}
689 
690 void
print(const char * fmt,...)691 NdbPack::Print::print(const char* fmt, ...)
692 {
693   va_list ap;
694   va_start(ap, fmt);
695   if (m_bufsz > m_sz)
696   {
697     BaseString::vsnprintf(&m_buf[m_sz], m_bufsz - m_sz, fmt, ap);
698     m_sz += (Uint32)strlen(&m_buf[m_sz]);
699   }
700   va_end(ap);
701 }
702 
703 // print Type
704 
705 NdbOut&
operator <<(NdbOut & out,const NdbPack::Type & a)706 operator<<(NdbOut& out, const NdbPack::Type& a)
707 {
708   a.print(out);
709   return out;
710 }
711 
712 void
print(NdbOut & out) const713 NdbPack::Type::print(NdbOut& out) const
714 {
715   char buf[200];
716   out << print(buf, sizeof(buf));
717 }
718 
719 const char*
print(char * buf,Uint32 bufsz) const720 NdbPack::Type::print(char* buf, Uint32 bufsz) const
721 {
722   Print p(buf, bufsz);
723   p.print("typeId:%u", m_typeId);
724   p.print(" byteSize:%u", m_byteSize);
725   p.print(" nullable:%u", m_nullable);
726   p.print(" csNumber:%u", m_csNumber);
727   return buf;
728 }
729 
730 // print Spec
731 
732 NdbOut&
operator <<(NdbOut & out,const NdbPack::Spec & a)733 operator<<(NdbOut& out, const NdbPack::Spec& a)
734 {
735   a.print(out);
736   return out;
737 }
738 
739 void
print(NdbOut & out) const740 NdbPack::Spec::print(NdbOut& out) const
741 {
742   char buf[8000];
743   out << print(buf, sizeof(buf));
744 }
745 
746 const char*
print(char * buf,Uint32 bufsz) const747 NdbPack::Spec::print(char* buf, Uint32 bufsz) const
748 {
749   Print p(buf, bufsz);
750   p.print("cnt:%u", m_cnt);
751   p.print(" nullableCnt:%u", m_nullableCnt);
752   p.print(" varsizeCnt:%u", m_varsizeCnt);
753   p.print(" nullmaskLen:%u", get_nullmask_len(false));
754   p.print(" maxByteSize:%u", m_maxByteSize);
755   for (Uint32 i = 0; i < m_cnt; i++)
756   {
757     const Type& type = m_buf[i];
758     p.print(" [%u", i);
759     p.print(" typeId:%u", type.m_typeId);
760     p.print(" nullable:%u", type.m_nullable);
761     p.print(" byteSize:%u", type.m_byteSize);
762     p.print(" csNumber:%u", type.m_csNumber);
763     p.print("]");
764   }
765   return buf;
766 }
767 
768 // print DataC
769 
// if true, DataC::print() adds a hex dump to every item value, else only
// to values of types it cannot print in a readable form
bool g_ndb_pack_print_hex_always = true;
771 
772 NdbOut&
operator <<(NdbOut & out,const NdbPack::DataC & a)773 operator<<(NdbOut& out, const NdbPack::DataC& a)
774 {
775   a.print(out);
776   return out;
777 }
778 
779 void
print(NdbOut & out) const780 NdbPack::DataC::print(NdbOut& out) const
781 {
782   char buf[8000];
783   out << print(buf, sizeof(buf));
784 }
785 
786 const char*
print(char * buf,Uint32 bufsz,bool convert_flag) const787 NdbPack::DataC::print(char* buf, Uint32 bufsz, bool convert_flag) const
788 {
789   Print p(buf, bufsz);
790   const Spec& spec = m_spec;
791   const Uint32 nullmask_len = spec.get_nullmask_len(m_allNullable);
792   if (nullmask_len != 0)
793   {
794     p.print("nullmask:");
795     for (Uint32 i = 0; i < nullmask_len; i++)
796     {
797       int x = m_buf[i];
798       p.print("%02x", x);
799     }
800   }
801   Iter r(*this);
802   for (Uint32 i = 0; i < m_cnt; i++)
803   {
804     desc(r);
805     const Uint8* value = &m_buf[r.m_itemPos];
806     p.print(" [%u", i);
807     p.print(" pos:%u", r.m_itemPos);
808     p.print(" len:%u", r.m_itemLen);
809     if (r.m_itemLen > 0)
810     {
811       p.print(" value:");
812       // some specific types for debugging
813       const Type& type = spec.m_buf[i];
814       bool ok = true;
815       switch (type.m_typeId) {
816       case NDB_TYPE_TINYINT:
817         {
818           Int8 x;
819           memcpy(&x, value, 1);
820           if (convert_flag)
821             Endian::convert(&x, 1);
822           p.print("%d", (int)x);
823         }
824         break;
825       case NDB_TYPE_TINYUNSIGNED:
826         {
827           Uint8 x;
828           memcpy(&x, value, 1);
829           if (convert_flag)
830             Endian::convert(&x, 1);
831           p.print("%u", (uint)x);
832         }
833         break;
834       case NDB_TYPE_SMALLINT:
835         {
836           Int16 x;
837           memcpy(&x, value, 2);
838           if (convert_flag)
839             Endian::convert(&x, 2);
840           p.print("%d", (int)x);
841         }
842         break;
843       case NDB_TYPE_SMALLUNSIGNED:
844         {
845           Uint16 x;
846           memcpy(&x, value, 2);
847           if (convert_flag)
848             Endian::convert(&x, 2);
849           p.print("%u", (uint)x);
850         }
851         break;
852       case NDB_TYPE_INT:
853         {
854           Int32 x;
855           memcpy(&x, value, 4);
856           if (convert_flag)
857             Endian::convert(&x, 4);
858           p.print("%d", (int)x);
859         }
860         break;
861       case NDB_TYPE_UNSIGNED:
862         {
863           Uint32 x;
864           memcpy(&x, value, 4);
865           if (convert_flag)
866             Endian::convert(&x, 4);
867           p.print("%u", (uint)x);
868         }
869         break;
870       case NDB_TYPE_FLOAT:
871         {
872           float x;
873           memcpy(&x, value, 4);
874           if (convert_flag)
875             Endian::convert(&x, 4);
876           p.print("%g", (double)x);
877         }
878         break;
879       case NDB_TYPE_DOUBLE:
880         {
881           double x;
882           memcpy(&x, value, 8);
883           if (convert_flag)
884             Endian::convert(&x, 8);
885           p.print("%g", x);
886         }
887         break;
888       case NDB_TYPE_CHAR:
889       case NDB_TYPE_VARCHAR:
890       case NDB_TYPE_LONGVARCHAR:
891         {
892           const Uint32 off = type.m_arrayType;
893           for (Uint32 j = 0; j < r.m_bareLen; j++)
894           {
895             Uint8 x = value[off + j];
896             p.print("%c", (int)x);
897           }
898         }
899         break;
900       default:
901         ok = false;
902         break;
903       }
904       if (!ok || g_ndb_pack_print_hex_always)
905       {
906         p.print("<");
907         for (Uint32 j = 0; j < r.m_itemLen; j++)
908         {
909           int x = value[j];
910           p.print("%02x", x);
911         }
912         p.print(">");
913       }
914     }
915     p.print("]");
916   }
917   return buf;
918 }
919 
920 // print Data
921 
922 NdbOut&
operator <<(NdbOut & out,const NdbPack::Data & a)923 operator<<(NdbOut& out, const NdbPack::Data& a)
924 {
925   a.print(out);
926   return out;
927 }
928 
929 void
print(NdbOut & out) const930 NdbPack::Data::print(NdbOut& out) const
931 {
932   char buf[8000];
933   out << print(buf, sizeof(buf));
934 }
935 
936 const char*
print(char * buf,Uint32 bufsz) const937 NdbPack::Data::print(char* buf, Uint32 bufsz) const
938 {
939   Print p(buf, bufsz);
940   char* ptr = buf;
941   if (m_varBytes != 0)
942   {
943     p.print("varBytes:");
944     for (Uint32 i = 0; i < m_varBytes; i++)
945     {
946       int r = m_buf[i];
947       p.print("%02x", r);
948     }
949     p.print(" ");
950   }
951   p.print("dataLen:%u", m_iter.m_itemPos + m_iter.m_itemLen);
952   p.print(" ");
953   const bool convert_flag =
954     m_endian != Endian::Native &&
955     m_endian != Endian::get_endian();
956   DataC::print(&buf[p.m_sz], bufsz - p.m_sz, convert_flag);
957   return buf;
958 }
959 
960 // print BoundC
961 
962 NdbOut&
operator <<(NdbOut & out,const NdbPack::BoundC & a)963 operator<<(NdbOut& out, const NdbPack::BoundC& a)
964 {
965   a.print(out);
966   return out;
967 }
968 
969 void
print(NdbOut & out) const970 NdbPack::BoundC::print(NdbOut& out) const
971 {
972   char buf[8000];
973   out << print(buf, sizeof(buf));
974 }
975 
976 const char*
print(char * buf,Uint32 bufsz) const977 NdbPack::BoundC::print(char* buf, Uint32 bufsz) const
978 {
979   Print p(buf, bufsz);
980   p.print("side:%s ", m_side < 0 ? "-" : m_side > 0 ? "+" : "0");
981   m_data.print(&buf[p.m_sz], bufsz - p.m_sz);
982   return buf;
983 }
984 
985 // print Bound
986 
987 NdbOut&
operator <<(NdbOut & out,const NdbPack::Bound & a)988 operator<<(NdbOut& out, const NdbPack::Bound& a)
989 {
990   a.print(out);
991   return out;
992 }
993 
994 void
print(NdbOut & out) const995 NdbPack::Bound::print(NdbOut& out) const
996 {
997   char buf[8000];
998   out << print(buf, sizeof(buf));
999 }
1000 
1001 const char*
print(char * buf,Uint32 bufsz) const1002 NdbPack::Bound::print(char* buf, Uint32 bufsz) const
1003 {
1004   BoundC::print(buf, bufsz);
1005   return buf;
1006 }
1007 
1008 // validate
1009 
1010 int
validate() const1011 NdbPack::Type::validate() const
1012 {
1013   Type type2 = *this;
1014   if (type2.complete() == -1)
1015   {
1016     set_error(type2);
1017     return -1;
1018   }
1019   if (memcmp(this, &type2, sizeof(Type)) != 0)
1020   {
1021     set_error(ValidationError, __LINE__);
1022     return -1;
1023   }
1024   return 0;
1025 }
1026 
1027 int
validate() const1028 NdbPack::Spec::validate() const
1029 {
1030   Uint32 nullableCnt = 0;
1031   Uint32 varsizeCnt = 0;
1032   for (Uint32 i = 0; i < m_cnt; i++)
1033   {
1034     const Type& type = m_buf[i];
1035     if (type.validate() == -1)
1036     {
1037       set_error(type);
1038       return -1;
1039     }
1040     if (type.m_nullable)
1041       nullableCnt++;
1042     if (type.m_arrayType != 0)
1043       varsizeCnt++;
1044   }
1045   if (m_nullableCnt != nullableCnt)
1046   {
1047     set_error(ValidationError, __LINE__);
1048     return -1;
1049   }
1050   if (m_varsizeCnt != varsizeCnt)
1051   {
1052     set_error(ValidationError, __LINE__);
1053     return -1;
1054   }
1055   return 0;
1056 }
1057 
1058 int
validate() const1059 NdbPack::Data::validate() const
1060 {
1061   if (DataC::validate() == -1)
1062     return -1;
1063   const Iter& r = m_iter;
1064   if (r.m_cnt != m_cnt)
1065   {
1066     set_error(ValidationError, __LINE__);
1067     return -1;
1068   }
1069   Iter r2(*this);
1070   for (Uint32 i = 0; i < m_cnt; i++)
1071   {
1072     if (desc(r2) == -1)
1073       return -1;
1074   }
1075   if (r.m_itemPos != r2.m_itemPos)
1076   {
1077     set_error(ValidationError, __LINE__);
1078     return -1;
1079   }
1080   if (r.m_cnt != r2.m_cnt)
1081   {
1082     set_error(ValidationError, __LINE__);
1083     return -1;
1084   }
1085   if (r.m_nullCnt != r2.m_nullCnt)
1086   {
1087     set_error(ValidationError, __LINE__);
1088     return -1;
1089   }
1090   if (r.m_itemLen != r2.m_itemLen)
1091   {
1092     set_error(ValidationError, __LINE__);
1093     return -1;
1094   }
1095   return 0;
1096 }
1097 
1098 int
validate() const1099 NdbPack::BoundC::validate() const
1100 {
1101   if (m_data.validate() == -1)
1102   {
1103     set_error(m_data);
1104     return -1;
1105   }
1106   if (m_data.m_cnt == 0 && m_side != 0)
1107   {
1108     set_error(ValidationError, __LINE__);
1109     return -1;
1110   }
1111   if (m_data.m_cnt != 0 && m_side != -1 && m_side != +1)
1112   {
1113     set_error(ValidationError, __LINE__);
1114     return -1;
1115   }
1116   return 0;
1117 }
1118 
1119 int
validate() const1120 NdbPack::Bound::validate() const
1121 {
1122   if (BoundC::validate() == -1)
1123     return -1;
1124   if (m_data.validate() == -1)
1125   {
1126     set_error(m_data);
1127     return -1;
1128   }
1129   return 0;
1130 }
1131 
1132 #ifdef TEST_NDB_PACK
1133 #include <util/NdbTap.hpp>
1134 
// chk1: if x is false, print the line number and the text of x, then abort
#define chk1(x) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; require(false); } while (0)

// chk2: like chk1 but also prints the error code and error line of e
#define chk2(x, e) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; ndbout << "NdbPack code: " << (e).get_error_code() << " line: " << (e).get_error_line() << endl; require(false); } while (0)

// llN: print x with prefix "N- " if verbose is at least N
#define ll0(x) do { if (verbose < 0) break; ndbout << "0- " << x << endl; } while (0)
#define ll1(x) do { if (verbose < 1) break; ndbout << "1- " << x << endl; } while (0)
#define ll2(x) do { if (verbose < 2) break; ndbout << "2- " << x << endl; } while (0)
#define ll3(x) do { if (verbose < 3) break; ndbout << "3- " << x << endl; } while (0)

// smaller of a and b; note the arguments may be evaluated twice
#define xmin(a, b) ((a) < (b) ? (a) : (b))
1145 
1146 #include <ndb_rand.h>
1147 
1148 static uint // random 0..n-1
getrandom(uint n)1149 getrandom(uint n)
1150 {
1151   if (n != 0) {
1152     uint k = ndb_rand();
1153     return k % n;
1154   }
1155   return 0;
1156 }
1157 
1158 static uint // random 0..n-1 biased exponentially to smaller
getrandom(uint n,uint bias)1159 getrandom(uint n, uint bias)
1160 {
1161   assert(bias != 0);
1162   uint k = getrandom(n);
1163   bias--;
1164   while (bias != 0) {
1165     k = getrandom(k + 1);
1166     bias--;
1167   }
1168   return k;
1169 }
1170 
1171 static bool
getrandompct(uint pct)1172 getrandompct(uint pct)
1173 {
1174   return getrandom(100) < pct;
1175 }
1176 
1177 // change in TAPTEST
// test parameters, change in TAPTEST
static int seed = -1; // random
static int loops = 0;
static int spec_cnt = -1; // random
static int fix_type = 0; // all types
static int no_nullable = 0; // non-zero: Tspec::create() makes no type nullable
static int data_cnt = -1; // Max
static int bound_cnt = -1; // Max
static int verbose = 0; // level compared by the ll0..ll3 macros
1186 
1187 struct Tspec {
1188   enum { Max = 100 };
1189   enum { MaxBuf = Max * 4000 };
1190   NdbPack::Spec m_spec;
1191   NdbPack::Type m_type[Max];
TspecTspec1192   Tspec() {
1193     m_spec.set_buf(m_type, Max);
1194   }
1195   void create();
1196 };
1197 
1198 static NdbOut&
operator <<(NdbOut & out,const Tspec & tspec)1199 operator<<(NdbOut& out, const Tspec& tspec)
1200 {
1201   out << tspec.m_spec;
1202   return out;
1203 }
1204 
1205 void
create()1206 Tspec::create()
1207 {
1208   m_spec.reset();
1209   int cnt = spec_cnt == -1 ? 1 + getrandom(Tspec::Max, 3) : spec_cnt;
1210   int i = 0;
1211   while (i < cnt) {
1212     int typeId = fix_type;
1213     if (typeId == 0)
1214       typeId = getrandom(g_ndb_pack_type_info_cnt);
1215     const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
1216     switch (typeId) {
1217     case NDB_TYPE_INT:
1218     case NDB_TYPE_UNSIGNED:
1219     case NDB_TYPE_CHAR:
1220     case NDB_TYPE_VARCHAR:
1221     case NDB_TYPE_LONGVARCHAR:
1222       break;
1223     default:
1224       continue;
1225     }
1226     require(info.m_supported);
1227     int byteSize = 0;
1228     if (info.m_fixSize != 0)
1229       byteSize = info.m_fixSize;
1230     else if (info.m_arrayType == 0)
1231       byteSize = 1 + getrandom(128, 1);  // char(1-128)
1232     else if (info.m_arrayType == 1)
1233       byteSize = 1 + getrandom(256, 2);  // varchar(0-255)
1234     else if (info.m_arrayType == 2)
1235       byteSize = 2 + getrandom(1024, 3); // longvarchar(0-1023)
1236     else
1237       require(false);
1238     bool nullable = no_nullable ? false : getrandompct(50);
1239     int csNumber = 0;
1240     if (info.m_charType) {
1241       csNumber = 8; // should include ascii
1242     }
1243     NdbPack::Type type(typeId, byteSize, nullable, csNumber);
1244     chk2(m_spec.add(type) == 0, m_spec);
1245     i++;
1246   }
1247   chk2(m_spec.validate() == 0, m_spec);
1248 }
1249 
1250 struct Tdata {
1251   const Tspec& m_tspec;
1252   NdbPack::Data m_data;
1253   const bool m_isBound;
1254   int m_cnt;
1255   Uint8* m_xbuf;        // unpacked
1256   int m_xsize;
1257   int m_xoff[Tspec::Max];
1258   int m_xlen[Tspec::Max];
1259   bool m_xnull[Tspec::Max];
1260   int m_xnulls;
1261   Uint32* m_poaiBuf;    // plain old attr info
1262   int m_poaiSize;
1263   Uint8* m_packBuf;     // packed
1264   int m_packLen;
TdataTdata1265   Tdata(Tspec& tspec, bool isBound, uint varBytes) :
1266     m_tspec(tspec),
1267     m_data(tspec.m_spec, isBound, varBytes),
1268     m_isBound(isBound)
1269   {
1270     m_cnt = tspec.m_spec.get_cnt();
1271     m_xbuf = 0;
1272     m_poaiBuf = 0;
1273     m_packBuf = 0;
1274   }
~TdataTdata1275   ~Tdata() {
1276     delete [] m_xbuf;
1277     delete [] m_poaiBuf;
1278     delete [] m_packBuf;
1279   }
1280   void create();
1281   void add();
1282   void finalize();
1283   // compare using unpacked data
1284   int xcmp(const Tdata& tdata2, int* num_eq) const;
1285 };
1286 
1287 static NdbOut&
operator <<(NdbOut & out,const Tdata & tdata)1288 operator<<(NdbOut& out, const Tdata& tdata)
1289 {
1290   out << tdata.m_data;
1291   return out;
1292 }
1293 
1294 void
create()1295 Tdata::create()
1296 {
1297   union {
1298     Uint8 xbuf[Tspec::MaxBuf];
1299     Uint64 xbuf_align;
1300   };
1301   memset(xbuf, 0x3f, sizeof(xbuf));
1302   m_xsize = 0;
1303   m_xnulls = 0;
1304   Uint32 poaiBuf[Tspec::MaxBuf / 4];
1305   memset(poaiBuf, 0x5f, sizeof(poaiBuf));
1306   m_poaiSize = 0;
1307   m_packLen = m_data.get_var_bytes();
1308   m_packLen += (m_tspec.m_spec.get_nullable_cnt(m_isBound) + 7) / 8;
1309   int i = 0, j;
1310   while (i < m_cnt) {
1311     const NdbPack::Type& type = m_tspec.m_spec.get_type(i);
1312     const int typeId = type.get_type_id();
1313     const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
1314     m_xnull[i] = type.get_nullable() && getrandompct(25);
1315     m_xnull[i] = false;
1316     if (type.get_nullable() || m_isBound)
1317       m_xnull[i] = getrandompct(20);
1318     int pad = 0; // null-char pad not counted in xlen
1319     if (!m_xnull[i]) {
1320       m_xoff[i] = m_xsize;
1321       Uint8* xptr = &xbuf[m_xsize];
1322       switch (typeId) {
1323       case NDB_TYPE_INT:
1324         {
1325           Int32 x = getrandom(10);
1326           if (getrandompct(50))
1327             x = (-1) * x;
1328           memcpy(xptr, &x, 4);
1329           m_xlen[i] = info.m_fixSize;
1330         }
1331         break;
1332       case NDB_TYPE_UNSIGNED:
1333         {
1334           Uint32 x = getrandom(10);
1335           memcpy(xptr, &x, 4);
1336           m_xlen[i] = info.m_fixSize;
1337         }
1338         break;
1339       case NDB_TYPE_CHAR:
1340         {
1341           require(type.get_byte_size() >= 1);
1342           int max_len = type.get_byte_size();
1343           int len = getrandom(max_len + 1, 1);
1344           for (j = 0; j < len; j++)
1345           {
1346             xptr[j] = 'a' + getrandom(3);
1347           }
1348           for (j = len; j < max_len; j++)
1349           {
1350             xptr[j] = 0x20;
1351           }
1352           m_xlen[i] = max_len;
1353           xptr[max_len] = 0;
1354           pad = 1;
1355         }
1356         break;
1357       case NDB_TYPE_VARCHAR:
1358         {
1359           require(type.get_byte_size() >= 1);
1360           int max_len = type.get_byte_size() - 1;
1361           int len = getrandom(max_len, 2);
1362           require(len < 256);
1363           xptr[0] = len;
1364           for (j = 0; j < len; j++)
1365           {
1366             xptr[1 + j] = 'a' + getrandom(3);
1367           }
1368           m_xlen[i] = 1 + len;
1369           xptr[1 + len] = 0;
1370           pad = 1;
1371         }
1372         break;
1373       case NDB_TYPE_LONGVARCHAR:
1374         {
1375           require(type.get_byte_size() >= 2);
1376           int max_len = type.get_byte_size() - 2;
1377           int len = getrandom(max_len, 3);
1378           require(len < 256 * 256);
1379           xptr[0] = (len & 0xFF);
1380           xptr[1] = (len >> 8);
1381           for (j = 0; j < len; j++)
1382           {
1383             xptr[2 + j] = 'a' + getrandom(3);
1384           }
1385           m_xlen[i] = 2 + len;
1386           xptr[2 + len] = 0;
1387           pad = 1;
1388         }
1389         break;
1390       default:
1391         require(false);
1392         break;
1393       }
1394       m_xsize += m_xlen[i] + pad;
1395       while (m_xsize % 8 != 0)
1396         m_xsize++;
1397       m_packLen += m_xlen[i];
1398     } else {
1399       m_xoff[i] = -1;
1400       m_xlen[i] = 0;
1401       m_xnulls++;
1402     }
1403     require(m_xnull[i] == (m_xoff[i] == -1));
1404     require(m_xnull[i] == (m_xlen[i] == 0));
1405     AttributeHeader* ah = (AttributeHeader*)&poaiBuf[m_poaiSize];
1406     ah->setAttributeId(i); // not used
1407     ah->setByteSize(m_xlen[i]);
1408     m_poaiSize++;
1409     if (!m_xnull[i]) {
1410       memcpy(&poaiBuf[m_poaiSize], &xbuf[m_xoff[i]], m_xlen[i]);
1411       m_poaiSize += (m_xlen[i] + 3) / 4;
1412     }
1413     i++;
1414   }
1415   require(m_xsize % 8 == 0);
1416   m_xbuf = (Uint8*) new Uint64 [m_xsize / 8];
1417   memcpy(m_xbuf, xbuf, m_xsize);
1418   m_poaiBuf = (Uint32*) new Uint32 [m_poaiSize];
1419   memcpy(m_poaiBuf, poaiBuf, m_poaiSize << 2);
1420 }
1421 
1422 void
add()1423 Tdata::add()
1424 {
1425   m_packBuf = new Uint8 [m_packLen];
1426   m_data.set_buf(m_packBuf, m_packLen);
1427   int i, j;
1428   j = 0;
1429   while (j <= 1) {
1430     if (j == 1)
1431       m_data.reset();
1432     i = 0;
1433     while (i < m_cnt) {
1434       Uint32 xlen = ~(Uint32)0;
1435       if (!m_xnull[i]) {
1436         int xoff = m_xoff[i];
1437         const Uint8* xptr = &m_xbuf[xoff];
1438         chk2(m_data.add(xptr, &xlen) == 0, m_data);
1439         chk1((int)xlen == m_xlen[i]);
1440       } else {
1441         chk2(m_data.add_null(&xlen) == 0, m_data);
1442         chk1(xlen == 0);
1443       }
1444       i++;
1445     }
1446     chk2(m_data.validate() == 0, m_data);
1447     chk1((int)m_data.get_null_cnt() == m_xnulls);
1448     j++;
1449   }
1450 }
1451 
1452 void
finalize()1453 Tdata::finalize()
1454 {
1455   chk2(m_data.finalize() == 0, m_data);
1456   ll3("create: " << m_data);
1457   chk1((int)m_data.get_full_len() == m_packLen);
1458   {
1459     const Uint8* p = (const Uint8*)m_data.get_full_buf();
1460     chk1(p[0] + (p[1] << 8) == m_packLen - 2);
1461   }
1462 }
1463 
1464 int
xcmp(const Tdata & tdata2,int * num_eq) const1465 Tdata::xcmp(const Tdata& tdata2, int* num_eq) const
1466 {
1467   const Tdata& tdata1 = *this;
1468   require(&tdata1.m_tspec == &tdata2.m_tspec);
1469   const Tspec& tspec = tdata1.m_tspec;
1470   int res = 0;
1471   int cnt = xmin(tdata1.m_cnt, tdata2.m_cnt);
1472   int i;
1473   for (i = 0; i < cnt; i++) {
1474     if (!tdata1.m_xnull[i]) {
1475       if (!tdata2.m_xnull[i]) {
1476         // the pointers are Uint64-aligned
1477         const Uint8* xptr1 = &tdata1.m_xbuf[tdata1.m_xoff[i]];
1478         const Uint8* xptr2 = &tdata2.m_xbuf[tdata2.m_xoff[i]];
1479         const int xlen1 = tdata1.m_xlen[i];
1480         const int xlen2 = tdata2.m_xlen[i];
1481         const NdbPack::Type& type = tspec.m_spec.get_type(i);
1482         const int typeId = type.get_type_id();
1483         const int csNumber = type.get_cs_number();
1484         CHARSET_INFO* cs = all_charsets[csNumber];
1485         switch (typeId) {
1486         case NDB_TYPE_INT:
1487           {
1488             require(cs == 0);
1489             Int32 x1 = *(const Int32*)xptr1;
1490             Int32 x2 = *(const Int32*)xptr2;
1491             if (x1 < x2)
1492               res = -1;
1493             else if (x1 > x2)
1494               res = +1;
1495             ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
1496           }
1497           break;
1498         case NDB_TYPE_UNSIGNED:
1499           {
1500             require(cs == 0);
1501             Uint32 x1 = *(const Uint32*)xptr1;
1502             Uint32 x2 = *(const Uint32*)xptr2;
1503             if (x1 < x2)
1504               res = -1;
1505             else if (x1 > x2)
1506               res = +1;
1507             ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
1508           }
1509           break;
1510         case NDB_TYPE_CHAR:
1511           {
1512             require(cs != 0 && cs->coll != 0);
1513             const uint n1 = xlen1;
1514             const uint n2 = xlen2;
1515             const uchar* t1 = &xptr1[0];
1516             const uchar* t2 = &xptr2[0];
1517             const char* s1 = (const char*)t1;
1518             const char* s2 = (const char*)t2;
1519             chk1(n1 == strlen(s1));
1520             chk1(n2 == strlen(s2));
1521             res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1522             ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1523           }
1524           break;
1525         case NDB_TYPE_VARCHAR:
1526           {
1527             require(cs != 0 && cs->coll != 0);
1528             const uint n1 = xptr1[0];
1529             const uint n2 = xptr2[0];
1530             const uchar* t1 = &xptr1[1];
1531             const uchar* t2 = &xptr2[1];
1532             const char* s1 = (const char*)t1;
1533             const char* s2 = (const char*)t2;
1534             chk1(n1 == strlen(s1));
1535             chk1(n2 == strlen(s2));
1536             res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1537             ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1538           }
1539           break;
1540         case NDB_TYPE_LONGVARCHAR:
1541           {
1542             require(cs != 0 && cs->coll != 0);
1543             const uint n1 = xptr1[0] | (xptr1[1] << 8);
1544             const uint n2 = xptr2[0] | (xptr2[1] << 8);
1545             const uchar* t1 = &xptr1[2];
1546             const uchar* t2 = &xptr2[2];
1547             const char* s1 = (const char*)t1;
1548             const char* s2 = (const char*)t2;
1549             chk1(n1 == strlen(s1));
1550             chk1(n2 == strlen(s2));
1551             res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1552             ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1553           }
1554           break;
1555         default:
1556           require(false);
1557           break;
1558         }
1559       } else
1560         res = +1;
1561     } else if (!tdata2.m_xnull[i])
1562       res = -1;
1563     if (res != 0)
1564       break;
1565   }
1566   *num_eq = i;
1567   ll3("xcmp res:" << res << " num_eq:" << *num_eq);
1568   return res;
1569 }
1570 
1571 struct Tbound {
1572   Tdata& m_tdata;
1573   NdbPack::Bound m_bound;
TboundTbound1574   Tbound(Tdata& tdata) :
1575     m_tdata(tdata),
1576     m_bound(tdata.m_data)
1577   {
1578     m_tdata.m_cnt = 1 + getrandom(m_tdata.m_cnt);
1579   }
1580   void create();
1581   void add();
1582   void finalize();
1583   int xcmp(const Tdata& tdata2, int* num_eq) const;
1584   int xcmp(const Tbound& tbound2, int* num_eq) const;
1585 };
1586 
1587 static NdbOut&
operator <<(NdbOut & out,const Tbound & tbound)1588 operator<<(NdbOut& out, const Tbound& tbound)
1589 {
1590   out << tbound.m_bound;
1591   return out;
1592 }
1593 
1594 void
create()1595 Tbound::create()
1596 {
1597   m_tdata.create();
1598 }
1599 
1600 void
add()1601 Tbound::add()
1602 {
1603   m_tdata.add();
1604 }
1605 
1606 void
finalize()1607 Tbound::finalize()
1608 {
1609   int side = getrandompct(50) ? -1 : +1;
1610   chk2(m_bound.finalize(side) == 0, m_bound);
1611   chk2(m_bound.validate() == 0, m_bound);
1612   chk1((int)m_tdata.m_data.get_full_len() == m_tdata.m_packLen);
1613 }
1614 
1615 int
xcmp(const Tdata & tdata2,int * num_eq) const1616 Tbound::xcmp(const Tdata& tdata2, int* num_eq) const
1617 {
1618   const Tbound& tbound1 = *this;
1619   const Tdata& tdata1 = tbound1.m_tdata;
1620   require(tdata1.m_cnt <= tdata2.m_cnt);
1621   *num_eq = -1;
1622   int res = tdata1.xcmp(tdata2, num_eq);
1623   if (res == 0) {
1624     chk1(*num_eq == tdata1.m_cnt);
1625     res = m_bound.get_side();
1626   }
1627   return res;
1628 }
1629 
1630 int
xcmp(const Tbound & tbound2,int * num_eq) const1631 Tbound::xcmp(const Tbound& tbound2, int* num_eq) const
1632 {
1633   const Tbound& tbound1 = *this;
1634   const Tdata& tdata1 = tbound1.m_tdata;
1635   const Tdata& tdata2 = tbound2.m_tdata;
1636   *num_eq = -1;
1637   int res = tdata1.xcmp(tdata2, num_eq);
1638   chk1(0 <= *num_eq && *num_eq <= xmin(tdata1.m_cnt, tdata2.m_cnt));
1639   if (res == 0) {
1640     chk1(*num_eq == xmin(tdata1.m_cnt, tdata2.m_cnt));
1641     if (tdata1.m_cnt < tdata2.m_cnt)
1642       res = (+1) * tbound1.m_bound.get_side();
1643     else if (tdata1.m_cnt > tdata2.m_cnt)
1644       res = (-1) * tbound2.m_bound.get_side();
1645     else if (tbound1.m_bound.get_side() < tbound2.m_bound.get_side())
1646       res = -1;
1647     else if (tbound1.m_bound.get_side() > tbound2.m_bound.get_side())
1648       res = +1;
1649   }
1650   return res;
1651 }
1652 
1653 struct Tdatalist {
1654   enum { Max = 1000 };
1655   Tdata* m_tdata[Max];
1656   int m_cnt;
TdatalistTdatalist1657   Tdatalist(Tspec& tspec) {
1658     m_cnt = data_cnt == -1 ? Max : data_cnt;
1659     int i;
1660     for (i = 0; i < m_cnt; i++) {
1661       m_tdata[i] = new Tdata(tspec, false, 2);
1662     }
1663   }
~TdatalistTdatalist1664   ~Tdatalist() {
1665     int i;
1666     for (i = 0; i < m_cnt; i++) {
1667       delete m_tdata[i];
1668     }
1669   }
1670   void create();
1671   void sort();
1672 };
1673 
1674 static NdbOut&
operator <<(NdbOut & out,const Tdatalist & tdatalist)1675 operator<<(NdbOut& out, const Tdatalist& tdatalist)
1676 {
1677   int i;
1678   for (i = 0; i < tdatalist.m_cnt; i++) {
1679     out << "data " << i << ": " << *tdatalist.m_tdata[i];
1680     if (i + 1 < tdatalist.m_cnt)
1681       out << endl;
1682   }
1683   return out;
1684 }
1685 
1686 void
create()1687 Tdatalist::create()
1688 {
1689   int i;
1690   for (i = 0; i < m_cnt; i++) {
1691     Tdata& tdata = *m_tdata[i];
1692     tdata.create();
1693     tdata.add();
1694     tdata.finalize();
1695   }
1696 }
1697 
1698 static int
data_cmp(const void * a1,const void * a2)1699 data_cmp(const void* a1, const void* a2)
1700 {
1701   const Tdata& tdata1 = **(const Tdata**)a1;
1702   const Tdata& tdata2 = **(const Tdata**)a2;
1703   require(tdata1.m_cnt == tdata2.m_cnt);
1704   const Uint32 cnt = tdata1.m_cnt;
1705   Uint32 num_eq = ~(Uint32)0;
1706   int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq);
1707   require(num_eq <= (Uint32)tdata1.m_cnt);
1708   require(num_eq <= (Uint32)tdata2.m_cnt);
1709   return res;
1710 }
1711 
1712 void
sort()1713 Tdatalist::sort()
1714 {
1715   ll1("data sort: in");
1716   ll3(endl << *this);
1717   qsort(m_tdata, m_cnt, sizeof(Tdata*), data_cmp);
1718   ll1("data sort: out");
1719   ll3(endl << *this);
1720   int i;
1721   for (i = 0; i + 1 < m_cnt; i++) {
1722     const Tdata& tdata1 = *m_tdata[i];
1723     const Tdata& tdata2 = *m_tdata[i + 1];
1724     require(tdata1.m_cnt == tdata2.m_cnt);
1725     const Uint32 cnt = tdata1.m_cnt;
1726     Uint32 num_eq1 = ~(Uint32)0;
1727     int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq1);
1728     chk1(res <= 0);
1729     // also via unpacked data
1730     int num_eq2 = -1;
1731     int res2 = tdata1.xcmp(tdata2, &num_eq2);
1732     if (res < 0)
1733       chk1(res2 < 0);
1734     else if (res == 0)
1735       chk1(res2 == 0);
1736     else
1737       chk1(res2 > 0);
1738     chk1(num_eq1 == (Uint32)num_eq2);
1739   }
1740 }
1741 
1742 struct Tboundlist {
1743   enum { Max = 1000 };
1744   Tbound* m_tbound[Max];
1745   int m_cnt;
TboundlistTboundlist1746   Tboundlist(Tspec& tspec) {
1747     m_cnt = bound_cnt == -1 ? Max : bound_cnt;
1748     int i;
1749     for (i = 0; i < m_cnt; i++) {
1750       Tdata* tdata = new Tdata(tspec, true, 0);
1751       m_tbound[i] = new Tbound(*tdata);
1752     }
1753   }
~TboundlistTboundlist1754   ~Tboundlist() {
1755     int i;
1756     for (i = 0; i < m_cnt; i++) {
1757       Tdata* tdata = &m_tbound[i]->m_tdata;
1758       delete m_tbound[i];
1759       delete tdata;
1760     }
1761   }
1762   void create();
1763   void sort();
1764 };
1765 
1766 static NdbOut&
operator <<(NdbOut & out,const Tboundlist & tboundlist)1767 operator<<(NdbOut& out, const Tboundlist& tboundlist)
1768 {
1769   int i;
1770   for (i = 0; i < tboundlist.m_cnt; i++) {
1771     out << "bound " << i << ": " << *tboundlist.m_tbound[i];
1772     if (i + 1 < tboundlist.m_cnt)
1773       out << endl;
1774   }
1775   return out;
1776 }
1777 
1778 void
create()1779 Tboundlist::create()
1780 {
1781   int i;
1782   for (i = 0; i < m_cnt; i++) {
1783     Tbound& tbound = *m_tbound[i];
1784     tbound.create();
1785     tbound.add();
1786     tbound.finalize();
1787   }
1788 }
1789 
1790 static int
bound_cmp(const void * a1,const void * a2)1791 bound_cmp(const void* a1, const void* a2)
1792 {
1793   const Tbound& tbound1 = **(const Tbound**)a1;
1794   const Tbound& tbound2 = **(const Tbound**)a2;
1795   const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
1796   Uint32 num_eq = ~(Uint32)0;
1797   int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq);
1798   require(num_eq <= cnt);
1799   require(num_eq <= cnt);
1800   return res;
1801 }
1802 
1803 void
sort()1804 Tboundlist::sort()
1805 {
1806   ll1("bound sort: in");
1807   ll3(endl << *this);
1808   qsort(m_tbound, m_cnt, sizeof(Tbound*), bound_cmp);
1809   ll1("bound sort: out");
1810   ll3(endl << *this);
1811   int i;
1812   for (i = 0; i + 1 < m_cnt; i++) {
1813     const Tbound& tbound1 = *m_tbound[i];
1814     const Tbound& tbound2 = *m_tbound[i + 1];
1815     const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
1816     Uint32 num_eq1 = ~(Uint32)0;
1817     int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq1);
1818     chk1(res <= 0);
1819     // also via unpacked data
1820     int num_eq2 = -1;
1821     int res2 = tbound1.xcmp(tbound2, &num_eq2);
1822     if (res < 0)
1823       chk1(res2 < 0);
1824     else if (res == 0)
1825       chk1(res2 == 0);
1826     else
1827       chk1(res2 > 0);
1828     chk1(num_eq1 == (Uint32)num_eq2);
1829   }
1830 }
1831 
1832 static void
testdesc(const Tdata & tdata)1833 testdesc(const Tdata& tdata)
1834 {
1835   ll3("testdesc: " << tdata);
1836   const Tspec& tspec = tdata.m_tspec;
1837   const NdbPack::Data& data = tdata.m_data;
1838   const Uint8* buf_old = (const Uint8*)data.get_full_buf();
1839   const Uint32 varBytes = data.get_var_bytes();
1840   const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
1841   const Uint32 dataLen = data.get_data_len();
1842   const Uint32 fullLen = data.get_full_len();
1843   const Uint32 cnt = data.get_cnt();
1844   chk1(fullLen == varBytes + dataLen);
1845   NdbPack::Data data_new(tspec.m_spec, false, varBytes);
1846   Uint8 buf_new[Tspec::MaxBuf];
1847   data_new.set_buf(buf_new, sizeof(buf_new));
1848   memcpy(buf_new, buf_old, fullLen);
1849   chk2(data_new.desc_all(cnt, NdbPack::Endian::Native) == 0, data_new);
1850   chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
1851   chk1(data_new.get_data_len() == data.get_data_len());
1852   chk1(data_new.get_cnt() == data.get_cnt());
1853   chk1(data_new.get_null_cnt() == data.get_null_cnt());
1854 }
1855 
1856 static void
testcopy(const Tdata & tdata)1857 testcopy(const Tdata& tdata)
1858 {
1859   ll3("testcopy: " << tdata);
1860   const Tspec& tspec = tdata.m_tspec;
1861   const NdbPack::Data& data = tdata.m_data;
1862   uint n = getrandom(tdata.m_cnt + 1);
1863   do {
1864     ll3("testcopy: cnt:" << tdata.m_cnt << " n:" << n);
1865     NdbPack::DataC data_old(tspec.m_spec, false);
1866     data_old.set_buf(data.get_data_buf(), data.get_data_len(), n);
1867     chk1(data_old.get_cnt() == n);
1868     NdbPack::Data data_new(tspec.m_spec, false, 0);
1869     Uint8 buf_new[Tspec::MaxBuf];
1870     data_new.set_buf(buf_new, sizeof(buf_new));
1871     chk2(data_new.copy(data_old) == 0, data_new);
1872     chk1(data_new.get_cnt() == n);
1873     Uint32 num_eq1 = ~(Uint32)0;
1874     chk1(data_new.cmp(data_old, n, num_eq1) == 0);
1875     chk1(num_eq1 == n);
1876     Uint32 num_eq2 = ~(Uint32)0;
1877     chk1(data_old.cmp(data_new, n, num_eq2) == 0);
1878     chk1(num_eq2 == n);
1879     n = getrandom(n);
1880   } while (n != 0);
1881 }
1882 
1883 static void
testpoai(const Tdata & tdata)1884 testpoai(const Tdata& tdata)
1885 {
1886   ll3("testpoai: " << tdata);
1887   const Tspec& tspec = tdata.m_tspec;
1888   const NdbPack::Data& data = tdata.m_data;
1889   NdbPack::Data data_new(tspec.m_spec, false, data.get_var_bytes());
1890   Uint8 buf_new[Tspec::MaxBuf];
1891   data_new.set_buf(buf_new, sizeof(buf_new));
1892   Uint32 poaiLen = ~(Uint32)0;
1893   chk2(data_new.add_poai(tdata.m_poaiBuf, tdata.m_cnt, &poaiLen) == 0, data);
1894   chk2(data_new.finalize() == 0, data_new);
1895   chk2(data_new.validate() == 0, data_new);
1896   chk1(tspec.m_spec.get_nullmask_len(false) + poaiLen == data.get_data_len());
1897   chk1(data_new.get_full_len() == data.get_full_len());
1898   chk1(memcmp(data_new.get_full_buf(), data.get_full_buf(), data.get_full_len()) == 0);
1899   chk1(data_new.get_null_cnt() == data.get_null_cnt());
1900 }
1901 
1902 static void
testconvert(const Tdata & tdata)1903 testconvert(const Tdata& tdata)
1904 {
1905   ll3("testconvert: " << tdata);
1906   const Tspec& tspec = tdata.m_tspec;
1907   const NdbPack::Data& data = tdata.m_data;
1908   NdbPack::Data data_new(tspec.m_spec, false, 2);
1909   Uint8 buf_new[Tspec::MaxBuf];
1910   data_new.set_buf(buf_new, sizeof(buf_new));
1911   chk2(data_new.copy(data) == 0, data_new);
1912   require(tdata.m_cnt == (int)data.get_cnt());
1913   require(data.get_cnt() == data_new.get_cnt());
1914   const Uint32 cnt = tdata.m_cnt;
1915   Uint32 num_eq;
1916   int i;
1917   for (i = 0; i < 10; i++) {
1918     int k = getrandom(3); // assumes Endian::Value 0,1,2
1919     NdbPack::Endian::Value v = (NdbPack::Endian::Value)k;
1920     chk2(data_new.convert(v) == 0, data_new);
1921     if (v == NdbPack::Endian::Native ||
1922         v == NdbPack::Endian::get_endian()) {
1923       num_eq = ~(Uint32)0;
1924       chk1(data.cmp(data_new, cnt, num_eq) == 0);
1925       require(num_eq == cnt);
1926     }
1927   }
1928 }
1929 
1930 static void
testdata(const Tdatalist & tdatalist)1931 testdata(const Tdatalist& tdatalist)
1932 {
1933   int i;
1934   for (i = 0; i < tdatalist.m_cnt; i++) {
1935     const Tdata& tdata = *tdatalist.m_tdata[i];
1936     testdesc(tdata);
1937     testcopy(tdata);
1938     testpoai(tdata);
1939     testconvert(tdata);
1940   }
1941 }
1942 
1943 static void
testcmp(const Tbound & tbound,const Tdatalist & tdatalist,int * kb)1944 testcmp(const Tbound& tbound, const Tdatalist& tdatalist, int* kb)
1945 {
1946   ll3("testcmp: " << tbound);
1947   int oldres = 0;
1948   int n1 = 0;
1949   int n2 = 0;
1950   int i;
1951   for (i = 0; i < tdatalist.m_cnt; i++) {
1952     const Tdata& tdata = *tdatalist.m_tdata[i];
1953     require(tbound.m_tdata.m_cnt == (int)tbound.m_bound.get_data().get_cnt());
1954     const Uint32 cnt = tbound.m_tdata.m_cnt;
1955     Uint32 num_eq1 = ~(Uint32)0;
1956     // reverse result for key vs bound
1957     int res = (-1) * tbound.m_bound.cmp(tdata.m_data, cnt, num_eq1);
1958     chk1(res != 0);
1959     res = (res < 0 ? (n1++, -1) : (n2++, +1));
1960     if (i > 0) {
1961       // at some point flips from -1 to +1
1962       chk1(oldres <= res);
1963     }
1964     oldres = res;
1965     // also via unpacked data
1966     int num_eq2 = -1;
1967     int res2 = (-1) * tbound.xcmp(tdata, &num_eq2);
1968     if (res < 0)
1969       chk1(res2 < 0);
1970     else
1971       chk1(res2 > 0);
1972     chk1(num_eq1 == (Uint32)num_eq2);
1973   }
1974   require(n1 + n2 == tdatalist.m_cnt);
1975   ll2("keys before:" << n1 << " after:" << n2);
1976   *kb = n1;
1977 }
1978 
1979 static void
testcmp(const Tboundlist & tboundlist,const Tdatalist & tdatalist)1980 testcmp(const Tboundlist& tboundlist, const Tdatalist& tdatalist)
1981 {
1982   int i;
1983   int oldkb = 0;
1984   for (i = 0; i < tboundlist.m_cnt; i++) {
1985     const Tbound& tbound = *tboundlist.m_tbound[i];
1986     int kb = 0;
1987     testcmp(tbound, tdatalist, &kb);
1988     if (i > 0) {
1989       chk1(oldkb <= kb);
1990     }
1991     oldkb = kb;
1992   }
1993 }
1994 
1995 static void
testrun()1996 testrun()
1997 {
1998   Tspec tspec;
1999   tspec.create();
2000   ll1("spec: " << tspec);
2001   Tdatalist tdatalist(tspec);
2002   tdatalist.create();
2003   tdatalist.sort();
2004   testdata(tdatalist);
2005   if (bound_cnt != 0) {
2006     Tboundlist tboundlist(tspec);
2007     tboundlist.create();
2008     tboundlist.sort();
2009     testcmp(tboundlist, tdatalist);
2010   }
2011 }
2012 
2013 extern void NdbOut_Init();
2014 
2015 static int
testmain()2016 testmain()
2017 {
2018   my_init();
2019   NdbOut_Init();
2020   signal(SIGABRT, SIG_DFL);
2021   { const char* p = NdbEnv_GetEnv("TEST_NDB_PACK_VERBOSE", (char*)0, 0);
2022     if (p != 0)
2023       verbose = atoi(p);
2024   }
2025   if (seed == 0)
2026     ll0("random seed: loop number");
2027   else {
2028     if (seed < 0)
2029       seed = getpid();
2030     ll0("random seed: " << seed);
2031     ndb_srand(seed);
2032   }
2033   loops = 100;
2034   int i;
2035   for (i = 0; loops == 0 || i < loops; i++) {
2036     ll0("loop:" << i << "/" << loops);
2037     if (seed == 0)
2038       ndb_srand(i);
2039     testrun();
2040   }
2041   // do not print "ok" in TAPTEST
2042   ndbout << "passed" << endl;
2043   return 0;
2044 }
2045 
// TAP entry point: the test passes when testmain() returns 0.
TAPTEST(NdbPack)
{
  return testmain() == 0;
}
2051 
2052 #endif
2053