1 /*
2  Copyright (c) 2012, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License, version 2.0,
6  as published by the Free Software Foundation.
7 
8  This program is also distributed with certain software (including
9  but not limited to OpenSSL) that is licensed under separate terms,
10  as designated in a particular file or component or in included license
11  documentation.  The authors of MySQL hereby grant you an additional
12  permission to link the program and your derivative works with the
13  separately licensed software that they have included with MySQL.
14 
15  This program is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  GNU General Public License, version 2.0, for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with this program; if not, write to the Free Software
22  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <assert.h>
26 #include <stdint.h>
27 #include <cstddef>
28 
29 #include "adapter_global.h"
30 #include "unified_debug.h"
31 #include "Record.h"
32 
Record(NdbDictionary::Dictionary * d,int ncol)33 Record::Record(NdbDictionary::Dictionary *d, int ncol) :
34   dict(d),
35   ncolumns(ncol),
36   n_nullable(0),
37   nblobs(0),
38   index(0),
39   rec_size(0),
40   start_of_nullmap(0),
41   size_of_nullmap(0),
42   ndb_record(0),
43   specs(new NdbDictionary::RecordSpecification[ncol]),
44   pkColumnMask(),
45   allColumnMask(),
46   isPartitionKey(true)                                                     {};
47 
~Record()48 Record::~Record() {
49   // dict->releaseRecord(ndb_record);  // causes crashes due to dict==0. ??
50   delete[] specs;
51 }
52 
53 /*
54  * add a column to a Record
55  */
addColumn(const NdbDictionary::Column * column)56 void Record::addColumn(const NdbDictionary::Column *column) {
57   assert(index < ncolumns);
58 
59   /* Link to the Dictionary Column */
60   specs[index].column = column;
61 
62   /* If the data type requires alignment, insert some padding.
63      This call will alter rec_size if needed */
64   pad_offset_for_alignment();
65 
66   /* The current record size is the offset of this column */
67   specs[index].offset = rec_size;
68 
69   /* Set nullbits in the record specification */
70   if(column->getNullable()) {
71     specs[index].nullbit_byte_offset = n_nullable / 8;
72     specs[index].nullbit_bit_in_byte = n_nullable % 8;
73     n_nullable++;
74   }
75   else {
76     specs[index].nullbit_byte_offset = 0;
77     specs[index].nullbit_bit_in_byte = 0;
78   }
79 
80   /* Maintain masks of all columns and of PK columns */
81   unsigned char mask_bit = (unsigned char) (1U << (index & 7));
82   int mask_byte = index >> 3;
83   assert(mask_byte < 4);
84   allColumnMask.array[mask_byte] |= mask_bit;
85   if(column->getPrimaryKey()) {
86     pkColumnMask.array[mask_byte] |= mask_bit;
87   }
88 
89   /* Track the number of blob columns in the record */
90   if((column->getType() == NdbDictionary::Column::Text) ||
91      (column->getType() == NdbDictionary::Column::Blob))
92   {
93     nblobs++;
94   }
95 
96   /* The record is the partition key only if every column is */
97   isPartitionKey &= column->getPartitionKey();
98 
99   /* Increment the counter and record size */
100   index += 1;
101   rec_size += column->getSizeInBytes();
102 };
103 
104 
build_null_bitmap()105 void Record::build_null_bitmap() {
106   /* The map needs 1 bit for each nullable column */
107   size_of_nullmap = n_nullable / 8;         // whole bytes
108   if(n_nullable % 8) size_of_nullmap += 1;  // partially-used bytes
109 
110   /* The null bitmap goes at the end of the record.
111      Adjust ("relink") the null offsets in every RecordSpecification.
112      Do this even if there are no nullable columns.
113   */
114   start_of_nullmap = rec_size;
115   for(unsigned int n = 0 ; n < ncolumns ; n++)
116     specs[n].nullbit_byte_offset += start_of_nullmap;
117 
118   /* Then adjust the total record size */
119   rec_size += size_of_nullmap;
120 }
121 
122 
123 /*
124  * Finish Table or PrimaryKey record after all columns have been added.
125  */
completeTableRecord(const NdbDictionary::Table * table)126 bool Record::completeTableRecord(const NdbDictionary::Table *table) {
127   build_null_bitmap();
128   ndb_record = dict->createRecord(table, specs, ncolumns, sizeof(specs[0]));
129 
130   assert(index == ncolumns);
131   assert(ndb_record);
132   assert(NdbDictionary::getRecordRowLength(ndb_record) == rec_size);
133 
134   return true;
135 }
136 
137 /* Finish Secondary Index record after all columns have been added.
138 */
completeIndexRecord(const NdbDictionary::Index * ndb_index)139 bool Record::completeIndexRecord(const NdbDictionary::Index *ndb_index) {
140   build_null_bitmap();
141   ndb_record = dict->createRecord(ndb_index, specs, ncolumns, sizeof(specs[0]));
142 
143   assert(ndb_record);
144   assert(index == ncolumns);
145   assert(NdbDictionary::getRecordRowLength(ndb_record) == rec_size);
146 
147   return true;
148 }
149 
150 
151 /* This implementation of padding will align all 2-, 4-, or 8- byte columns,
152    even if they happen to be character columns that do not strictly require
153    alignment.  This is considered a plausibly good time/space trade-off, and
154    in the worst case wastes 3 bytes for a CHAR[5] column.
155 */
pad_offset_for_alignment()156 void Record::pad_offset_for_alignment() {
157   int alignment = specs[index].column->getSizeInBytes();
158   int bad_offset = 0;
159 
160   switch(alignment) {
161     case 2: case 4: case 8:   /* insert padding */
162       bad_offset = rec_size % alignment;
163       if(bad_offset)
164         rec_size += (alignment - bad_offset);
165       break;
166     default:
167       break;
168   }
169 }
170 
171 /*  Assuming that a value is already encoded in buffer, how long is it?
172     For VARCHAR and VARBINARY, this returns the actual length.
173     Otherwise it returns the full length allocated to the value.
174 */
getValueLength(int idx,const char * data) const175 Uint32 Record::getValueLength(int idx, const char *data) const {
176   DEBUG_MARKER(UDEB_DEBUG);
177   assert((size_t) idx < ncolumns);
178   Uint32 size;
179   const NdbDictionary::Column * col = specs[idx].column;
180 
181   if(col->getType() == NdbDictionary::Column::Varchar ||
182      col->getType() == NdbDictionary::Column::Varbinary)
183   {
184     uint8_t size8 = *((const uint8_t *) (data));
185     size = size8;
186   } else if(col->getType() == NdbDictionary::Column::Longvarchar ||
187             col->getType() == NdbDictionary::Column::Longvarbinary)
188   {
189     uint16_t size16 = *((const uint16_t *) (data));
190     size = size16;
191   }
192   else size = col->getSizeInBytes();
193 
194   return size;
195 }
196 
197 /*  How far into a record is the actual encoded value?
198     For all columns other than VARCHAR and VARBINARY, this returns 0.
199     For VARCHAR and VARBINARY, this returns the number of length bytes.
200 */
getValueOffset(int idx) const201 Uint32 Record::getValueOffset(int idx) const {
202   DEBUG_MARKER(UDEB_DEBUG);
203   assert((size_t) idx < ncolumns);
204   Uint32 offset = 0;
205   const NdbDictionary::Column * col = specs[idx].column;
206 
207   if(col->getType() == NdbDictionary::Column::Varchar ||
208      col->getType() == NdbDictionary::Column::Varbinary)             offset = 1;
209   else if(col->getType() == NdbDictionary::Column::Longvarchar ||
210           col->getType() == NdbDictionary::Column::Longvarbinary)    offset = 2;
211 
212   return offset;
213 }
214 
215