1 /*
2 Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26 #include <NdbPack.hpp>
27 #include <NdbOut.hpp>
28 #include <NdbEnv.h>
29
30 // NdbPack::Error
31
32 int
get_error_code() const33 NdbPack::Error::get_error_code() const
34 {
35 return m_error_code;
36 }
37
38 int
get_error_line() const39 NdbPack::Error::get_error_line() const
40 {
41 return m_error_line;
42 }
43
44 void
set_error(int code,int line) const45 NdbPack::Error::set_error(int code, int line) const
46 {
47 m_error_code = code;
48 m_error_line = line;
49 #ifdef VM_TRACE
50 #ifdef NDB_USE_GET_ENV
51 const char* p = NdbEnv_GetEnv("NDB_PACK_ABORT_ON_ERROR", (char*)0, 0);
52 if (p != 0 && strchr("1Y", p[0]) != 0)
53 require(false);
54 #endif
55 #endif
56 }
57
58 void
set_error(const Error & e2) const59 NdbPack::Error::set_error(const Error& e2) const
60 {
61 set_error(e2.m_error_code, e2.m_error_line);
62 }
63
64 // NdbPack::Endian
65
66 void
convert(void * ptr,Uint32 len)67 NdbPack::Endian::convert(void* ptr, Uint32 len)
68 {
69 Uint8* p = (Uint8*)ptr;
70 for (Uint32 i = 0; i < len / 2; i++)
71 {
72 Uint32 j = len - i - 1;
73 Uint8 tmp = p[i];
74 p[i] = p[j];
75 p[j] = tmp;
76 }
77 }
78
79 // NdbPack::Type
80
81 struct Ndb_pack_type_info {
82 bool m_supported;
83 Uint16 m_fixSize; // if non-zero must have this exact size
84 Uint16 m_arrayType; // 0,1,2 length bytes
85 bool m_charType; // type with character set
86 bool m_convert; // convert endian (reverse byte order)
87 };
88
89 static const Ndb_pack_type_info
90 g_ndb_pack_type_info[] = {
91 { 0, 0, 0, 0, 0 }, // NDB_TYPE_UNDEFINED
92 { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYINT
93 { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYUNSIGNED
94 { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLINT
95 { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLUNSIGNED
96 { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMINT
97 { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMUNSIGNED
98 { 1, 4, 0, 0, 1 }, // NDB_TYPE_INT
99 { 1, 4, 0, 0, 1 }, // NDB_TYPE_UNSIGNED
100 { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGINT
101 { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGUNSIGNED
102 { 1, 4, 0, 0, 1 }, // NDB_TYPE_FLOAT
103 { 1, 8, 0, 0, 1 }, // NDB_TYPE_DOUBLE
104 { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMAL
105 { 1, 0, 0, 1, 0 }, // NDB_TYPE_CHAR
106 { 1, 0, 1, 1, 0 }, // NDB_TYPE_VARCHAR
107 { 1, 0, 0, 0, 0 }, // NDB_TYPE_BINARY
108 { 1, 0, 1, 0, 0 }, // NDB_TYPE_VARBINARY
109 { 1, 8, 0, 0, 0 }, // NDB_TYPE_DATETIME
110 { 1, 3, 0, 0, 0 }, // NDB_TYPE_DATE
111 { 0, 0, 0, 0, 0 }, // NDB_TYPE_BLOB
112 { 0, 0, 0, 1, 0 }, // NDB_TYPE_TEXT
113 { 0, 0, 0, 0, 0 }, // NDB_TYPE_BIT
114 { 1, 0, 2, 1, 0 }, // NDB_TYPE_LONGVARCHAR
115 { 1, 0, 2, 0, 0 }, // NDB_TYPE_LONGVARBINARY
116 { 1, 3, 0, 0, 0 }, // NDB_TYPE_TIME
117 { 1, 1, 0, 0, 0 }, // NDB_TYPE_YEAR
118 { 1, 4, 0, 0, 0 }, // NDB_TYPE_TIMESTAMP
119 { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMALUNSIGNED
120 { 1, 0, 0, 0, 0 }, // NDB_TYPE_DECIMAL
121 { 1, 0, 0, 0, 0 }, // NDB_TYPE_DECIMALUNSIGNED
122 /*
123 * Fractional time types are varsized.
124 * There is no size validation yet.
125 */
126 { 1, 0, 0, 0, 0 }, // NDB_TYPE_TIME2 (3+(0-3) bytes)
127 { 1, 0, 0, 0, 0 }, // NDB_TYPE_DATETIME2 (5+(0-3) bytes)
128 { 1, 0, 0, 0, 0 } // NDB_TYPE_TIMESTAMP2 (4+(0-3) bytes)
129 };
130
131 static const int g_ndb_pack_type_info_cnt =
132 sizeof(g_ndb_pack_type_info) / sizeof(g_ndb_pack_type_info[0]);
133
134 int
complete()135 NdbPack::Type::complete()
136 {
137 if (m_typeId == 0)
138 {
139 set_error(TypeNotSet, __LINE__);
140 return -1;
141 }
142 if (m_typeId >= g_ndb_pack_type_info_cnt)
143 {
144 set_error(TypeNotSet, __LINE__);
145 return -1;
146 }
147 const Ndb_pack_type_info& info = g_ndb_pack_type_info[m_typeId];
148 if (!info.m_supported)
149 {
150 set_error(TypeNotSupported, __LINE__);
151 return -1;
152 }
153 if (m_byteSize == 0)
154 {
155 set_error(TypeSizeZero, __LINE__);
156 return -1;
157 }
158 if (info.m_fixSize != 0 && m_byteSize != info.m_fixSize)
159 {
160 set_error(TypeFixSizeInvalid, __LINE__);
161 return -1;
162 }
163 if (!(m_nullable <= 1))
164 {
165 set_error(TypeNullableNotBool, __LINE__);
166 return -1;
167 }
168 if (info.m_charType && m_csNumber == 0)
169 {
170 set_error(CharsetNotSpecified, __LINE__);
171 return -1;
172 }
173 if (info.m_charType && all_charsets[m_csNumber] == 0)
174 {
175 CHARSET_INFO* cs = get_charset(m_csNumber, MYF(0));
176 if (cs == 0)
177 {
178 set_error(CharsetNotFound, __LINE__);
179 return -1;
180 }
181 all_charsets[m_csNumber] = cs; // yes caller must do this
182 }
183 if (!info.m_charType && m_csNumber != 0)
184 {
185 set_error(CharsetNotAllowed, __LINE__);
186 return -1;
187 }
188 m_arrayType = info.m_arrayType;
189 return 0;
190 }
191
192 // NdbPack::Spec
193
194 int
add(Type type)195 NdbPack::Spec::add(Type type)
196 {
197 Uint32 cnt = m_cnt;
198 Uint32 nullable_cnt = m_nullableCnt;
199 Uint32 varsize_cnt = m_varsizeCnt;
200 Uint32 max_byte_size = m_maxByteSize;
201 if (type.complete() == -1)
202 {
203 set_error(type);
204 return -1;
205 }
206 type.m_nullbitPos = 0xFFFF;
207 if (type.m_nullable)
208 {
209 type.m_nullbitPos = nullable_cnt;
210 nullable_cnt++;
211 }
212 if (type.m_arrayType != 0)
213 {
214 varsize_cnt++;
215 }
216 max_byte_size += type.m_byteSize;
217 if (cnt >= m_bufMaxCnt)
218 {
219 set_error(SpecBufOverflow, __LINE__);
220 return -1;
221 }
222 m_buf[cnt] = type;
223 cnt++;
224 m_cnt = cnt;
225 m_nullableCnt = nullable_cnt;
226 m_varsizeCnt = varsize_cnt;
227 m_maxByteSize = max_byte_size;
228 return 0;
229 }
230
231 int
add(Type type,Uint32 cnt)232 NdbPack::Spec::add(Type type, Uint32 cnt)
233 {
234 for (Uint32 i = 0; i < cnt; i++)
235 {
236 if (add(type) == -1)
237 return -1;
238 }
239 return 0;
240 }
241
242 void
copy(const Spec & s2)243 NdbPack::Spec::copy(const Spec& s2)
244 {
245 assert(m_bufMaxCnt >= s2.m_cnt);
246 reset();
247 m_cnt = s2.m_cnt;
248 m_nullableCnt = s2.m_nullableCnt;
249 m_varsizeCnt = s2.m_varsizeCnt;
250 m_maxByteSize = s2.m_maxByteSize;
251 for (Uint32 i = 0; i < m_cnt; i++)
252 {
253 m_buf[i] = s2.m_buf[i];
254 }
255 }
256
257 // NdbPack::Iter
258
259 int
desc(const Uint8 * item)260 NdbPack::Iter::desc(const Uint8* item)
261 {
262 const Uint32 i = m_cnt; // item index
263 assert(i < m_spec.m_cnt);
264 const Type& type = m_spec.m_buf[i];
265 const Uint32 lenBytes = type.m_arrayType;
266 Uint32 bareLen = 0;
267 switch (lenBytes) {
268 case 0:
269 bareLen = type.m_byteSize;
270 break;
271 case 1:
272 bareLen = item[0];
273 break;
274 case 2:
275 bareLen = item[0] + (item[1] << 8);
276 break;
277 default:
278 assert(false);
279 set_error(InternalError, __LINE__);
280 return -1;
281 }
282 const Uint32 itemLen = lenBytes + bareLen;
283 if (itemLen > type.m_byteSize)
284 {
285 set_error(DataValueOverflow, __LINE__);
286 return -1;
287 }
288 m_itemPos += m_itemLen; // skip previous item
289 m_cnt++;
290 m_lenBytes = lenBytes;
291 m_bareLen = bareLen;
292 m_itemLen = itemLen;
293 return 0;
294 }
295
296 int
desc_null()297 NdbPack::Iter::desc_null()
298 {
299 assert(m_cnt < m_spec.m_cnt);
300 // caller checks if null allowed
301 m_itemPos += m_itemLen; // skip previous item
302 m_cnt++;
303 m_nullCnt++;
304 m_lenBytes = 0;
305 m_bareLen = 0;
306 m_itemLen = 0;
307 return 0;
308 }
309
310 int
cmp(const Iter & r2,const Uint8 * buf1,const Uint8 * buf2) const311 NdbPack::Iter::cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const
312 {
313 const Iter& r1 = *this;
314 assert(&r1.m_spec == &r2.m_spec);
315 assert(r1.m_cnt == r2.m_cnt && r1.m_cnt > 0);
316 const Uint32 i = r1.m_cnt - 1; // item index
317 int res = 0;
318 const Uint32 n1 = r1.m_itemLen;
319 const Uint32 n2 = r2.m_itemLen;
320 if (n1 != 0)
321 {
322 if (n2 != 0)
323 {
324 const Type& type = r1.m_spec.m_buf[i];
325 const NdbSqlUtil::Type& sqlType = getSqlType(type.m_typeId);
326 const Uint8* p1 = &buf1[r1.m_itemPos];
327 const Uint8* p2 = &buf2[r2.m_itemPos];
328 CHARSET_INFO* cs = all_charsets[type.m_csNumber];
329 res = (*sqlType.m_cmp)(cs, p1, n1, p2, n2);
330 }
331 else
332 {
333 res = +1;
334 }
335 }
336 else
337 {
338 if (n2 != 0)
339 res = -1;
340 }
341 return res;
342 }
343
344 // NdbPack::DataC
345
346 int
desc(Iter & r) const347 NdbPack::DataC::desc(Iter& r) const
348 {
349 const Uint32 i = r.m_cnt; // item index
350 assert(i < m_cnt);
351 const Type& type = m_spec.m_buf[i];
352 if (type.m_nullable || m_allNullable)
353 {
354 Uint32 nullbitPos = 0;
355 if (!m_allNullable)
356 nullbitPos = type.m_nullbitPos;
357 else
358 nullbitPos = i;
359 const Uint32 byte_pos = nullbitPos / 8;
360 const Uint32 bit_pos = nullbitPos % 8;
361 const Uint8 bit_mask = (1 << bit_pos);
362 const Uint8& the_byte = m_buf[byte_pos];
363 if ((the_byte & bit_mask) != 0)
364 {
365 if (r.desc_null() == -1)
366 {
367 set_error(r);
368 return -1;
369 }
370 return 0;
371 }
372 }
373 const Uint32 pos = r.m_itemPos + r.m_itemLen;
374 const Uint8* item = &m_buf[pos];
375 if (r.desc(item) == -1)
376 {
377 set_error(r);
378 return -1;
379 }
380 return 0;
381 }
382
383 int
cmp(const DataC & d2,Uint32 cnt,Uint32 & num_eq) const384 NdbPack::DataC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
385 {
386 const DataC& d1 = *this;
387 assert(cnt <= d1.m_cnt);
388 assert(cnt <= d2.m_cnt);
389 Iter r1(d1);
390 Iter r2(d2);
391 int res = 0;
392 Uint32 i; // remember last
393 for (i = 0; i < cnt; i++)
394 {
395 d1.desc(r1);
396 d2.desc(r2);
397 res = r1.cmp(r2, d1.m_buf, d2.m_buf);
398 if (res != 0)
399 break;
400 }
401 num_eq = i;
402 return res;
403 }
404
405 // NdbPack::Data
406
407 int
add(const void * data,Uint32 * len_out)408 NdbPack::Data::add(const void* data, Uint32* len_out)
409 {
410 assert(data != 0);
411 const Uint8* item = (const Uint8*)data;
412 const Uint32 i = m_cnt; // item index
413 if (i >= m_spec.m_cnt)
414 {
415 set_error(DataCntOverflow, __LINE__);
416 return -1;
417 }
418 Iter& r = m_iter;
419 assert(r.m_cnt == i);
420 const Uint32 fullLen = m_varBytes + r.m_itemPos + r.m_itemLen;
421 if (r.desc(item) == -1)
422 {
423 set_error(r);
424 return -1;
425 }
426 if (fullLen + r.m_itemLen > m_bufMaxLen)
427 {
428 set_error(DataBufOverflow, __LINE__);
429 return -1;
430 }
431 memcpy(&m_buf[fullLen], item, r.m_itemLen);
432 *len_out = r.m_itemLen;
433 m_cnt++;
434 return 0;
435 }
436
437 int
add(const void * data,Uint32 cnt,Uint32 * len_out)438 NdbPack::Data::add(const void* data, Uint32 cnt, Uint32* len_out)
439 {
440 const Uint8* data_ptr = (const Uint8*)data;
441 Uint32 len_tot = 0;
442 for (Uint32 i = 0; i < cnt; i++)
443 {
444 Uint32 len;
445 if (add(data_ptr, &len) == -1)
446 return -1;
447 if (data != 0)
448 data_ptr += len;
449 len_tot += len;
450 }
451 *len_out = len_tot;
452 return 0;
453 }
454
455 int
add_null(Uint32 * len_out)456 NdbPack::Data::add_null(Uint32* len_out)
457 {
458 const Uint32 i = m_cnt; // item index
459 if (i >= m_spec.m_cnt)
460 {
461 set_error(DataCntOverflow, __LINE__);
462 return -1;
463 }
464 Iter& r = m_iter;
465 assert(r.m_cnt == i);
466 if (r.desc_null() == -1)
467 {
468 set_error(r);
469 return -1;
470 }
471 Uint32 nullbitPos = 0;
472 if (!m_allNullable)
473 {
474 const Type& type = m_spec.m_buf[i];
475 if (!type.m_nullable)
476 {
477 set_error(DataNotNullable, __LINE__);
478 return -1;
479 }
480 nullbitPos = type.m_nullbitPos;
481 }
482 else
483 {
484 nullbitPos = i;
485 }
486 const Uint32 byte_pos = nullbitPos / 8;
487 const Uint32 bit_pos = nullbitPos % 8;
488 const Uint8 bit_mask = (1 << bit_pos);
489 Uint8& the_byte = m_buf[m_varBytes + byte_pos];
490 assert((the_byte & bit_mask) == 0);
491 the_byte |= bit_mask;
492 *len_out = r.m_itemLen;
493 m_cnt++;
494 return 0;
495 }
496
497 int
add_null(Uint32 cnt,Uint32 * len_out)498 NdbPack::Data::add_null(Uint32 cnt, Uint32* len_out)
499 {
500 Uint32 len_tot = 0;
501 for (Uint32 i = 0; i < cnt; i++)
502 {
503 Uint32 len;
504 if (add_null(&len) == -1)
505 return -1;
506 len_tot += len;
507 }
508 *len_out = len_tot;
509 return 0;
510 }
511
512 int
add_poai(const Uint32 * poai,Uint32 * len_out)513 NdbPack::Data::add_poai(const Uint32* poai, Uint32* len_out)
514 {
515 const AttributeHeader ah = *(const AttributeHeader*)&poai[0];
516 if (!ah.isNULL())
517 {
518 if (add(&poai[1], len_out) == -1)
519 return -1;
520 }
521 else
522 {
523 if (add_null(len_out) == -1)
524 return -1;
525 }
526 if (ah.getByteSize() != *len_out)
527 {
528 set_error(InvalidAttrInfo, __LINE__);
529 return -1;
530 }
531 return 0;
532 }
533
534 int
add_poai(const Uint32 * poai,Uint32 cnt,Uint32 * len_out)535 NdbPack::Data::add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out)
536 {
537 Uint32 len_tot = 0;
538 for (Uint32 i = 0; i < cnt; i++)
539 {
540 Uint32 len;
541 if (add_poai(poai, &len) == -1)
542 return -1;
543 len_tot += len;
544 poai += 1 + (len + 3) / 4;
545 }
546 *len_out = len_tot;
547 return 0;
548 }
549
550 int
finalize_impl()551 NdbPack::Data::finalize_impl()
552 {
553 const Uint32 dataLen = m_iter.m_itemPos + m_iter.m_itemLen;
554 switch (m_varBytes) {
555 // case 0: inlined
556 case 1:
557 if (dataLen <= 0xFF)
558 {
559 m_buf[0] = dataLen;
560 return 0;
561 }
562 break;
563 case 2:
564 if (dataLen <= 0xFFFF)
565 {
566 m_buf[0] = (dataLen & 0xFF);
567 m_buf[1] = (dataLen >> 8);
568 return 0;
569 }
570 break;
571 default:
572 break;
573 }
574 set_error(InternalError, __LINE__);
575 return -1;
576 }
577
578 int
desc_all(Uint32 cnt,Endian::Value from_endian)579 NdbPack::Data::desc_all(Uint32 cnt, Endian::Value from_endian)
580 {
581 if (from_endian == NdbPack::Endian::Native)
582 from_endian = NdbPack::Endian::get_endian();
583 m_endian = from_endian;
584 assert(m_cnt == 0); // reset() would destroy nullmask
585 for (Uint32 i = 0; i < cnt; i++)
586 {
587 m_cnt++;
588 if (desc(m_iter) == -1)
589 return -1;
590 }
591 if (finalize() == -1)
592 return -1;
593 return 0;
594 }
595
596 int
copy(const DataC & d2)597 NdbPack::Data::copy(const DataC& d2)
598 {
599 reset();
600 Iter r2(d2);
601 const Uint32 cnt2 = d2.m_cnt;
602 for (Uint32 i = 0; i < cnt2; i++)
603 {
604 if (d2.desc(r2) == -1)
605 return -1;
606 Uint32 len_out = ~(Uint32)0;
607 if (r2.m_itemLen != 0)
608 {
609 if (add(&d2.m_buf[r2.m_itemPos], &len_out) == -1)
610 return -1;
611 assert(len_out == r2.m_itemLen);
612 }
613 else
614 {
615 if (add_null(&len_out) == -1)
616 return -1;
617 assert(len_out ==0);
618 }
619 }
620 if (finalize() == -1)
621 return -1;
622 return 0;
623 }
624
625 int
convert_impl(Endian::Value to_endian)626 NdbPack::Data::convert_impl(Endian::Value to_endian)
627 {
628 const Spec& spec = m_spec;
629 Iter r(*this);
630 for (Uint32 i = 0; i < m_cnt; i++)
631 {
632 if (DataC::desc(r) == -1)
633 {
634 set_error(r);
635 return -1;
636 }
637 const Type& type = spec.m_buf[i];
638 const Uint32 typeId = type.m_typeId;
639 const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
640 if (info.m_convert)
641 {
642 Uint8* ptr = &m_buf[m_varBytes + r.m_itemPos];
643 Uint32 len = r.m_itemLen;
644 Endian::convert(ptr, len);
645 }
646 }
647 return 0;
648 }
649
650 // NdbPack::BoundC
651
652 int
finalize(int side)653 NdbPack::BoundC::finalize(int side)
654 {
655 if (m_data.m_cnt == 0 && side != 0)
656 {
657 set_error(BoundEmptySide, __LINE__);
658 return -1;
659 }
660 if (m_data.m_cnt != 0 && side != -1 && side != +1)
661 {
662 set_error(BoundNonemptySide, __LINE__);
663 return -1;
664 }
665 m_side = side;
666 return 0;
667 }
668
669 int
cmp(const BoundC & b2,Uint32 cnt,Uint32 & num_eq) const670 NdbPack::BoundC::cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const
671 {
672 const BoundC& b1 = *this;
673 const DataC& d1 = b1.m_data;
674 const DataC& d2 = b2.m_data;
675 int res = d1.cmp(d2, cnt, num_eq);
676 if (res == 0)
677 {
678 if (cnt < d1.m_cnt && cnt < d2.m_cnt)
679 ;
680 else if (d1.m_cnt < d2.m_cnt)
681 res = (+1) * b1.m_side;
682 else if (d1.m_cnt > d2.m_cnt)
683 res = (-1) * b2.m_side;
684 else if (b1.m_side < b2.m_side)
685 res = -1;
686 else if (b1.m_side > b2.m_side)
687 res = +1;
688 }
689 return res;
690 }
691
692 // NdbPack::Bound
693
694 // print
695
Print(char * buf,Uint32 bufsz)696 NdbPack::Print::Print(char* buf, Uint32 bufsz) :
697 m_buf(buf), m_bufsz(bufsz), m_sz(0) {}
698
699 void
print(const char * fmt,...)700 NdbPack::Print::print(const char* fmt, ...)
701 {
702 va_list ap;
703 va_start(ap, fmt);
704 if (m_bufsz > m_sz)
705 {
706 BaseString::vsnprintf(&m_buf[m_sz], m_bufsz - m_sz, fmt, ap);
707 m_sz += (Uint32)strlen(&m_buf[m_sz]);
708 }
709 va_end(ap);
710 }
711
712 // print Type
713
714 NdbOut&
operator <<(NdbOut & out,const NdbPack::Type & a)715 operator<<(NdbOut& out, const NdbPack::Type& a)
716 {
717 a.print(out);
718 return out;
719 }
720
721 void
print(NdbOut & out) const722 NdbPack::Type::print(NdbOut& out) const
723 {
724 char buf[200];
725 out << print(buf, sizeof(buf));
726 }
727
728 const char*
print(char * buf,Uint32 bufsz) const729 NdbPack::Type::print(char* buf, Uint32 bufsz) const
730 {
731 Print p(buf, bufsz);
732 p.print("typeId:%u", m_typeId);
733 p.print(" byteSize:%u", m_byteSize);
734 p.print(" nullable:%u", m_nullable);
735 p.print(" csNumber:%u", m_csNumber);
736 return buf;
737 }
738
739 // print Spec
740
741 NdbOut&
operator <<(NdbOut & out,const NdbPack::Spec & a)742 operator<<(NdbOut& out, const NdbPack::Spec& a)
743 {
744 a.print(out);
745 return out;
746 }
747
748 void
print(NdbOut & out) const749 NdbPack::Spec::print(NdbOut& out) const
750 {
751 char buf[8000];
752 out << print(buf, sizeof(buf));
753 }
754
755 const char*
print(char * buf,Uint32 bufsz) const756 NdbPack::Spec::print(char* buf, Uint32 bufsz) const
757 {
758 Print p(buf, bufsz);
759 p.print("cnt:%u", m_cnt);
760 p.print(" nullableCnt:%u", m_nullableCnt);
761 p.print(" varsizeCnt:%u", m_varsizeCnt);
762 p.print(" nullmaskLen:%u", get_nullmask_len(false));
763 p.print(" maxByteSize:%u", m_maxByteSize);
764 for (Uint32 i = 0; i < m_cnt; i++)
765 {
766 const Type& type = m_buf[i];
767 p.print(" [%u", i);
768 p.print(" typeId:%u", type.m_typeId);
769 p.print(" nullable:%u", type.m_nullable);
770 p.print(" byteSize:%u", type.m_byteSize);
771 p.print(" csNumber:%u", type.m_csNumber);
772 p.print("]");
773 }
774 return buf;
775 }
776
777 // print DataC
778
// When true, a hex dump of the item is printed even for values that
// were already printed in typed form.
bool g_ndb_pack_print_hex_always = true;
780
781 NdbOut&
operator <<(NdbOut & out,const NdbPack::DataC & a)782 operator<<(NdbOut& out, const NdbPack::DataC& a)
783 {
784 a.print(out);
785 return out;
786 }
787
788 void
print(NdbOut & out) const789 NdbPack::DataC::print(NdbOut& out) const
790 {
791 char buf[8000];
792 out << print(buf, sizeof(buf));
793 }
794
795 const char*
print(char * buf,Uint32 bufsz,bool convert_flag) const796 NdbPack::DataC::print(char* buf, Uint32 bufsz, bool convert_flag) const
797 {
798 Print p(buf, bufsz);
799 const Spec& spec = m_spec;
800 const Uint32 nullmask_len = spec.get_nullmask_len(m_allNullable);
801 if (nullmask_len != 0)
802 {
803 p.print("nullmask:");
804 for (Uint32 i = 0; i < nullmask_len; i++)
805 {
806 int x = m_buf[i];
807 p.print("%02x", x);
808 }
809 }
810 Iter r(*this);
811 for (Uint32 i = 0; i < m_cnt; i++)
812 {
813 desc(r);
814 const Uint8* value = &m_buf[r.m_itemPos];
815 p.print(" [%u", i);
816 p.print(" pos:%u", r.m_itemPos);
817 p.print(" len:%u", r.m_itemLen);
818 if (r.m_itemLen > 0)
819 {
820 p.print(" value:");
821 // some specific types for debugging
822 const Type& type = spec.m_buf[i];
823 bool ok = true;
824 switch (type.m_typeId) {
825 case NDB_TYPE_TINYINT:
826 {
827 Int8 x;
828 memcpy(&x, value, 1);
829 if (convert_flag)
830 Endian::convert(&x, 1);
831 p.print("%d", (int)x);
832 }
833 break;
834 case NDB_TYPE_TINYUNSIGNED:
835 {
836 Uint8 x;
837 memcpy(&x, value, 1);
838 if (convert_flag)
839 Endian::convert(&x, 1);
840 p.print("%u", (uint)x);
841 }
842 break;
843 case NDB_TYPE_SMALLINT:
844 {
845 Int16 x;
846 memcpy(&x, value, 2);
847 if (convert_flag)
848 Endian::convert(&x, 2);
849 p.print("%d", (int)x);
850 }
851 break;
852 case NDB_TYPE_SMALLUNSIGNED:
853 {
854 Uint16 x;
855 memcpy(&x, value, 2);
856 if (convert_flag)
857 Endian::convert(&x, 2);
858 p.print("%u", (uint)x);
859 }
860 break;
861 case NDB_TYPE_INT:
862 {
863 Int32 x;
864 memcpy(&x, value, 4);
865 if (convert_flag)
866 Endian::convert(&x, 4);
867 p.print("%d", (int)x);
868 }
869 break;
870 case NDB_TYPE_UNSIGNED:
871 {
872 Uint32 x;
873 memcpy(&x, value, 4);
874 if (convert_flag)
875 Endian::convert(&x, 4);
876 p.print("%u", (uint)x);
877 }
878 break;
879 case NDB_TYPE_FLOAT:
880 {
881 float x;
882 memcpy(&x, value, 4);
883 if (convert_flag)
884 Endian::convert(&x, 4);
885 p.print("%g", (double)x);
886 }
887 break;
888 case NDB_TYPE_DOUBLE:
889 {
890 double x;
891 memcpy(&x, value, 8);
892 if (convert_flag)
893 Endian::convert(&x, 8);
894 p.print("%g", x);
895 }
896 break;
897 case NDB_TYPE_CHAR:
898 case NDB_TYPE_VARCHAR:
899 case NDB_TYPE_LONGVARCHAR:
900 {
901 const Uint32 off = type.m_arrayType;
902 for (Uint32 j = 0; j < r.m_bareLen; j++)
903 {
904 Uint8 x = value[off + j];
905 p.print("%c", (int)x);
906 }
907 }
908 break;
909 default:
910 ok = false;
911 break;
912 }
913 if (!ok || g_ndb_pack_print_hex_always)
914 {
915 p.print("<");
916 for (Uint32 j = 0; j < r.m_itemLen; j++)
917 {
918 int x = value[j];
919 p.print("%02x", x);
920 }
921 p.print(">");
922 }
923 }
924 p.print("]");
925 }
926 return buf;
927 }
928
929 // print Data
930
931 NdbOut&
operator <<(NdbOut & out,const NdbPack::Data & a)932 operator<<(NdbOut& out, const NdbPack::Data& a)
933 {
934 a.print(out);
935 return out;
936 }
937
938 void
print(NdbOut & out) const939 NdbPack::Data::print(NdbOut& out) const
940 {
941 char buf[8000];
942 out << print(buf, sizeof(buf));
943 }
944
945 const char*
print(char * buf,Uint32 bufsz) const946 NdbPack::Data::print(char* buf, Uint32 bufsz) const
947 {
948 Print p(buf, bufsz);
949 if (m_varBytes != 0)
950 {
951 p.print("varBytes:");
952 for (Uint32 i = 0; i < m_varBytes; i++)
953 {
954 int r = m_buf[i];
955 p.print("%02x", r);
956 }
957 p.print(" ");
958 }
959 p.print("dataLen:%u", m_iter.m_itemPos + m_iter.m_itemLen);
960 p.print(" ");
961 const bool convert_flag =
962 m_endian != Endian::Native &&
963 m_endian != Endian::get_endian();
964 DataC::print(&buf[p.m_sz], bufsz - p.m_sz, convert_flag);
965 return buf;
966 }
967
968 // print BoundC
969
970 NdbOut&
operator <<(NdbOut & out,const NdbPack::BoundC & a)971 operator<<(NdbOut& out, const NdbPack::BoundC& a)
972 {
973 a.print(out);
974 return out;
975 }
976
977 void
print(NdbOut & out) const978 NdbPack::BoundC::print(NdbOut& out) const
979 {
980 char buf[8000];
981 out << print(buf, sizeof(buf));
982 }
983
984 const char*
print(char * buf,Uint32 bufsz) const985 NdbPack::BoundC::print(char* buf, Uint32 bufsz) const
986 {
987 Print p(buf, bufsz);
988 p.print("side:%s ", m_side < 0 ? "-" : m_side > 0 ? "+" : "0");
989 m_data.print(&buf[p.m_sz], bufsz - p.m_sz);
990 return buf;
991 }
992
993 // print Bound
994
995 NdbOut&
operator <<(NdbOut & out,const NdbPack::Bound & a)996 operator<<(NdbOut& out, const NdbPack::Bound& a)
997 {
998 a.print(out);
999 return out;
1000 }
1001
1002 void
print(NdbOut & out) const1003 NdbPack::Bound::print(NdbOut& out) const
1004 {
1005 char buf[8000];
1006 out << print(buf, sizeof(buf));
1007 }
1008
1009 const char*
print(char * buf,Uint32 bufsz) const1010 NdbPack::Bound::print(char* buf, Uint32 bufsz) const
1011 {
1012 BoundC::print(buf, bufsz);
1013 return buf;
1014 }
1015
1016 // validate
1017
1018 int
validate() const1019 NdbPack::Type::validate() const
1020 {
1021 Type type2 = *this;
1022 if (type2.complete() == -1)
1023 {
1024 set_error(type2);
1025 return -1;
1026 }
1027 if (memcmp(this, &type2, sizeof(Type)) != 0)
1028 {
1029 set_error(ValidationError, __LINE__);
1030 return -1;
1031 }
1032 return 0;
1033 }
1034
1035 int
validate() const1036 NdbPack::Spec::validate() const
1037 {
1038 Uint32 nullableCnt = 0;
1039 Uint32 varsizeCnt = 0;
1040 for (Uint32 i = 0; i < m_cnt; i++)
1041 {
1042 const Type& type = m_buf[i];
1043 if (type.validate() == -1)
1044 {
1045 set_error(type);
1046 return -1;
1047 }
1048 if (type.m_nullable)
1049 nullableCnt++;
1050 if (type.m_arrayType != 0)
1051 varsizeCnt++;
1052 }
1053 if (m_nullableCnt != nullableCnt)
1054 {
1055 set_error(ValidationError, __LINE__);
1056 return -1;
1057 }
1058 if (m_varsizeCnt != varsizeCnt)
1059 {
1060 set_error(ValidationError, __LINE__);
1061 return -1;
1062 }
1063 return 0;
1064 }
1065
1066 int
validate() const1067 NdbPack::Data::validate() const
1068 {
1069 if (DataC::validate() == -1)
1070 return -1;
1071 const Iter& r = m_iter;
1072 if (r.m_cnt != m_cnt)
1073 {
1074 set_error(ValidationError, __LINE__);
1075 return -1;
1076 }
1077 Iter r2(*this);
1078 for (Uint32 i = 0; i < m_cnt; i++)
1079 {
1080 if (desc(r2) == -1)
1081 return -1;
1082 }
1083 if (r.m_itemPos != r2.m_itemPos)
1084 {
1085 set_error(ValidationError, __LINE__);
1086 return -1;
1087 }
1088 if (r.m_cnt != r2.m_cnt)
1089 {
1090 set_error(ValidationError, __LINE__);
1091 return -1;
1092 }
1093 if (r.m_nullCnt != r2.m_nullCnt)
1094 {
1095 set_error(ValidationError, __LINE__);
1096 return -1;
1097 }
1098 if (r.m_itemLen != r2.m_itemLen)
1099 {
1100 set_error(ValidationError, __LINE__);
1101 return -1;
1102 }
1103 return 0;
1104 }
1105
1106 int
validate() const1107 NdbPack::BoundC::validate() const
1108 {
1109 if (m_data.validate() == -1)
1110 {
1111 set_error(m_data);
1112 return -1;
1113 }
1114 if (m_data.m_cnt == 0 && m_side != 0)
1115 {
1116 set_error(ValidationError, __LINE__);
1117 return -1;
1118 }
1119 if (m_data.m_cnt != 0 && m_side != -1 && m_side != +1)
1120 {
1121 set_error(ValidationError, __LINE__);
1122 return -1;
1123 }
1124 return 0;
1125 }
1126
1127 int
validate() const1128 NdbPack::Bound::validate() const
1129 {
1130 if (BoundC::validate() == -1)
1131 return -1;
1132 if (m_data.validate() == -1)
1133 {
1134 set_error(m_data);
1135 return -1;
1136 }
1137 return 0;
1138 }
1139
1140 #ifdef TEST_NDB_PACK
1141 #include <util/NdbTap.hpp>
1142
/* Check x; on failure print the line and the expression, then stop. */
#define chk1(x) \
  do { \
    if (x) break; \
    ndbout << "line " << __LINE__ << ": " << #x << endl; \
    require(false); \
  } while (0)

