1 /*
2    Copyright (c) 2014, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <NdbApi.hpp>
26 #include <iostream>
27 #include <vector>
28 #include <cstdlib>
29 #include <cstring>
30 
31 using namespace std;
32 
33 #include "../common/error_handling.hpp"
34 #include "../common/array_adapter.hpp"
35 #include "../common/ndb_util.hpp"
36 #include "../common/util.hpp"
37 
38 /**
39 This program inserts [VAR]CHAR/BINARY column data into the table
40 by constructing aRefs using array adapters and then reads those
41 columns back and extracts the data using array adapters.
42 
43 schema used
44 CREATE TABLE api_array_using_adapter(
45   ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,
46   ATTR2 CHAR(20) NOT NULL,
47   ATTR3 VARCHAR(20) NOT NULL,
48   ATTR4 VARCHAR(500) NOT NULL,
49   ATTR5 BINARY(20) NOT NULL,
50   ATTR6 VARBINARY(20) NOT NULL,
51   ATTR7 VARBINARY(500) NOT NULL
52 ) engine ndb charset latin1;
53  */
54 
55 // Do a cleanup of all inserted rows
do_cleanup(Ndb & ndb)56 static void do_cleanup(Ndb& ndb)
57 {
58   const NdbDictionary::Dictionary* dict = ndb.getDictionary();
59 
60   const NdbDictionary::Table *table = dict->getTable("api_array_using_adapter");
61   if (table == nullptr) APIERROR(dict->getNdbError());
62 
63   NdbTransaction *transaction= ndb.startTransaction();
64   if (transaction == nullptr) APIERROR(ndb.getNdbError());
65 
66   // Delete all 21 rows using a single transaction
67   for (int i = 0; i <= 20; i++)
68   {
69     NdbOperation* myOperation = transaction->getNdbOperation(table);
70     if (myOperation == nullptr) APIERROR(transaction->getNdbError());
71     myOperation->deleteTuple();
72     myOperation->equal("ATTR1", i);
73   }
74 
75   if (transaction->execute(NdbTransaction::Commit) != 0)
76   {
77     APIERROR(transaction->getNdbError());
78   }
79   ndb.closeTransaction(transaction);
80 }
81 
82 // Use one transaction and insert 21 rows in one batch.
do_insert(Ndb & ndb)83 static void do_insert(Ndb& ndb)
84 {
85   const NdbDictionary::Dictionary* dict = ndb.getDictionary();
86   const NdbDictionary::Table *table = dict->getTable("api_array_using_adapter");
87 
88   if (table == NULL)
89   {
90     APIERROR(dict->getNdbError());
91   }
92 
93   // Get a column object for each CHAR/VARCHAR/BINARY/VARBINARY column
94   // to insert into.
95   const NdbDictionary::Column *column2 = table->getColumn("ATTR2");
96   if (column2 == NULL)
97   {
98     APIERROR(dict->getNdbError());
99   }
100 
101   const NdbDictionary::Column *column3 = table->getColumn("ATTR3");
102   if (column3 == NULL)
103   {
104     APIERROR(dict->getNdbError());
105   }
106 
107   const NdbDictionary::Column *column4 = table->getColumn("ATTR4");
108   if (column4 == NULL)
109   {
110     APIERROR(dict->getNdbError());
111   }
112 
113   const NdbDictionary::Column *column5 = table->getColumn("ATTR5");
114   if (column5 == NULL)
115   {
116     APIERROR(dict->getNdbError());
117   }
118 
119   const NdbDictionary::Column *column6 = table->getColumn("ATTR6");
120   if (column6 == NULL)
121   {
122     APIERROR(dict->getNdbError());
123   }
124 
125   const NdbDictionary::Column *column7 = table->getColumn("ATTR7");
126   if (column7 == NULL)
127   {
128     APIERROR(dict->getNdbError());
129   }
130 
131   // Create a read/write attribute adapter to be used for all
132   // CHAR/VARCHAR/BINARY/VARBINARY columns.
133   ReadWriteArrayAdapter attr_adapter;
134 
135   // Create and initialize sample data.
136   const string meter = 50 * string("''''-,,,,|");
137   unsigned char binary_meter[500];
138   for (unsigned i = 0; i < 500; i++)
139   {
140     binary_meter[i] = (unsigned char)(i % 256);
141   }
142 
143   NdbTransaction *transaction= ndb.startTransaction();
144   if (transaction == NULL) APIERROR(ndb.getNdbError());
145 
146   // Create 21 operations and put a reference to them in a vector to
147   // be able to find failing operations.
148   vector<NdbOperation*> operations;
149   for (int i = 0; i <= 20; i++)
150   {
151     NdbOperation* operation = transaction->getNdbOperation(table);
152     if (operation == NULL) APIERROR(transaction->getNdbError());
153     operation->insertTuple();
154 
155     operation->equal("ATTR1", i);
156 
157     /* use ReadWrite Adapter to convert string to aRefs */
158     ReadWriteArrayAdapter::ErrorType error;
159 
160     char *attr2_aRef;
161     attr2_aRef= attr_adapter.make_aRef(column2, meter.substr(0,i), error);
162     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
163                        "make_aRef failed for ATTR2");
164     operation->setValue("ATTR2", attr2_aRef);
165 
166     char *attr3_aRef;
167     attr3_aRef= attr_adapter.make_aRef(column3, meter.substr(0,i), error);
168     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
169                        "make_aRef failed for ATTR3");
170     operation->setValue("ATTR3", attr3_aRef);
171 
172     char *attr4_aRef;
173     attr4_aRef= attr_adapter.make_aRef(column4, meter.substr(0,20*i), error);
174     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
175                        "make_aRef failed for ATTR4");
176     operation->setValue("ATTR4", attr4_aRef);
177 
178     char* attr5_aRef;
179     char* attr5_first;
180     attr_adapter.allocate_in_bytes(column5, attr5_aRef, attr5_first, i, error);
181     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
182                        "allocate_in_bytes failed for ATTR5");
183     memcpy(attr5_first, binary_meter, i);
184     operation->setValue("ATTR5", attr5_aRef);
185 
186     char* attr6_aRef;
187     char* attr6_first;
188     attr_adapter.allocate_in_bytes(column6, attr6_aRef, attr6_first, i, error);
189     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
190                        "allocate_in_bytes failed for ATTR6");
191     memcpy(attr6_first, binary_meter, i);
192     operation->setValue("ATTR6", attr6_aRef);
193 
194     char* attr7_aRef;
195     char* attr7_first;
196     attr_adapter.allocate_in_bytes(column7, attr7_aRef, attr7_first, 20*i, error);
197     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
198                        "allocate_in_bytes failed for ATTR7");
199     memcpy(attr7_first, binary_meter, 20*i);
200     operation->setValue("ATTR7", attr7_aRef);
201 
202     operations.push_back(operation);
203   }
204 
205   // Now execute all operations in one batch, and check for errors.
206   if (transaction->execute( NdbTransaction::Commit ) != 0)
207   {
208     for (size_t i = 0; i < operations.size(); i++)
209     {
210       const NdbError err= operations[i]->getNdbError();
211       if(err.code != NdbError::Success)
212       {
213         cout << "Error inserting Row : " << i << endl;
214         PRINT_ERROR(err.code, err.message);
215       }
216     }
217     APIERROR(transaction->getNdbError());
218   }
219   ndb.closeTransaction(transaction);
220 }
221 
222 /*
223  Reads the row with id = 17
224  Retrieves an prints value of the [VAR]CHAR/BINARY using array_adapter
225  */
do_read(Ndb & ndb)226 static void do_read(Ndb& ndb)
227 {
228   const NdbDictionary::Dictionary* dict= ndb.getDictionary();
229   const NdbDictionary::Table* table= dict->getTable("api_array_using_adapter");
230 
231   if (table == NULL) APIERROR(dict->getNdbError());
232 
233   NdbTransaction *transaction= ndb.startTransaction();
234   if (transaction == NULL) APIERROR(ndb.getNdbError());
235 
236   NdbOperation *operation= transaction->getNdbOperation(table);
237   if (operation == NULL) APIERROR(transaction->getNdbError());
238 
239   operation->readTuple(NdbOperation::LM_Read);
240   operation->equal("ATTR1", 17);
241 
242   vector<NdbRecAttr*> attr;
243   const int column_count= table->getNoOfColumns();
244   attr.reserve(column_count);
245   attr.push_back(nullptr);
246   for (int i= 1; i < column_count; i++)
247   {
248     attr.push_back(operation->getValue(i, NULL));
249     if (attr[i] == NULL) APIERROR(transaction->getNdbError());
250   }
251 
252   if(transaction->execute( NdbTransaction::Commit ) == -1)
253     APIERROR(transaction->getNdbError());
254 
255   /* Now use an array adapter to read the data from columns */
256   const ReadOnlyArrayAdapter attr_adapter;
257   ReadOnlyArrayAdapter::ErrorType error;
258 
259   /* print the fetched data */
260   cout << "Row ID : 17\n";
261   for (int i= 1; i < column_count; i++)
262   {
263     if (attr[i] != NULL)
264     {
265       NdbDictionary::Column::Type column_type = attr[i]->getType();
266       cout << "Column id: " << i
267            << ", name: " << attr[i]->getColumn()->getName()
268            << ", size: " << attr[i]->get_size_in_bytes()
269            << ", type: " << column_type_to_string(attr[i]->getType());
270       if(attr_adapter.is_binary_array_type(column_type))
271       {
272         /* if column is [VAR]BINARY, get the byte array and print their sum */
273         const char* data_ptr;
274         size_t data_length;
275         attr_adapter.get_byte_array(attr[i], data_ptr,
276                                     data_length, error);
277         if(error == ReadOnlyArrayAdapter::Success)
278         {
279           int sum = 0;
280           for (size_t j = 0; j < data_length; j++)
281             sum += (int)(data_ptr[j]);
282           cout << ", stored bytes length: " << data_length
283                << ", sum of byte array: " << sum << endl;
284         }
285         else
286           cout << ", error fetching value." << endl;
287       }
288       else
289       {
290         /* if the column is [VAR]CHAR, retrieve the string and print */
291         std::string value= attr_adapter.get_string(attr[i], error);
292         if(error == ReadOnlyArrayAdapter::Success)
293         {
294           cout << ", stored string length: " << value.length()
295                << ", value: " << value
296                << endl;
297         }
298         else
299           cout << ", error fetching value." << endl;
300       }
301     }
302   }
303 
304   ndb.closeTransaction(transaction);
305 }
306 
run_application(Ndb_cluster_connection & cluster_connection,const char * database_name)307 static void run_application(Ndb_cluster_connection &cluster_connection,
308                             const char* database_name)
309 {
310   /********************************************
311    * Connect to database via NdbApi           *
312    ********************************************/
313   // Object representing the database
314   Ndb ndb( &cluster_connection, database_name);
315   if (ndb.init()) APIERROR(ndb.getNdbError());
316 
317   /*
318    * Do different operations on database
319    */
320   do_insert(ndb);
321   do_read(ndb);
322   do_cleanup(ndb);
323 }
324 
main(int argc,char ** argv)325 int main(int argc, char** argv)
326 {
327   if (argc != 3)
328   {
329     std::cout << "Arguments are <connect_string cluster> <database_name>.\n";
330     exit(-1);
331   }
332   /* ndb_init must be called first */
333   ndb_init();
334   {
335     /* connect to cluster */
336     const char *connectstring = argv[1];
337     Ndb_cluster_connection cluster_connection(connectstring);
338     if (cluster_connection.connect(30 /* retries */,
339                                    1  /* delay between retries */,
340                                    0  /* verbose */))
341     {
342       std::cout << "Cluster management server was not ready within 30 secs.\n";
343       exit(-1);
344     }
345 
346     /* Connect and wait for the storage nodes */
347     if (cluster_connection.wait_until_ready(30,10) < 0)
348     {
349       std::cout << "Cluster was not ready within 30 secs.\n";
350       exit(-1);
351     }
352 
353     /* run the application code */
354     const char* dbname = argv[2];
355     run_application(cluster_connection, dbname);
356   }
357   ndb_end(0);
358 
359   return 0;
360 }
361