1 /*
2 Copyright (c) 2014, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <NdbApi.hpp>
26 #include <iostream>
27 #include <vector>
28 #include <cstdlib>
29 #include <cstring>
30
31 using namespace std;
32
33 #include "../common/error_handling.hpp"
34 #include "../common/array_adapter.hpp"
35 #include "../common/ndb_util.hpp"
36 #include "../common/util.hpp"
37
38 /**
39 This program inserts [VAR]CHAR/BINARY column data into the table
40 by constructing aRefs using array adapters and then reads those
41 columns back and extracts the data using array adapters.
42
43 schema used
44 CREATE TABLE api_array_using_adapter(
45 ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,
46 ATTR2 CHAR(20) NOT NULL,
47 ATTR3 VARCHAR(20) NOT NULL,
48 ATTR4 VARCHAR(500) NOT NULL,
49 ATTR5 BINARY(20) NOT NULL,
50 ATTR6 VARBINARY(20) NOT NULL,
51 ATTR7 VARBINARY(500) NOT NULL
52 ) engine ndb charset latin1;
53 */
54
55 // Do a cleanup of all inserted rows
do_cleanup(Ndb & ndb)56 static void do_cleanup(Ndb& ndb)
57 {
58 const NdbDictionary::Dictionary* dict = ndb.getDictionary();
59
60 const NdbDictionary::Table *table = dict->getTable("api_array_using_adapter");
61 if (table == nullptr) APIERROR(dict->getNdbError());
62
63 NdbTransaction *transaction= ndb.startTransaction();
64 if (transaction == nullptr) APIERROR(ndb.getNdbError());
65
66 // Delete all 21 rows using a single transaction
67 for (int i = 0; i <= 20; i++)
68 {
69 NdbOperation* myOperation = transaction->getNdbOperation(table);
70 if (myOperation == nullptr) APIERROR(transaction->getNdbError());
71 myOperation->deleteTuple();
72 myOperation->equal("ATTR1", i);
73 }
74
75 if (transaction->execute(NdbTransaction::Commit) != 0)
76 {
77 APIERROR(transaction->getNdbError());
78 }
79 ndb.closeTransaction(transaction);
80 }
81
82 // Use one transaction and insert 21 rows in one batch.
do_insert(Ndb & ndb)83 static void do_insert(Ndb& ndb)
84 {
85 const NdbDictionary::Dictionary* dict = ndb.getDictionary();
86 const NdbDictionary::Table *table = dict->getTable("api_array_using_adapter");
87
88 if (table == NULL)
89 {
90 APIERROR(dict->getNdbError());
91 }
92
93 // Get a column object for each CHAR/VARCHAR/BINARY/VARBINARY column
94 // to insert into.
95 const NdbDictionary::Column *column2 = table->getColumn("ATTR2");
96 if (column2 == NULL)
97 {
98 APIERROR(dict->getNdbError());
99 }
100
101 const NdbDictionary::Column *column3 = table->getColumn("ATTR3");
102 if (column3 == NULL)
103 {
104 APIERROR(dict->getNdbError());
105 }
106
107 const NdbDictionary::Column *column4 = table->getColumn("ATTR4");
108 if (column4 == NULL)
109 {
110 APIERROR(dict->getNdbError());
111 }
112
113 const NdbDictionary::Column *column5 = table->getColumn("ATTR5");
114 if (column5 == NULL)
115 {
116 APIERROR(dict->getNdbError());
117 }
118
119 const NdbDictionary::Column *column6 = table->getColumn("ATTR6");
120 if (column6 == NULL)
121 {
122 APIERROR(dict->getNdbError());
123 }
124
125 const NdbDictionary::Column *column7 = table->getColumn("ATTR7");
126 if (column7 == NULL)
127 {
128 APIERROR(dict->getNdbError());
129 }
130
131 // Create a read/write attribute adapter to be used for all
132 // CHAR/VARCHAR/BINARY/VARBINARY columns.
133 ReadWriteArrayAdapter attr_adapter;
134
135 // Create and initialize sample data.
136 const string meter = 50 * string("''''-,,,,|");
137 unsigned char binary_meter[500];
138 for (unsigned i = 0; i < 500; i++)
139 {
140 binary_meter[i] = (unsigned char)(i % 256);
141 }
142
143 NdbTransaction *transaction= ndb.startTransaction();
144 if (transaction == NULL) APIERROR(ndb.getNdbError());
145
146 // Create 21 operations and put a reference to them in a vector to
147 // be able to find failing operations.
148 vector<NdbOperation*> operations;
149 for (int i = 0; i <= 20; i++)
150 {
151 NdbOperation* operation = transaction->getNdbOperation(table);
152 if (operation == NULL) APIERROR(transaction->getNdbError());
153 operation->insertTuple();
154
155 operation->equal("ATTR1", i);
156
157 /* use ReadWrite Adapter to convert string to aRefs */
158 ReadWriteArrayAdapter::ErrorType error;
159
160 char *attr2_aRef;
161 attr2_aRef= attr_adapter.make_aRef(column2, meter.substr(0,i), error);
162 PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
163 "make_aRef failed for ATTR2");
164 operation->setValue("ATTR2", attr2_aRef);
165
166 char *attr3_aRef;
167 attr3_aRef= attr_adapter.make_aRef(column3, meter.substr(0,i), error);
168 PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
169 "make_aRef failed for ATTR3");
170 operation->setValue("ATTR3", attr3_aRef);
171
172 char *attr4_aRef;
173 attr4_aRef= attr_adapter.make_aRef(column4, meter.substr(0,20*i), error);
174 PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
175 "make_aRef failed for ATTR4");
176 operation->setValue("ATTR4", attr4_aRef);
177
178 char* attr5_aRef;
179 char* attr5_first;
180 attr_adapter.allocate_in_bytes(column5, attr5_aRef, attr5_first, i, error);
181 PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
182 "allocate_in_bytes failed for ATTR5");
183 memcpy(attr5_first, binary_meter, i);
184 operation->setValue("ATTR5", attr5_aRef);
185
186 char* attr6_aRef;
187 char* attr6_first;
188 attr_adapter.allocate_in_bytes(column6, attr6_aRef, attr6_first, i, error);
189 PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
190 "allocate_in_bytes failed for ATTR6");
191 memcpy(attr6_first, binary_meter, i);
192 operation->setValue("ATTR6", attr6_aRef);
193
194 char* attr7_aRef;
195 char* attr7_first;
196 attr_adapter.allocate_in_bytes(column7, attr7_aRef, attr7_first, 20*i, error);
197 PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
198 "allocate_in_bytes failed for ATTR7");
199 memcpy(attr7_first, binary_meter, 20*i);
200 operation->setValue("ATTR7", attr7_aRef);
201
202 operations.push_back(operation);
203 }
204
205 // Now execute all operations in one batch, and check for errors.
206 if (transaction->execute( NdbTransaction::Commit ) != 0)
207 {
208 for (size_t i = 0; i < operations.size(); i++)
209 {
210 const NdbError err= operations[i]->getNdbError();
211 if(err.code != NdbError::Success)
212 {
213 cout << "Error inserting Row : " << i << endl;
214 PRINT_ERROR(err.code, err.message);
215 }
216 }
217 APIERROR(transaction->getNdbError());
218 }
219 ndb.closeTransaction(transaction);
220 }
221
222 /*
223 Reads the row with id = 17
224 Retrieves an prints value of the [VAR]CHAR/BINARY using array_adapter
225 */
do_read(Ndb & ndb)226 static void do_read(Ndb& ndb)
227 {
228 const NdbDictionary::Dictionary* dict= ndb.getDictionary();
229 const NdbDictionary::Table* table= dict->getTable("api_array_using_adapter");
230
231 if (table == NULL) APIERROR(dict->getNdbError());
232
233 NdbTransaction *transaction= ndb.startTransaction();
234 if (transaction == NULL) APIERROR(ndb.getNdbError());
235
236 NdbOperation *operation= transaction->getNdbOperation(table);
237 if (operation == NULL) APIERROR(transaction->getNdbError());
238
239 operation->readTuple(NdbOperation::LM_Read);
240 operation->equal("ATTR1", 17);
241
242 vector<NdbRecAttr*> attr;
243 const int column_count= table->getNoOfColumns();
244 attr.reserve(column_count);
245 attr.push_back(nullptr);
246 for (int i= 1; i < column_count; i++)
247 {
248 attr.push_back(operation->getValue(i, NULL));
249 if (attr[i] == NULL) APIERROR(transaction->getNdbError());
250 }
251
252 if(transaction->execute( NdbTransaction::Commit ) == -1)
253 APIERROR(transaction->getNdbError());
254
255 /* Now use an array adapter to read the data from columns */
256 const ReadOnlyArrayAdapter attr_adapter;
257 ReadOnlyArrayAdapter::ErrorType error;
258
259 /* print the fetched data */
260 cout << "Row ID : 17\n";
261 for (int i= 1; i < column_count; i++)
262 {
263 if (attr[i] != NULL)
264 {
265 NdbDictionary::Column::Type column_type = attr[i]->getType();
266 cout << "Column id: " << i
267 << ", name: " << attr[i]->getColumn()->getName()
268 << ", size: " << attr[i]->get_size_in_bytes()
269 << ", type: " << column_type_to_string(attr[i]->getType());
270 if(attr_adapter.is_binary_array_type(column_type))
271 {
272 /* if column is [VAR]BINARY, get the byte array and print their sum */
273 const char* data_ptr;
274 size_t data_length;
275 attr_adapter.get_byte_array(attr[i], data_ptr,
276 data_length, error);
277 if(error == ReadOnlyArrayAdapter::Success)
278 {
279 int sum = 0;
280 for (size_t j = 0; j < data_length; j++)
281 sum += (int)(data_ptr[j]);
282 cout << ", stored bytes length: " << data_length
283 << ", sum of byte array: " << sum << endl;
284 }
285 else
286 cout << ", error fetching value." << endl;
287 }
288 else
289 {
290 /* if the column is [VAR]CHAR, retrieve the string and print */
291 std::string value= attr_adapter.get_string(attr[i], error);
292 if(error == ReadOnlyArrayAdapter::Success)
293 {
294 cout << ", stored string length: " << value.length()
295 << ", value: " << value
296 << endl;
297 }
298 else
299 cout << ", error fetching value." << endl;
300 }
301 }
302 }
303
304 ndb.closeTransaction(transaction);
305 }
306
run_application(Ndb_cluster_connection & cluster_connection,const char * database_name)307 static void run_application(Ndb_cluster_connection &cluster_connection,
308 const char* database_name)
309 {
310 /********************************************
311 * Connect to database via NdbApi *
312 ********************************************/
313 // Object representing the database
314 Ndb ndb( &cluster_connection, database_name);
315 if (ndb.init()) APIERROR(ndb.getNdbError());
316
317 /*
318 * Do different operations on database
319 */
320 do_insert(ndb);
321 do_read(ndb);
322 do_cleanup(ndb);
323 }
324
main(int argc,char ** argv)325 int main(int argc, char** argv)
326 {
327 if (argc != 3)
328 {
329 std::cout << "Arguments are <connect_string cluster> <database_name>.\n";
330 exit(-1);
331 }
332 /* ndb_init must be called first */
333 ndb_init();
334 {
335 /* connect to cluster */
336 const char *connectstring = argv[1];
337 Ndb_cluster_connection cluster_connection(connectstring);
338 if (cluster_connection.connect(30 /* retries */,
339 1 /* delay between retries */,
340 0 /* verbose */))
341 {
342 std::cout << "Cluster management server was not ready within 30 secs.\n";
343 exit(-1);
344 }
345
346 /* Connect and wait for the storage nodes */
347 if (cluster_connection.wait_until_ready(30,10) < 0)
348 {
349 std::cout << "Cluster was not ready within 30 secs.\n";
350 exit(-1);
351 }
352
353 /* run the application code */
354 const char* dbname = argv[2];
355 run_application(cluster_connection, dbname);
356 }
357 ndb_end(0);
358
359 return 0;
360 }
361