/* As chk1, and also print the error code and line stored in e. */
#define chk2(x, e) \
  do { \
    if (x) break; \
    ndbout << "line " << __LINE__ << ": " << #x << endl; \
    ndbout << "NdbPack code: " << (e).get_error_code() << " line: " << (e).get_error_line() << endl; \
    require(false); \
  } while (0)

/* Log x when the verbose level is at least 0, 1, 2 or 3 respectively. */
#define ll0(x) \
  do { if (verbose < 0) break; ndbout << "0- " << x << endl; } while (0)
#define ll1(x) \
  do { if (verbose < 1) break; ndbout << "1- " << x << endl; } while (0)
#define ll2(x) \
  do { if (verbose < 2) break; ndbout << "2- " << x << endl; } while (0)
#define ll3(x) \
  do { if (verbose < 3) break; ndbout << "3- " << x << endl; } while (0)

/* Smaller of a and b; the chosen argument is evaluated twice. */
#define xmin(a, b) ((a) < (b) ? (a) : (b))
1153
1154 #include <ndb_rand.h>
1155
1156 static uint // random 0..n-1
getrandom(uint n)1157 getrandom(uint n)
1158 {
1159 if (n != 0) {
1160 uint k = ndb_rand();
1161 return k % n;
1162 }
1163 return 0;
1164 }
1165
1166 static uint // random 0..n-1 biased exponentially to smaller
getrandom(uint n,uint bias)1167 getrandom(uint n, uint bias)
1168 {
1169 assert(bias != 0);
1170 uint k = getrandom(n);
1171 bias--;
1172 while (bias != 0) {
1173 k = getrandom(k + 1);
1174 bias--;
1175 }
1176 return k;
1177 }
1178
1179 static bool
getrandompct(uint pct)1180 getrandompct(uint pct)
1181 {
1182 return getrandom(100) < pct;
1183 }
1184
1185 // change in TAPTEST
// Test options and their defaults.
static int seed = -1;       // -1: random
static int loops = 0;
static int spec_cnt = -1;   // -1: random number of types per spec
static int fix_type = 0;    // 0: all types
static int no_nullable = 0; // non-zero: never create nullable types
static int data_cnt = -1;   // -1: Max
static int bound_cnt = -1;  // -1: Max
static int verbose = 0;     // threshold used by the ll0..ll3 macros
1194
1195 struct Tspec {
1196 enum { Max = 100 };
1197 enum { MaxBuf = Max * 4000 };
1198 NdbPack::Spec m_spec;
1199 NdbPack::Type m_type[Max];
TspecTspec1200 Tspec() {
1201 m_spec.set_buf(m_type, Max);
1202 }
1203 void create();
1204 };
1205
1206 static NdbOut&
operator <<(NdbOut & out,const Tspec & tspec)1207 operator<<(NdbOut& out, const Tspec& tspec)
1208 {
1209 out << tspec.m_spec;
1210 return out;
1211 }
1212
1213 void
create()1214 Tspec::create()
1215 {
1216 m_spec.reset();
1217 int cnt = spec_cnt == -1 ? 1 + getrandom(Tspec::Max, 3) : spec_cnt;
1218 int i = 0;
1219 while (i < cnt) {
1220 int typeId = fix_type;
1221 if (typeId == 0)
1222 typeId = getrandom(g_ndb_pack_type_info_cnt);
1223 const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
1224 switch (typeId) {
1225 case NDB_TYPE_INT:
1226 case NDB_TYPE_UNSIGNED:
1227 case NDB_TYPE_CHAR:
1228 case NDB_TYPE_VARCHAR:
1229 case NDB_TYPE_LONGVARCHAR:
1230 break;
1231 default:
1232 continue;
1233 }
1234 require(info.m_supported);
1235 int byteSize = 0;
1236 if (info.m_fixSize != 0)
1237 byteSize = info.m_fixSize;
1238 else if (info.m_arrayType == 0)
1239 byteSize = 1 + getrandom(128, 1); // char(1-128)
1240 else if (info.m_arrayType == 1)
1241 byteSize = 1 + getrandom(256, 2); // varchar(0-255)
1242 else if (info.m_arrayType == 2)
1243 byteSize = 2 + getrandom(1024, 3); // longvarchar(0-1023)
1244 else
1245 require(false);
1246 bool nullable = no_nullable ? false : getrandompct(50);
1247 int csNumber = 0;
1248 if (info.m_charType) {
1249 csNumber = 8; // should include ascii
1250 }
1251 NdbPack::Type type(typeId, byteSize, nullable, csNumber);
1252 chk2(m_spec.add(type) == 0, m_spec);
1253 i++;
1254 }
1255 chk2(m_spec.validate() == 0, m_spec);
1256 }
1257
1258 struct Tdata {
1259 const Tspec& m_tspec;
1260 NdbPack::Data m_data;
1261 const bool m_isBound;
1262 int m_cnt;
1263 Uint8* m_xbuf; // unpacked
1264 int m_xsize;
1265 int m_xoff[Tspec::Max];
1266 int m_xlen[Tspec::Max];
1267 bool m_xnull[Tspec::Max];
1268 int m_xnulls;
1269 Uint32* m_poaiBuf; // plain old attr info
1270 int m_poaiSize;
1271 Uint8* m_packBuf; // packed
1272 int m_packLen;
TdataTdata1273 Tdata(Tspec& tspec, bool isBound, uint varBytes) :
1274 m_tspec(tspec),
1275 m_data(tspec.m_spec, isBound, varBytes),
1276 m_isBound(isBound)
1277 {
1278 m_cnt = tspec.m_spec.get_cnt();
1279 m_xbuf = 0;
1280 m_poaiBuf = 0;
1281 m_packBuf = 0;
1282 }
~TdataTdata1283 ~Tdata() {
1284 delete [] m_xbuf;
1285 delete [] m_poaiBuf;
1286 delete [] m_packBuf;
1287 }
1288 void create();
1289 void add();
1290 void finalize();
1291 // compare using unpacked data
1292 int xcmp(const Tdata& tdata2, int* num_eq) const;
1293 };
1294
1295 static NdbOut&
operator <<(NdbOut & out,const Tdata & tdata)1296 operator<<(NdbOut& out, const Tdata& tdata)
1297 {
1298 out << tdata.m_data;
1299 return out;
1300 }
1301
1302 void
create()1303 Tdata::create()
1304 {
1305 union {
1306 Uint8 xbuf[Tspec::MaxBuf];
1307 Uint64 xbuf_align;
1308 };
1309 (void)xbuf_align; // compiler warning
1310 memset(xbuf, 0x3f, sizeof(xbuf));
1311 m_xsize = 0;
1312 m_xnulls = 0;
1313 Uint32 poaiBuf[Tspec::MaxBuf / 4];
1314 memset(poaiBuf, 0x5f, sizeof(poaiBuf));
1315 m_poaiSize = 0;
1316 m_packLen = m_data.get_var_bytes();
1317 m_packLen += (m_tspec.m_spec.get_nullable_cnt(m_isBound) + 7) / 8;
1318 int i = 0, j;
1319 while (i < m_cnt) {
1320 const NdbPack::Type& type = m_tspec.m_spec.get_type(i);
1321 const int typeId = type.get_type_id();
1322 const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
1323 m_xnull[i] = type.get_nullable() && getrandompct(25);
1324 m_xnull[i] = false;
1325 if (type.get_nullable() || m_isBound)
1326 m_xnull[i] = getrandompct(20);
1327 int pad = 0; // null-char pad not counted in xlen
1328 if (!m_xnull[i]) {
1329 m_xoff[i] = m_xsize;
1330 Uint8* xptr = &xbuf[m_xsize];
1331 switch (typeId) {
1332 case NDB_TYPE_INT:
1333 {
1334 Int32 x = getrandom(10);
1335 if (getrandompct(50))
1336 x = (-1) * x;
1337 memcpy(xptr, &x, 4);
1338 m_xlen[i] = info.m_fixSize;
1339 }
1340 break;
1341 case NDB_TYPE_UNSIGNED:
1342 {
1343 Uint32 x = getrandom(10);
1344 memcpy(xptr, &x, 4);
1345 m_xlen[i] = info.m_fixSize;
1346 }
1347 break;
1348 case NDB_TYPE_CHAR:
1349 {
1350 require(type.get_byte_size() >= 1);
1351 int max_len = type.get_byte_size();
1352 int len = getrandom(max_len + 1, 1);
1353 for (j = 0; j < len; j++)
1354 {
1355 xptr[j] = 'a' + getrandom(3);
1356 }
1357 for (j = len; j < max_len; j++)
1358 {
1359 xptr[j] = 0x20;
1360 }
1361 m_xlen[i] = max_len;
1362 xptr[max_len] = 0;
1363 pad = 1;
1364 }
1365 break;
1366 case NDB_TYPE_VARCHAR:
1367 {
1368 require(type.get_byte_size() >= 1);
1369 int max_len = type.get_byte_size() - 1;
1370 int len = getrandom(max_len, 2);
1371 require(len < 256);
1372 xptr[0] = len;
1373 for (j = 0; j < len; j++)
1374 {
1375 xptr[1 + j] = 'a' + getrandom(3);
1376 }
1377 m_xlen[i] = 1 + len;
1378 xptr[1 + len] = 0;
1379 pad = 1;
1380 }
1381 break;
1382 case NDB_TYPE_LONGVARCHAR:
1383 {
1384 require(type.get_byte_size() >= 2);
1385 int max_len = type.get_byte_size() - 2;
1386 int len = getrandom(max_len, 3);
1387 require(len < 256 * 256);
1388 xptr[0] = (len & 0xFF);
1389 xptr[1] = (len >> 8);
1390 for (j = 0; j < len; j++)
1391 {
1392 xptr[2 + j] = 'a' + getrandom(3);
1393 }
1394 m_xlen[i] = 2 + len;
1395 xptr[2 + len] = 0;
1396 pad = 1;
1397 }
1398 break;
1399 default:
1400 require(false);
1401 break;
1402 }
1403 m_xsize += m_xlen[i] + pad;
1404 while (m_xsize % 8 != 0)
1405 m_xsize++;
1406 m_packLen += m_xlen[i];
1407 } else {
1408 m_xoff[i] = -1;
1409 m_xlen[i] = 0;
1410 m_xnulls++;
1411 }
1412 require(m_xnull[i] == (m_xoff[i] == -1));
1413 require(m_xnull[i] == (m_xlen[i] == 0));
1414 AttributeHeader ah;
1415 ah.setAttributeId(i); // not used
1416 ah.setByteSize(m_xlen[i]);
1417 poaiBuf[m_poaiSize] = ah.m_value;
1418 m_poaiSize++;
1419 if (!m_xnull[i]) {
1420 memcpy(&poaiBuf[m_poaiSize], &xbuf[m_xoff[i]], m_xlen[i]);
1421 m_poaiSize += (m_xlen[i] + 3) / 4;
1422 }
1423 i++;
1424 }
1425 require(m_xsize % 8 == 0);
1426 m_xbuf = (Uint8*) new Uint64 [m_xsize / 8];
1427 memcpy(m_xbuf, xbuf, m_xsize);
1428 m_poaiBuf = (Uint32*) new Uint32 [m_poaiSize];
1429 memcpy(m_poaiBuf, poaiBuf, m_poaiSize << 2);
1430 }
1431
1432 void
add()1433 Tdata::add()
1434 {
1435 m_packBuf = new Uint8 [m_packLen];
1436 m_data.set_buf(m_packBuf, m_packLen);
1437 int i, j;
1438 j = 0;
1439 while (j <= 1) {
1440 if (j == 1)
1441 m_data.reset();
1442 i = 0;
1443 while (i < m_cnt) {
1444 Uint32 xlen = ~(Uint32)0;
1445 if (!m_xnull[i]) {
1446 int xoff = m_xoff[i];
1447 const Uint8* xptr = &m_xbuf[xoff];
1448 chk2(m_data.add(xptr, &xlen) == 0, m_data);
1449 chk1((int)xlen == m_xlen[i]);
1450 } else {
1451 chk2(m_data.add_null(&xlen) == 0, m_data);
1452 chk1(xlen == 0);
1453 }
1454 i++;
1455 }
1456 chk2(m_data.validate() == 0, m_data);
1457 chk1((int)m_data.get_null_cnt() == m_xnulls);
1458 j++;
1459 }
1460 }
1461
1462 void
finalize()1463 Tdata::finalize()
1464 {
1465 chk2(m_data.finalize() == 0, m_data);
1466 ll3("create: " << m_data);
1467 chk1((int)m_data.get_full_len() == m_packLen);
1468 {
1469 const Uint8* p = (const Uint8*)m_data.get_full_buf();
1470 chk1(p[0] + (p[1] << 8) == m_packLen - 2);
1471 }
1472 }
1473
1474 int
xcmp(const Tdata & tdata2,int * num_eq) const1475 Tdata::xcmp(const Tdata& tdata2, int* num_eq) const
1476 {
1477 const Tdata& tdata1 = *this;
1478 require(&tdata1.m_tspec == &tdata2.m_tspec);
1479 const Tspec& tspec = tdata1.m_tspec;
1480 int res = 0;
1481 int cnt = xmin(tdata1.m_cnt, tdata2.m_cnt);
1482 int i;
1483 for (i = 0; i < cnt; i++) {
1484 if (!tdata1.m_xnull[i]) {
1485 if (!tdata2.m_xnull[i]) {
1486 // the pointers are Uint64-aligned
1487 const Uint8* xptr1 = &tdata1.m_xbuf[tdata1.m_xoff[i]];
1488 const Uint8* xptr2 = &tdata2.m_xbuf[tdata2.m_xoff[i]];
1489 const int xlen1 = tdata1.m_xlen[i];
1490 const int xlen2 = tdata2.m_xlen[i];
1491 const NdbPack::Type& type = tspec.m_spec.get_type(i);
1492 const int typeId = type.get_type_id();
1493 const int csNumber = type.get_cs_number();
1494 CHARSET_INFO* cs = all_charsets[csNumber];
1495 switch (typeId) {
1496 case NDB_TYPE_INT:
1497 {
1498 require(cs == 0);
1499 Int32 x1 = *(const Int32*)xptr1;
1500 Int32 x2 = *(const Int32*)xptr2;
1501 if (x1 < x2)
1502 res = -1;
1503 else if (x1 > x2)
1504 res = +1;
1505 ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
1506 }
1507 break;
1508 case NDB_TYPE_UNSIGNED:
1509 {
1510 require(cs == 0);
1511 Uint32 x1 = *(const Uint32*)xptr1;
1512 Uint32 x2 = *(const Uint32*)xptr2;
1513 if (x1 < x2)
1514 res = -1;
1515 else if (x1 > x2)
1516 res = +1;
1517 ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
1518 }
1519 break;
1520 case NDB_TYPE_CHAR:
1521 {
1522 require(cs != 0 && cs->coll != 0);
1523 const uint n1 = xlen1;
1524 const uint n2 = xlen2;
1525 const uchar* t1 = &xptr1[0];
1526 const uchar* t2 = &xptr2[0];
1527 const char* s1 = (const char*)t1;
1528 const char* s2 = (const char*)t2;
1529 chk1(n1 == strlen(s1));
1530 chk1(n2 == strlen(s2));
1531 res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1532 ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1533 }
1534 break;
1535 case NDB_TYPE_VARCHAR:
1536 {
1537 require(cs != 0 && cs->coll != 0);
1538 const uint n1 = xptr1[0];
1539 const uint n2 = xptr2[0];
1540 const uchar* t1 = &xptr1[1];
1541 const uchar* t2 = &xptr2[1];
1542 const char* s1 = (const char*)t1;
1543 const char* s2 = (const char*)t2;
1544 chk1(n1 == strlen(s1));
1545 chk1(n2 == strlen(s2));
1546 res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1547 ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1548 }
1549 break;
1550 case NDB_TYPE_LONGVARCHAR:
1551 {
1552 require(cs != 0 && cs->coll != 0);
1553 const uint n1 = xptr1[0] | (xptr1[1] << 8);
1554 const uint n2 = xptr2[0] | (xptr2[1] << 8);
1555 const uchar* t1 = &xptr1[2];
1556 const uchar* t2 = &xptr2[2];
1557 const char* s1 = (const char*)t1;
1558 const char* s2 = (const char*)t2;
1559 chk1(n1 == strlen(s1));
1560 chk1(n2 == strlen(s2));
1561 res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1562 ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1563 }
1564 break;
1565 default:
1566 require(false);
1567 break;
1568 }
1569 } else
1570 res = +1;
1571 } else if (!tdata2.m_xnull[i])
1572 res = -1;
1573 if (res != 0)
1574 break;
1575 }
1576 *num_eq = i;
1577 ll3("xcmp res:" << res << " num_eq:" << *num_eq);
1578 return res;
1579 }
1580
1581 struct Tbound {
1582 Tdata& m_tdata;
1583 NdbPack::Bound m_bound;
TboundTbound1584 Tbound(Tdata& tdata) :
1585 m_tdata(tdata),
1586 m_bound(tdata.m_data)
1587 {
1588 m_tdata.m_cnt = 1 + getrandom(m_tdata.m_cnt);
1589 }
1590 void create();
1591 void add();
1592 void finalize();
1593 int xcmp(const Tdata& tdata2, int* num_eq) const;
1594 int xcmp(const Tbound& tbound2, int* num_eq) const;
1595 };
1596
1597 static NdbOut&
operator <<(NdbOut & out,const Tbound & tbound)1598 operator<<(NdbOut& out, const Tbound& tbound)
1599 {
1600 out << tbound.m_bound;
1601 return out;
1602 }
1603
1604 void
create()1605 Tbound::create()
1606 {
1607 m_tdata.create();
1608 }
1609
1610 void
add()1611 Tbound::add()
1612 {
1613 m_tdata.add();
1614 }
1615
1616 void
finalize()1617 Tbound::finalize()
1618 {
1619 int side = getrandompct(50) ? -1 : +1;
1620 chk2(m_bound.finalize(side) == 0, m_bound);
1621 chk2(m_bound.validate() == 0, m_bound);
1622 chk1((int)m_tdata.m_data.get_full_len() == m_tdata.m_packLen);
1623 }
1624
1625 int
xcmp(const Tdata & tdata2,int * num_eq) const1626 Tbound::xcmp(const Tdata& tdata2, int* num_eq) const
1627 {
1628 const Tbound& tbound1 = *this;
1629 const Tdata& tdata1 = tbound1.m_tdata;
1630 require(tdata1.m_cnt <= tdata2.m_cnt);
1631 *num_eq = -1;
1632 int res = tdata1.xcmp(tdata2, num_eq);
1633 if (res == 0) {
1634 chk1(*num_eq == tdata1.m_cnt);
1635 res = m_bound.get_side();
1636 }
1637 return res;
1638 }
1639
1640 int
xcmp(const Tbound & tbound2,int * num_eq) const1641 Tbound::xcmp(const Tbound& tbound2, int* num_eq) const
1642 {
1643 const Tbound& tbound1 = *this;
1644 const Tdata& tdata1 = tbound1.m_tdata;
1645 const Tdata& tdata2 = tbound2.m_tdata;
1646 *num_eq = -1;
1647 int res = tdata1.xcmp(tdata2, num_eq);
1648 chk1(0 <= *num_eq && *num_eq <= xmin(tdata1.m_cnt, tdata2.m_cnt));
1649 if (res == 0) {
1650 chk1(*num_eq == xmin(tdata1.m_cnt, tdata2.m_cnt));
1651 if (tdata1.m_cnt < tdata2.m_cnt)
1652 res = (+1) * tbound1.m_bound.get_side();
1653 else if (tdata1.m_cnt > tdata2.m_cnt)
1654 res = (-1) * tbound2.m_bound.get_side();
1655 else if (tbound1.m_bound.get_side() < tbound2.m_bound.get_side())
1656 res = -1;
1657 else if (tbound1.m_bound.get_side() > tbound2.m_bound.get_side())
1658 res = +1;
1659 }
1660 return res;
1661 }
1662
1663 struct Tdatalist {
1664 enum { Max = 1000 };
1665 Tdata* m_tdata[Max];
1666 int m_cnt;
TdatalistTdatalist1667 Tdatalist(Tspec& tspec) {
1668 m_cnt = data_cnt == -1 ? Max : data_cnt;
1669 int i;
1670 for (i = 0; i < m_cnt; i++) {
1671 m_tdata[i] = new Tdata(tspec, false, 2);
1672 }
1673 }
~TdatalistTdatalist1674 ~Tdatalist() {
1675 int i;
1676 for (i = 0; i < m_cnt; i++) {
1677 delete m_tdata[i];
1678 }
1679 }
1680 void create();
1681 void sort();
1682 };
1683
1684 static NdbOut&
operator <<(NdbOut & out,const Tdatalist & tdatalist)1685 operator<<(NdbOut& out, const Tdatalist& tdatalist)
1686 {
1687 int i;
1688 for (i = 0; i < tdatalist.m_cnt; i++) {
1689 out << "data " << i << ": " << *tdatalist.m_tdata[i];
1690 if (i + 1 < tdatalist.m_cnt)
1691 out << endl;
1692 }
1693 return out;
1694 }
1695
1696 void
create()1697 Tdatalist::create()
1698 {
1699 int i;
1700 for (i = 0; i < m_cnt; i++) {
1701 Tdata& tdata = *m_tdata[i];
1702 tdata.create();
1703 tdata.add();
1704 tdata.finalize();
1705 }
1706 }
1707
1708 static int
data_cmp(const void * a1,const void * a2)1709 data_cmp(const void* a1, const void* a2)
1710 {
1711 const Tdata& tdata1 = **(const Tdata**)a1;
1712 const Tdata& tdata2 = **(const Tdata**)a2;
1713 require(tdata1.m_cnt == tdata2.m_cnt);
1714 const Uint32 cnt = tdata1.m_cnt;
1715 Uint32 num_eq = ~(Uint32)0;
1716 int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq);
1717 require(num_eq <= (Uint32)tdata1.m_cnt);
1718 require(num_eq <= (Uint32)tdata2.m_cnt);
1719 return res;
1720 }
1721
1722 void
sort()1723 Tdatalist::sort()
1724 {
1725 ll1("data sort: in");
1726 ll3(endl << *this);
1727 qsort(m_tdata, m_cnt, sizeof(Tdata*), data_cmp);
1728 ll1("data sort: out");
1729 ll3(endl << *this);
1730 int i;
1731 for (i = 0; i + 1 < m_cnt; i++) {
1732 const Tdata& tdata1 = *m_tdata[i];
1733 const Tdata& tdata2 = *m_tdata[i + 1];
1734 require(tdata1.m_cnt == tdata2.m_cnt);
1735 const Uint32 cnt = tdata1.m_cnt;
1736 Uint32 num_eq1 = ~(Uint32)0;
1737 int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq1);
1738 chk1(res <= 0);
1739 // also via unpacked data
1740 int num_eq2 = -1;
1741 int res2 = tdata1.xcmp(tdata2, &num_eq2);
1742 if (res < 0)
1743 chk1(res2 < 0);
1744 else if (res == 0)
1745 chk1(res2 == 0);
1746 else
1747 chk1(res2 > 0);
1748 chk1(num_eq1 == (Uint32)num_eq2);
1749 }
1750 }
1751
1752 struct Tboundlist {
1753 enum { Max = 1000 };
1754 Tbound* m_tbound[Max];
1755 int m_cnt;
TboundlistTboundlist1756 Tboundlist(Tspec& tspec) {
1757 m_cnt = bound_cnt == -1 ? Max : bound_cnt;
1758 int i;
1759 for (i = 0; i < m_cnt; i++) {
1760 Tdata* tdata = new Tdata(tspec, true, 0);
1761 m_tbound[i] = new Tbound(*tdata);
1762 }
1763 }
~TboundlistTboundlist1764 ~Tboundlist() {
1765 int i;
1766 for (i = 0; i < m_cnt; i++) {
1767 Tdata* tdata = &m_tbound[i]->m_tdata;
1768 delete m_tbound[i];
1769 delete tdata;
1770 }
1771 }
1772 void create();
1773 void sort();
1774 };
1775
1776 static NdbOut&
operator <<(NdbOut & out,const Tboundlist & tboundlist)1777 operator<<(NdbOut& out, const Tboundlist& tboundlist)
1778 {
1779 int i;
1780 for (i = 0; i < tboundlist.m_cnt; i++) {
1781 out << "bound " << i << ": " << *tboundlist.m_tbound[i];
1782 if (i + 1 < tboundlist.m_cnt)
1783 out << endl;
1784 }
1785 return out;
1786 }
1787
1788 void
create()1789 Tboundlist::create()
1790 {
1791 int i;
1792 for (i = 0; i < m_cnt; i++) {
1793 Tbound& tbound = *m_tbound[i];
1794 tbound.create();
1795 tbound.add();
1796 tbound.finalize();
1797 }
1798 }
1799
1800 static int
bound_cmp(const void * a1,const void * a2)1801 bound_cmp(const void* a1, const void* a2)
1802 {
1803 const Tbound& tbound1 = **(const Tbound**)a1;
1804 const Tbound& tbound2 = **(const Tbound**)a2;
1805 const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
1806 Uint32 num_eq = ~(Uint32)0;
1807 int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq);
1808 require(num_eq <= cnt);
1809 require(num_eq <= cnt);
1810 return res;
1811 }
1812
1813 void
sort()1814 Tboundlist::sort()
1815 {
1816 ll1("bound sort: in");
1817 ll3(endl << *this);
1818 qsort(m_tbound, m_cnt, sizeof(Tbound*), bound_cmp);
1819 ll1("bound sort: out");
1820 ll3(endl << *this);
1821 int i;
1822 for (i = 0; i + 1 < m_cnt; i++) {
1823 const Tbound& tbound1 = *m_tbound[i];
1824 const Tbound& tbound2 = *m_tbound[i + 1];
1825 const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
1826 Uint32 num_eq1 = ~(Uint32)0;
1827 int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq1);
1828 chk1(res <= 0);
1829 // also via unpacked data
1830 int num_eq2 = -1;
1831 int res2 = tbound1.xcmp(tbound2, &num_eq2);
1832 if (res < 0)
1833 chk1(res2 < 0);
1834 else if (res == 0)
1835 chk1(res2 == 0);
1836 else
1837 chk1(res2 > 0);
1838 chk1(num_eq1 == (Uint32)num_eq2);
1839 }
1840 }
1841
1842 static void
testdesc(const Tdata & tdata)1843 testdesc(const Tdata& tdata)
1844 {
1845 ll3("testdesc: " << tdata);
1846 const Tspec& tspec = tdata.m_tspec;
1847 const NdbPack::Data& data = tdata.m_data;
1848 const Uint8* buf_old = (const Uint8*)data.get_full_buf();
1849 const Uint32 varBytes = data.get_var_bytes();
1850 // const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
1851 const Uint32 dataLen = data.get_data_len();
1852 const Uint32 fullLen = data.get_full_len();
1853 const Uint32 cnt = data.get_cnt();
1854 chk1(fullLen == varBytes + dataLen);
1855 NdbPack::Data data_new(tspec.m_spec, false, varBytes);
1856 Uint8 buf_new[Tspec::MaxBuf];
1857 data_new.set_buf(buf_new, sizeof(buf_new));
1858 memcpy(buf_new, buf_old, fullLen);
1859 chk2(data_new.desc_all(cnt, NdbPack::Endian::Native) == 0, data_new);
1860 chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
1861 chk1(data_new.get_data_len() == data.get_data_len());
1862 chk1(data_new.get_cnt() == data.get_cnt());
1863 chk1(data_new.get_null_cnt() == data.get_null_cnt());
1864 }
1865
1866 static void
testcopy(const Tdata & tdata)1867 testcopy(const Tdata& tdata)
1868 {
1869 ll3("testcopy: " << tdata);
1870 const Tspec& tspec = tdata.m_tspec;
1871 const NdbPack::Data& data = tdata.m_data;
1872 uint n = getrandom(tdata.m_cnt + 1);
1873 do {
1874 ll3("testcopy: cnt:" << tdata.m_cnt << " n:" << n);
1875 NdbPack::DataC data_old(tspec.m_spec, false);
1876 data_old.set_buf(data.get_data_buf(), data.get_data_len(), n);
1877 chk1(data_old.get_cnt() == n);
1878 NdbPack::Data data_new(tspec.m_spec, false, 0);
1879 Uint8 buf_new[Tspec::MaxBuf];
1880 data_new.set_buf(buf_new, sizeof(buf_new));
1881 chk2(data_new.copy(data_old) == 0, data_new);
1882 chk1(data_new.get_cnt() == n);
1883 Uint32 num_eq1 = ~(Uint32)0;
1884 chk1(data_new.cmp(data_old, n, num_eq1) == 0);
1885 chk1(num_eq1 == n);
1886 Uint32 num_eq2 = ~(Uint32)0;
1887 chk1(data_old.cmp(data_new, n, num_eq2) == 0);
1888 chk1(num_eq2 == n);
1889 n = getrandom(n);
1890 } while (n != 0);
1891 }
1892
1893 static void
testpoai(const Tdata & tdata)1894 testpoai(const Tdata& tdata)
1895 {
1896 ll3("testpoai: " << tdata);
1897 const Tspec& tspec = tdata.m_tspec;
1898 const NdbPack::Data& data = tdata.m_data;
1899 NdbPack::Data data_new(tspec.m_spec, false, data.get_var_bytes());
1900 Uint8 buf_new[Tspec::MaxBuf];
1901 data_new.set_buf(buf_new, sizeof(buf_new));
1902 Uint32 poaiLen = ~(Uint32)0;
1903 chk2(data_new.add_poai(tdata.m_poaiBuf, tdata.m_cnt, &poaiLen) == 0, data);
1904 chk2(data_new.finalize() == 0, data_new);
1905 chk2(data_new.validate() == 0, data_new);
1906 chk1(tspec.m_spec.get_nullmask_len(false) + poaiLen == data.get_data_len());
1907 chk1(data_new.get_full_len() == data.get_full_len());
1908 chk1(memcmp(data_new.get_full_buf(), data.get_full_buf(), data.get_full_len()) == 0);
1909 chk1(data_new.get_null_cnt() == data.get_null_cnt());
1910 }
1911
1912 static void
testconvert(const Tdata & tdata)1913 testconvert(const Tdata& tdata)
1914 {
1915 ll3("testconvert: " << tdata);
1916 const Tspec& tspec = tdata.m_tspec;
1917 const NdbPack::Data& data = tdata.m_data;
1918 NdbPack::Data data_new(tspec.m_spec, false, 2);
1919 Uint8 buf_new[Tspec::MaxBuf];
1920 data_new.set_buf(buf_new, sizeof(buf_new));
1921 chk2(data_new.copy(data) == 0, data_new);
1922 require(tdata.m_cnt == (int)data.get_cnt());
1923 require(data.get_cnt() == data_new.get_cnt());
1924 const Uint32 cnt = tdata.m_cnt;
1925 Uint32 num_eq;
1926 int i;
1927 for (i = 0; i < 10; i++) {
1928 int k = getrandom(3); // assumes Endian::Value 0,1,2
1929 NdbPack::Endian::Value v = (NdbPack::Endian::Value)k;
1930 chk2(data_new.convert(v) == 0, data_new);
1931 if (v == NdbPack::Endian::Native ||
1932 v == NdbPack::Endian::get_endian()) {
1933 num_eq = ~(Uint32)0;
1934 chk1(data.cmp(data_new, cnt, num_eq) == 0);
1935 require(num_eq == cnt);
1936 }
1937 }
1938 }
1939
1940 static void
testdata(const Tdatalist & tdatalist)1941 testdata(const Tdatalist& tdatalist)
1942 {
1943 int i;
1944 for (i = 0; i < tdatalist.m_cnt; i++) {
1945 const Tdata& tdata = *tdatalist.m_tdata[i];
1946 testdesc(tdata);
1947 testcopy(tdata);
1948 testpoai(tdata);
1949 testconvert(tdata);
1950 }
1951 }
1952
1953 static void
testcmp(const Tbound & tbound,const Tdatalist & tdatalist,int * kb)1954 testcmp(const Tbound& tbound, const Tdatalist& tdatalist, int* kb)
1955 {
1956 ll3("testcmp: " << tbound);
1957 int oldres = 0;
1958 int n1 = 0;
1959 int n2 = 0;
1960 int i;
1961 for (i = 0; i < tdatalist.m_cnt; i++) {
1962 const Tdata& tdata = *tdatalist.m_tdata[i];
1963 require(tbound.m_tdata.m_cnt == (int)tbound.m_bound.get_data().get_cnt());
1964 const Uint32 cnt = tbound.m_tdata.m_cnt;
1965 Uint32 num_eq1 = ~(Uint32)0;
1966 // reverse result for key vs bound
1967 int res = (-1) * tbound.m_bound.cmp(tdata.m_data, cnt, num_eq1);
1968 chk1(res != 0);
1969 res = (res < 0 ? (n1++, -1) : (n2++, +1));
1970 if (i > 0) {
1971 // at some point flips from -1 to +1
1972 chk1(oldres <= res);
1973 }
1974 oldres = res;
1975 // also via unpacked data
1976 int num_eq2 = -1;
1977 int res2 = (-1) * tbound.xcmp(tdata, &num_eq2);
1978 if (res < 0)
1979 chk1(res2 < 0);
1980 else
1981 chk1(res2 > 0);
1982 chk1(num_eq1 == (Uint32)num_eq2);
1983 }
1984 require(n1 + n2 == tdatalist.m_cnt);
1985 ll2("keys before:" << n1 << " after:" << n2);
1986 *kb = n1;
1987 }
1988
1989 static void
testcmp(const Tboundlist & tboundlist,const Tdatalist & tdatalist)1990 testcmp(const Tboundlist& tboundlist, const Tdatalist& tdatalist)
1991 {
1992 int i;
1993 int oldkb = 0;
1994 for (i = 0; i < tboundlist.m_cnt; i++) {
1995 const Tbound& tbound = *tboundlist.m_tbound[i];
1996 int kb = 0;
1997 testcmp(tbound, tdatalist, &kb);
1998 if (i > 0) {
1999 chk1(oldkb <= kb);
2000 }
2001 oldkb = kb;
2002 }
2003 }
2004
2005 static void
testrun()2006 testrun()
2007 {
2008 Tspec tspec;
2009 tspec.create();
2010 ll1("spec: " << tspec);
2011 Tdatalist tdatalist(tspec);
2012 tdatalist.create();
2013 tdatalist.sort();
2014 testdata(tdatalist);
2015 if (bound_cnt != 0) {
2016 Tboundlist tboundlist(tspec);
2017 tboundlist.create();
2018 tboundlist.sort();
2019 testcmp(tboundlist, tdatalist);
2020 }
2021 }
2022
2023 extern void NdbOut_Init();
2024
2025 static int
testmain()2026 testmain()
2027 {
2028 my_init();
2029 NdbOut_Init();
2030 signal(SIGABRT, SIG_DFL);
2031 { const char* p = NdbEnv_GetEnv("TEST_NDB_PACK_VERBOSE", (char*)0, 0);
2032 if (p != 0)
2033 verbose = atoi(p);
2034 }
2035 if (seed == 0)
2036 ll0("random seed: loop number");
2037 else {
2038 if (seed < 0)
2039 seed = getpid();
2040 ll0("random seed: " << seed);
2041 ndb_srand(seed);
2042 }
2043 loops = 100;
2044 int i;
2045 for (i = 0; loops == 0 || i < loops; i++) {
2046 ll0("loop:" << i << "/" << loops);
2047 if (seed == 0)
2048 ndb_srand(i);
2049 testrun();
2050 }
2051 // do not print "ok" in TAPTEST
2052 ndbout << "passed" << endl;
2053 return 0;
2054 }
2055
// TAP entry point: yields true when testmain() returns 0.
TAPTEST(NdbPack)
{
  const bool passed = (testmain() == 0);
  return passed;
}
2061
2062 #endif
2063