1 /*
2 Copyright (c) 2012, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <assert.h>
26 #include <stdint.h>
27 #include <cstddef>
28
29 #include "adapter_global.h"
30 #include "unified_debug.h"
31 #include "Record.h"
32
Record(NdbDictionary::Dictionary * d,int ncol)33 Record::Record(NdbDictionary::Dictionary *d, int ncol) :
34 dict(d),
35 ncolumns(ncol),
36 n_nullable(0),
37 nblobs(0),
38 index(0),
39 rec_size(0),
40 start_of_nullmap(0),
41 size_of_nullmap(0),
42 ndb_record(0),
43 specs(new NdbDictionary::RecordSpecification[ncol]),
44 pkColumnMask(),
45 allColumnMask(),
46 isPartitionKey(true) {};
47
~Record()48 Record::~Record() {
49 // dict->releaseRecord(ndb_record); // causes crashes due to dict==0. ??
50 delete[] specs;
51 }
52
53 /*
54 * add a column to a Record
55 */
addColumn(const NdbDictionary::Column * column)56 void Record::addColumn(const NdbDictionary::Column *column) {
57 assert(index < ncolumns);
58
59 /* Link to the Dictionary Column */
60 specs[index].column = column;
61
62 /* If the data type requires alignment, insert some padding.
63 This call will alter rec_size if needed */
64 pad_offset_for_alignment();
65
66 /* The current record size is the offset of this column */
67 specs[index].offset = rec_size;
68
69 /* Set nullbits in the record specification */
70 if(column->getNullable()) {
71 specs[index].nullbit_byte_offset = n_nullable / 8;
72 specs[index].nullbit_bit_in_byte = n_nullable % 8;
73 n_nullable++;
74 }
75 else {
76 specs[index].nullbit_byte_offset = 0;
77 specs[index].nullbit_bit_in_byte = 0;
78 }
79
80 /* Maintain masks of all columns and of PK columns */
81 unsigned char mask_bit = (unsigned char) (1U << (index & 7));
82 int mask_byte = index >> 3;
83 assert(mask_byte < 4);
84 allColumnMask.array[mask_byte] |= mask_bit;
85 if(column->getPrimaryKey()) {
86 pkColumnMask.array[mask_byte] |= mask_bit;
87 }
88
89 /* Track the number of blob columns in the record */
90 if((column->getType() == NdbDictionary::Column::Text) ||
91 (column->getType() == NdbDictionary::Column::Blob))
92 {
93 nblobs++;
94 }
95
96 /* The record is the partition key only if every column is */
97 isPartitionKey &= column->getPartitionKey();
98
99 /* Increment the counter and record size */
100 index += 1;
101 rec_size += column->getSizeInBytes();
102 };
103
104
build_null_bitmap()105 void Record::build_null_bitmap() {
106 /* The map needs 1 bit for each nullable column */
107 size_of_nullmap = n_nullable / 8; // whole bytes
108 if(n_nullable % 8) size_of_nullmap += 1; // partially-used bytes
109
110 /* The null bitmap goes at the end of the record.
111 Adjust ("relink") the null offsets in every RecordSpecification.
112 Do this even if there are no nullable columns.
113 */
114 start_of_nullmap = rec_size;
115 for(unsigned int n = 0 ; n < ncolumns ; n++)
116 specs[n].nullbit_byte_offset += start_of_nullmap;
117
118 /* Then adjust the total record size */
119 rec_size += size_of_nullmap;
120 }
121
122
123 /*
124 * Finish Table or PrimaryKey record after all columns have been added.
125 */
completeTableRecord(const NdbDictionary::Table * table)126 bool Record::completeTableRecord(const NdbDictionary::Table *table) {
127 build_null_bitmap();
128 ndb_record = dict->createRecord(table, specs, ncolumns, sizeof(specs[0]));
129
130 assert(index == ncolumns);
131 assert(ndb_record);
132 assert(NdbDictionary::getRecordRowLength(ndb_record) == rec_size);
133
134 return true;
135 }
136
137 /* Finish Secondary Index record after all columns have been added.
138 */
completeIndexRecord(const NdbDictionary::Index * ndb_index)139 bool Record::completeIndexRecord(const NdbDictionary::Index *ndb_index) {
140 build_null_bitmap();
141 ndb_record = dict->createRecord(ndb_index, specs, ncolumns, sizeof(specs[0]));
142
143 assert(ndb_record);
144 assert(index == ncolumns);
145 assert(NdbDictionary::getRecordRowLength(ndb_record) == rec_size);
146
147 return true;
148 }
149
150
151 /* This implementation of padding will align all 2-, 4-, or 8- byte columns,
152 even if they happen to be character columns that do not strictly require
153 alignment. This is considered a plausibly good time/space trade-off, and
154 in the worst case wastes 3 bytes for a CHAR[5] column.
155 */
pad_offset_for_alignment()156 void Record::pad_offset_for_alignment() {
157 int alignment = specs[index].column->getSizeInBytes();
158 int bad_offset = 0;
159
160 switch(alignment) {
161 case 2: case 4: case 8: /* insert padding */
162 bad_offset = rec_size % alignment;
163 if(bad_offset)
164 rec_size += (alignment - bad_offset);
165 break;
166 default:
167 break;
168 }
169 }
170
171 /* Assuming that a value is already encoded in buffer, how long is it?
172 For VARCHAR and VARBINARY, this returns the actual length.
173 Otherwise it returns the full length allocated to the value.
174 */
getValueLength(int idx,const char * data) const175 Uint32 Record::getValueLength(int idx, const char *data) const {
176 DEBUG_MARKER(UDEB_DEBUG);
177 assert((size_t) idx < ncolumns);
178 Uint32 size;
179 const NdbDictionary::Column * col = specs[idx].column;
180
181 if(col->getType() == NdbDictionary::Column::Varchar ||
182 col->getType() == NdbDictionary::Column::Varbinary)
183 {
184 uint8_t size8 = *((const uint8_t *) (data));
185 size = size8;
186 } else if(col->getType() == NdbDictionary::Column::Longvarchar ||
187 col->getType() == NdbDictionary::Column::Longvarbinary)
188 {
189 uint16_t size16 = *((const uint16_t *) (data));
190 size = size16;
191 }
192 else size = col->getSizeInBytes();
193
194 return size;
195 }
196
197 /* How far into a record is the actual encoded value?
198 For all columns other than VARCHAR and VARBINARY, this returns 0.
199 For VARCHAR and VARBINARY, this returns the number of length bytes.
200 */
getValueOffset(int idx) const201 Uint32 Record::getValueOffset(int idx) const {
202 DEBUG_MARKER(UDEB_DEBUG);
203 assert((size_t) idx < ncolumns);
204 Uint32 offset = 0;
205 const NdbDictionary::Column * col = specs[idx].column;
206
207 if(col->getType() == NdbDictionary::Column::Varchar ||
208 col->getType() == NdbDictionary::Column::Varbinary) offset = 1;
209 else if(col->getType() == NdbDictionary::Column::Longvarchar ||
210 col->getType() == NdbDictionary::Column::Longvarbinary) offset = 2;
211
212 return offset;
213 }
214
215