/*
   Copyright (C) 2003-2008 MySQL AB, 2009 Sun Microsystems, Inc.
    All rights reserved. Use is subject to license terms.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/
25 
26 #ifndef NdbReceiver_H
27 #define NdbReceiver_H
28 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL  // Not part of public interface
29 
30 #include <ndb_types.h>
31 
32 
/* Forward declarations of types that NdbReceiver only uses by pointer or
   reference in this header. */
class Ndb;
class NdbImpl;
class NdbTransaction;
class NdbRecord;
class NdbQueryOperationImpl;
38 
39 class NdbReceiver
40 {
41   friend class Ndb;
42   friend class NdbOperation;
43   friend class NdbQueryImpl;
44   friend class NdbQueryOperationImpl;
45   friend class NdbResultStream;
46   friend class NdbScanOperation;
47   friend class NdbIndexOperation;
48   friend class NdbIndexScanOperation;
49   friend class NdbTransaction;
50   friend class NdbRootFragment;
51   friend int compare_ndbrecord(const NdbReceiver *r1,
52                       const NdbReceiver *r2,
53                       const NdbRecord *key_record,
54                       const NdbRecord *result_record,
55                       bool descending,
56                       bool read_range_no);
57   friend int spjTest(int argc, char** argv);
58 
59 public:
60   enum ReceiverType	{ NDB_UNINITIALIZED,
61 			  NDB_OPERATION = 1,
62 			  NDB_SCANRECEIVER = 2,
63 			  NDB_INDEX_OPERATION = 3,
64                           NDB_QUERY_OPERATION = 4
65   };
66 
67   NdbReceiver(Ndb *aNdb);
68   int init(ReceiverType type, bool useRec, void* owner);
69   void release();
70   ~NdbReceiver();
71 
getId() const72   Uint32 getId() const{
73     return m_id;
74   }
75 
getType() const76   ReceiverType getType() const {
77     return m_type;
78   }
79 
80   inline NdbTransaction * getTransaction() const;
getOwner() const81   void* getOwner() const {
82     return m_owner;
83   }
84 
85   bool checkMagicNumber() const;
86 
next(NdbReceiver * next_arg)87   inline void next(NdbReceiver* next_arg) { m_next = next_arg;}
next()88   inline NdbReceiver* next() { return m_next; }
89 
90   void setErrorCode(int);
91 
92   /* Prepare for receiving of rows into specified buffer */
93   void prepareReceive(char *buf);
94 
95   /* Prepare for reading of rows from specified buffer */
96   void prepareRead(char *buf, Uint32 rows);
97 
98 private:
99   Uint32 theMagicNumber;
100   Ndb* m_ndb;
101   Uint32 m_id;
102   Uint32 m_tcPtrI;
103   ReceiverType m_type;
104   void* m_owner;
105   NdbReceiver* m_next;
106 
107   /**
108    * At setup
109    */
110   class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr);
111   void getValues(const NdbRecord*, char*);
112   void prepareSend();
113 
114   static
115   void calculate_batch_size(const NdbImpl&,
116                             const NdbRecord *,
117                             const NdbRecAttr *first_rec_attr,
118                             Uint32, Uint32, Uint32&, Uint32&, Uint32&);
119 
120   void calculate_batch_size(Uint32 key_size,
121                             Uint32 parallelism,
122                             Uint32& batch_size,
123                             Uint32& batch_byte_size,
124                             Uint32& first_batch_size,
125                             const NdbRecord *rec) const;
126 
127   /*
128     Set up buffers for receiving TRANSID_AI and KEYINFO20 signals
129     during a scan using NdbRecord.
130   */
131   void do_setup_ndbrecord(const NdbRecord *ndb_record, Uint32 batch_size,
132                           Uint32 key_size, Uint32 read_range_no,
133                           Uint32 rowsize, char *buf);
134 
135   static
136   Uint32 ndbrecord_rowsize(const NdbRecord *ndb_record,
137                            const NdbRecAttr *first_rec_attr,
138                            Uint32 key_size,
139                            bool   read_range_no);
140 
141 
142   int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len);
143   int execTRANSID_AI(const Uint32* ptr, Uint32 len);
144   int execTCOPCONF(Uint32 len);
145   int execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows);
146 
147   /*
148     We keep different state for old NdbRecAttr based operation and for
149     new NdbRecord style operation.
150   */
151   bool m_using_ndb_record;
152 
153   /* members used for NdbRecord operation. */
154   struct {
155     const NdbRecord *m_ndb_record;
156     /* Destination to receive next row into. */
157     char *m_row_recv;
158     /* Block of memory used to read all rows in a batch during scan. */
159     char *m_row_buffer;
160     /*
161       Offsets between two rows in m_row_buffer.
162       This can be different from m_ndb_record->m_row_size, as we sometimes
163       store extra information after each row (range_no and keyinfo).
164       For non-scan operations, this is set to zero.
165     */
166     Uint32 m_row_offset;
167     /*
168       m_read_range_no is true if we are storing the range_no at the end of
169       each row during scans.
170     */
171     bool m_read_range_no;
172   } m_record;
173 
174   class NdbRecAttr* theFirstRecAttr;
175   class NdbRecAttr* theCurrentRecAttr;
176 
177   /*
178     m_rows is only used in NdbRecAttr mode, but is kept during NdbRecord mode
179     operation to avoid the need for re-allocation.
180   */
181   class NdbRecAttr** m_rows;
182 
183   /*
184     When an NdbReceiver is sitting in the NdbScanOperation::m_sent_receivers
185     array, waiting to receive TRANSID_AI data from the kernel, its index into
186     m_sent_receivers is stored in m_list_index, so that we can remove it when
187     done without having to search for it.
188   */
189   Uint32 m_list_index;
190   /*
191     m_current_row serves two purposes, both used during scans:
192 
193     1. While rows are being received from the kernel (and the receiver is
194        sitting in the NdbScanOperation::m_sent_receivers array), it holds the
195        row index (into m_rows) for the row to receive the next KEYINFO20 data.
196        This is used to receive keyInfo during scans (for scans that request
197        keyInfo).
198 
199     2. While rows are being delivered to the application (and the receiver is
200        sitting in the NdbScanOperation::m_api_receivers array), it holds the
201        next row to be delivered to the application.
202 
203     For NdbRecord operation, it works similarly, but instead indexes rows in
204     the RdbRecord m_row_buffer.
205   */
206   Uint32 m_current_row;
207   /* m_result_rows: Total number of rows contained in this batch. */
208   Uint32 m_result_rows;
209 
210   Uint32 m__UNUSED;
211 
212   /*
213     m_expected_result_length: Total number of 32-bit words of TRANSID_AI and
214     KEYINFO20 data to receive. This is set to zero until SCAN_TABCONF has
215     been received.
216    */
217   Uint32 m_expected_result_length;
218   Uint32 m_received_result_length;
219 
hasResults() const220   bool hasResults() const { return m_result_rows > 0; }
nextResult() const221   bool nextResult() const { return m_current_row < m_result_rows; }
222   Uint32 receive_packed_recattr(NdbRecAttr**, Uint32 bmlen,
223                                 const Uint32* aDataPtr, Uint32 aLength);
224   Uint32 receive_packed_ndbrecord(Uint32 bmlen,
225                                   const Uint32* aDataPtr,
226                                   char* row);
227   /* get_row() returns the next available row during NdbRecord scans. */
228   const char *get_row();
229   /*
230     peek_row() returns the row pointer that get_row() will return on next call,
231     without advancing the internal pointer.
232     So two successive calls to peek_row() will return the same pointer, whereas
233     two successive calls to get_row would return different pointers.
234   */
235   const char *peek_row() const;
236   /* get_range_no() returns the range_no from the last returned row. */
237   int get_range_no() const;
238   /* get_keyinfo20)_ returns keyinfo from KEYINFO20 signal. */
239   int get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
240                     const char * & data_ptr) const;
241   int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const;
242   /** Used by NdbQueryOperationImpl, where random access to rows is needed.*/
243   void setCurrentRow(char* buffer, Uint32 row);
244   /** Used by NdbQueryOperationImpl.*/
getCurrentRow() const245   Uint32 getCurrentRow() const { return m_current_row; }
246 };
247 
248 #ifdef NDB_NO_DROPPED_SIGNAL
249 #include <stdlib.h>
250 #endif
251 
252 inline
253 bool
checkMagicNumber() const254 NdbReceiver::checkMagicNumber() const {
255   bool retVal = (theMagicNumber == 0x11223344);
256 #ifdef NDB_NO_DROPPED_SIGNAL
257   if(!retVal){
258     abort();
259   }
260 #endif
261   return retVal;
262 }
263 
264 inline
265 void
prepareSend()266 NdbReceiver::prepareSend(){
267   /* Set pointers etc. to prepare for receiving the first row of the batch. */
268   theMagicNumber = 0x11223344;
269   m_current_row = 0;
270   m_result_rows = 0;
271   m_received_result_length = 0;
272   m_expected_result_length = 0;
273   if (m_using_ndb_record)
274   {
275     if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION)
276       m_record.m_row_recv= m_record.m_row_buffer;
277   }
278   theCurrentRecAttr = theFirstRecAttr;
279 }
280 
281 inline
282 int
execTCOPCONF(Uint32 len)283 NdbReceiver::execTCOPCONF(Uint32 len){
284   Uint32 tmp = m_received_result_length;
285   m_expected_result_length = len;
286 #ifdef assert
287   assert(!(tmp && !len));
288 #endif
289   return ((bool)len ^ (bool)tmp ? 0 : 1);
290 }
291 
292 inline
293 int
execSCANOPCONF(Uint32 tcPtrI,Uint32 len,Uint32 rows)294 NdbReceiver::execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows){
295   m_tcPtrI = tcPtrI;
296   m_result_rows = rows;
297   Uint32 tmp = m_received_result_length;
298   m_expected_result_length = len;
299   return (tmp == len ? 1 : 0);
300 }
301 
302 inline
303 void
setCurrentRow(char * buffer,Uint32 row)304 NdbReceiver::setCurrentRow(char* buffer, Uint32 row)
305 {
306   m_record.m_row_buffer = buffer;
307   m_current_row = row;
308 #ifdef assert
309   assert(m_current_row < m_result_rows);
310 #endif
311 }
312 
313 inline
314 const char *
get_row()315 NdbReceiver::get_row()
316 {
317 #ifdef assert
318   assert(m_current_row < m_result_rows);
319 #endif
320   return m_record.m_row_buffer + (m_current_row++ * m_record.m_row_offset);
321 }
322 
323 inline
324 const char *
peek_row() const325 NdbReceiver::peek_row() const
326 {
327   return m_record.m_row_buffer + m_current_row * m_record.m_row_offset;
328 }
329 
330 
331 #endif // DOXYGEN_SHOULD_SKIP_INTERNAL
332 #endif
333