/*
   Copyright (C) 2003-2008 MySQL AB, 2009 Sun Microsystems, Inc.
   All rights reserved. Use is subject to license terms.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/
25
26 #ifndef NdbReceiver_H
27 #define NdbReceiver_H
28 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL // Not part of public interface
29
30 #include <ndb_types.h>
31
32
/* Forward declarations of the types NdbReceiver only uses by pointer or
   reference. */
class Ndb;
class NdbImpl;
class NdbTransaction;
class NdbRecord;
class NdbRecAttr;
class NdbColumnImpl;
class NdbQueryOperationImpl;
38
39 class NdbReceiver
40 {
41 friend class Ndb;
42 friend class NdbOperation;
43 friend class NdbQueryImpl;
44 friend class NdbQueryOperationImpl;
45 friend class NdbResultStream;
46 friend class NdbScanOperation;
47 friend class NdbIndexOperation;
48 friend class NdbIndexScanOperation;
49 friend class NdbTransaction;
50 friend class NdbRootFragment;
51 friend int compare_ndbrecord(const NdbReceiver *r1,
52 const NdbReceiver *r2,
53 const NdbRecord *key_record,
54 const NdbRecord *result_record,
55 bool descending,
56 bool read_range_no);
57 friend int spjTest(int argc, char** argv);
58
59 public:
60 enum ReceiverType { NDB_UNINITIALIZED,
61 NDB_OPERATION = 1,
62 NDB_SCANRECEIVER = 2,
63 NDB_INDEX_OPERATION = 3,
64 NDB_QUERY_OPERATION = 4
65 };
66
67 NdbReceiver(Ndb *aNdb);
68 int init(ReceiverType type, bool useRec, void* owner);
69 void release();
70 ~NdbReceiver();
71
getId() const72 Uint32 getId() const{
73 return m_id;
74 }
75
getType() const76 ReceiverType getType() const {
77 return m_type;
78 }
79
80 inline NdbTransaction * getTransaction() const;
getOwner() const81 void* getOwner() const {
82 return m_owner;
83 }
84
85 bool checkMagicNumber() const;
86
next(NdbReceiver * next_arg)87 inline void next(NdbReceiver* next_arg) { m_next = next_arg;}
next()88 inline NdbReceiver* next() { return m_next; }
89
90 void setErrorCode(int);
91
92 /* Prepare for receiving of rows into specified buffer */
93 void prepareReceive(char *buf);
94
95 /* Prepare for reading of rows from specified buffer */
96 void prepareRead(char *buf, Uint32 rows);
97
98 private:
99 Uint32 theMagicNumber;
100 Ndb* m_ndb;
101 Uint32 m_id;
102 Uint32 m_tcPtrI;
103 ReceiverType m_type;
104 void* m_owner;
105 NdbReceiver* m_next;
106
107 /**
108 * At setup
109 */
110 class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr);
111 void getValues(const NdbRecord*, char*);
112 void prepareSend();
113
114 static
115 void calculate_batch_size(const NdbImpl&,
116 const NdbRecord *,
117 const NdbRecAttr *first_rec_attr,
118 Uint32, Uint32, Uint32&, Uint32&, Uint32&);
119
120 void calculate_batch_size(Uint32 key_size,
121 Uint32 parallelism,
122 Uint32& batch_size,
123 Uint32& batch_byte_size,
124 Uint32& first_batch_size,
125 const NdbRecord *rec) const;
126
127 /*
128 Set up buffers for receiving TRANSID_AI and KEYINFO20 signals
129 during a scan using NdbRecord.
130 */
131 void do_setup_ndbrecord(const NdbRecord *ndb_record, Uint32 batch_size,
132 Uint32 key_size, Uint32 read_range_no,
133 Uint32 rowsize, char *buf);
134
135 static
136 Uint32 ndbrecord_rowsize(const NdbRecord *ndb_record,
137 const NdbRecAttr *first_rec_attr,
138 Uint32 key_size,
139 bool read_range_no);
140
141
142 int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len);
143 int execTRANSID_AI(const Uint32* ptr, Uint32 len);
144 int execTCOPCONF(Uint32 len);
145 int execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows);
146
147 /*
148 We keep different state for old NdbRecAttr based operation and for
149 new NdbRecord style operation.
150 */
151 bool m_using_ndb_record;
152
153 /* members used for NdbRecord operation. */
154 struct {
155 const NdbRecord *m_ndb_record;
156 /* Destination to receive next row into. */
157 char *m_row_recv;
158 /* Block of memory used to read all rows in a batch during scan. */
159 char *m_row_buffer;
160 /*
161 Offsets between two rows in m_row_buffer.
162 This can be different from m_ndb_record->m_row_size, as we sometimes
163 store extra information after each row (range_no and keyinfo).
164 For non-scan operations, this is set to zero.
165 */
166 Uint32 m_row_offset;
167 /*
168 m_read_range_no is true if we are storing the range_no at the end of
169 each row during scans.
170 */
171 bool m_read_range_no;
172 } m_record;
173
174 class NdbRecAttr* theFirstRecAttr;
175 class NdbRecAttr* theCurrentRecAttr;
176
177 /*
178 m_rows is only used in NdbRecAttr mode, but is kept during NdbRecord mode
179 operation to avoid the need for re-allocation.
180 */
181 class NdbRecAttr** m_rows;
182
183 /*
184 When an NdbReceiver is sitting in the NdbScanOperation::m_sent_receivers
185 array, waiting to receive TRANSID_AI data from the kernel, its index into
186 m_sent_receivers is stored in m_list_index, so that we can remove it when
187 done without having to search for it.
188 */
189 Uint32 m_list_index;
190 /*
191 m_current_row serves two purposes, both used during scans:
192
193 1. While rows are being received from the kernel (and the receiver is
194 sitting in the NdbScanOperation::m_sent_receivers array), it holds the
195 row index (into m_rows) for the row to receive the next KEYINFO20 data.
196 This is used to receive keyInfo during scans (for scans that request
197 keyInfo).
198
199 2. While rows are being delivered to the application (and the receiver is
200 sitting in the NdbScanOperation::m_api_receivers array), it holds the
201 next row to be delivered to the application.
202
203 For NdbRecord operation, it works similarly, but instead indexes rows in
204 the RdbRecord m_row_buffer.
205 */
206 Uint32 m_current_row;
207 /* m_result_rows: Total number of rows contained in this batch. */
208 Uint32 m_result_rows;
209
210 Uint32 m__UNUSED;
211
212 /*
213 m_expected_result_length: Total number of 32-bit words of TRANSID_AI and
214 KEYINFO20 data to receive. This is set to zero until SCAN_TABCONF has
215 been received.
216 */
217 Uint32 m_expected_result_length;
218 Uint32 m_received_result_length;
219
hasResults() const220 bool hasResults() const { return m_result_rows > 0; }
nextResult() const221 bool nextResult() const { return m_current_row < m_result_rows; }
222 Uint32 receive_packed_recattr(NdbRecAttr**, Uint32 bmlen,
223 const Uint32* aDataPtr, Uint32 aLength);
224 Uint32 receive_packed_ndbrecord(Uint32 bmlen,
225 const Uint32* aDataPtr,
226 char* row);
227 /* get_row() returns the next available row during NdbRecord scans. */
228 const char *get_row();
229 /*
230 peek_row() returns the row pointer that get_row() will return on next call,
231 without advancing the internal pointer.
232 So two successive calls to peek_row() will return the same pointer, whereas
233 two successive calls to get_row would return different pointers.
234 */
235 const char *peek_row() const;
236 /* get_range_no() returns the range_no from the last returned row. */
237 int get_range_no() const;
238 /* get_keyinfo20)_ returns keyinfo from KEYINFO20 signal. */
239 int get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
240 const char * & data_ptr) const;
241 int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const;
242 /** Used by NdbQueryOperationImpl, where random access to rows is needed.*/
243 void setCurrentRow(char* buffer, Uint32 row);
244 /** Used by NdbQueryOperationImpl.*/
getCurrentRow() const245 Uint32 getCurrentRow() const { return m_current_row; }
246 };
247
248 #ifdef NDB_NO_DROPPED_SIGNAL
249 #include <stdlib.h>
250 #endif
251
252 inline
253 bool
checkMagicNumber() const254 NdbReceiver::checkMagicNumber() const {
255 bool retVal = (theMagicNumber == 0x11223344);
256 #ifdef NDB_NO_DROPPED_SIGNAL
257 if(!retVal){
258 abort();
259 }
260 #endif
261 return retVal;
262 }
263
264 inline
265 void
prepareSend()266 NdbReceiver::prepareSend(){
267 /* Set pointers etc. to prepare for receiving the first row of the batch. */
268 theMagicNumber = 0x11223344;
269 m_current_row = 0;
270 m_result_rows = 0;
271 m_received_result_length = 0;
272 m_expected_result_length = 0;
273 if (m_using_ndb_record)
274 {
275 if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION)
276 m_record.m_row_recv= m_record.m_row_buffer;
277 }
278 theCurrentRecAttr = theFirstRecAttr;
279 }
280
281 inline
282 int
execTCOPCONF(Uint32 len)283 NdbReceiver::execTCOPCONF(Uint32 len){
284 Uint32 tmp = m_received_result_length;
285 m_expected_result_length = len;
286 #ifdef assert
287 assert(!(tmp && !len));
288 #endif
289 return ((bool)len ^ (bool)tmp ? 0 : 1);
290 }
291
292 inline
293 int
execSCANOPCONF(Uint32 tcPtrI,Uint32 len,Uint32 rows)294 NdbReceiver::execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows){
295 m_tcPtrI = tcPtrI;
296 m_result_rows = rows;
297 Uint32 tmp = m_received_result_length;
298 m_expected_result_length = len;
299 return (tmp == len ? 1 : 0);
300 }
301
302 inline
303 void
setCurrentRow(char * buffer,Uint32 row)304 NdbReceiver::setCurrentRow(char* buffer, Uint32 row)
305 {
306 m_record.m_row_buffer = buffer;
307 m_current_row = row;
308 #ifdef assert
309 assert(m_current_row < m_result_rows);
310 #endif
311 }
312
313 inline
314 const char *
get_row()315 NdbReceiver::get_row()
316 {
317 #ifdef assert
318 assert(m_current_row < m_result_rows);
319 #endif
320 return m_record.m_row_buffer + (m_current_row++ * m_record.m_row_offset);
321 }
322
323 inline
324 const char *
peek_row() const325 NdbReceiver::peek_row() const
326 {
327 return m_record.m_row_buffer + m_current_row * m_record.m_row_offset;
328 }
329
330
331 #endif // DOXYGEN_SHOULD_SKIP_INTERNAL
332 #endif
333