1 /*
2    Copyright (c) 2014, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <NdbApi.hpp>
26 #include <iostream>
27 #include <vector>
28 #include <cstdlib>
29 #include <cstring>
30 
31 using namespace std;
32 
33 #include "../common/error_handling.hpp"
34 #include "../common/array_adapter.hpp"
35 #include "../common/ndb_util.hpp"
36 #include "../common/util.hpp"
37 
38 /**
39 This program inserts [VAR]CHAR/BINARY column data into the table
40 by constructing aRefs using array adapters and then reads those
41 columns back and extracts the data using array adapters.
42 
43 schema used
44 CREATE TABLE api_array_using_adapter(
45   ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,
46   ATTR2 CHAR(20) NOT NULL,
47   ATTR3 VARCHAR(20) NOT NULL,
48   ATTR4 VARCHAR(500) NOT NULL,
49   ATTR5 BINARY(20) NOT NULL,
50   ATTR6 VARBINARY(20) NOT NULL,
51   ATTR7 VARBINARY(500) NOT NULL
52 ) engine ndb charset latin1;
53  */
54 
55 // Use one transaction and insert 21 rows in one batch.
do_insert(Ndb & ndb)56 static void do_insert(Ndb& ndb)
57 {
58   const NdbDictionary::Dictionary* dict = ndb.getDictionary();
59   const NdbDictionary::Table *table = dict->getTable("api_array_using_adapter");
60 
61   if (table == NULL)
62   {
63     APIERROR(dict->getNdbError());
64   }
65 
66   // Get a column object for each CHAR/VARCHAR/BINARY/VARBINARY column
67   // to insert into.
68   const NdbDictionary::Column *column2 = table->getColumn("ATTR2");
69   if (column2 == NULL)
70   {
71     APIERROR(dict->getNdbError());
72   }
73 
74   const NdbDictionary::Column *column3 = table->getColumn("ATTR3");
75   if (column3 == NULL)
76   {
77     APIERROR(dict->getNdbError());
78   }
79 
80   const NdbDictionary::Column *column4 = table->getColumn("ATTR4");
81   if (column4 == NULL)
82   {
83     APIERROR(dict->getNdbError());
84   }
85 
86   const NdbDictionary::Column *column5 = table->getColumn("ATTR5");
87   if (column5 == NULL)
88   {
89     APIERROR(dict->getNdbError());
90   }
91 
92   const NdbDictionary::Column *column6 = table->getColumn("ATTR6");
93   if (column6 == NULL)
94   {
95     APIERROR(dict->getNdbError());
96   }
97 
98   const NdbDictionary::Column *column7 = table->getColumn("ATTR7");
99   if (column7 == NULL)
100   {
101     APIERROR(dict->getNdbError());
102   }
103 
104   // Create a read/write attribute adapter to be used for all
105   // CHAR/VARCHAR/BINARY/VARBINARY columns.
106   ReadWriteArrayAdapter attr_adapter;
107 
108   // Create and initialize sample data.
109   const string meter = 50 * string("''''-,,,,|");
110   unsigned char binary_meter[500];
111   for (unsigned i = 0; i < 500; i++)
112   {
113     binary_meter[i] = (unsigned char)(i % 256);
114   }
115 
116   NdbTransaction *transaction= ndb.startTransaction();
117   if (transaction == NULL) APIERROR(ndb.getNdbError());
118 
119   // Create 21 operations and put a reference to them in a vector to
120   // be able to find failing operations.
121   vector<NdbOperation*> operations;
122   for (int i = 0; i <= 20; i++)
123   {
124     NdbOperation* operation = transaction->getNdbOperation(table);
125     if (operation == NULL) APIERROR(transaction->getNdbError());
126     operation->insertTuple();
127 
128     operation->equal("ATTR1", i);
129 
130     /* use ReadWrite Adapter to convert string to aRefs */
131     ReadWriteArrayAdapter::ErrorType error;
132 
133     char *attr2_aRef;
134     attr2_aRef= attr_adapter.make_aRef(column2, meter.substr(0,i), error);
135     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
136                        "make_aRef failed for ATTR2");
137     operation->setValue("ATTR2", attr2_aRef);
138 
139     char *attr3_aRef;
140     attr3_aRef= attr_adapter.make_aRef(column3, meter.substr(0,i), error);
141     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
142                        "make_aRef failed for ATTR3");
143     operation->setValue("ATTR3", attr3_aRef);
144 
145     char *attr4_aRef;
146     attr4_aRef= attr_adapter.make_aRef(column4, meter.substr(0,20*i), error);
147     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
148                        "make_aRef failed for ATTR4");
149     operation->setValue("ATTR4", attr4_aRef);
150 
151     char* attr5_aRef;
152     char* attr5_first;
153     attr_adapter.allocate_in_bytes(column5, attr5_aRef, attr5_first, i, error);
154     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
155                        "allocate_in_bytes failed for ATTR5");
156     memcpy(attr5_first, binary_meter, i);
157     operation->setValue("ATTR5", attr5_aRef);
158 
159     char* attr6_aRef;
160     char* attr6_first;
161     attr_adapter.allocate_in_bytes(column6, attr6_aRef, attr6_first, i, error);
162     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
163                        "allocate_in_bytes failed for ATTR6");
164     memcpy(attr6_first, binary_meter, i);
165     operation->setValue("ATTR6", attr6_aRef);
166 
167     char* attr7_aRef;
168     char* attr7_first;
169     attr_adapter.allocate_in_bytes(column7, attr7_aRef, attr7_first, 20*i, error);
170     PRINT_IF_NOT_EQUAL(error, ReadWriteArrayAdapter::Success,
171                        "allocate_in_bytes failed for ATTR7");
172     memcpy(attr7_first, binary_meter, 20*i);
173     operation->setValue("ATTR7", attr7_aRef);
174 
175     operations.push_back(operation);
176   }
177 
178   // Now execute all operations in one batch, and check for errors.
179   if (transaction->execute( NdbTransaction::Commit ) != 0)
180   {
181     for (size_t i = 0; i < operations.size(); i++)
182     {
183       const NdbError err= operations[i]->getNdbError();
184       if(err.code != NdbError::Success)
185       {
186         cout << "Error inserting Row : " << i << endl;
187         PRINT_ERROR(err.code, err.message);
188       }
189     }
190     APIERROR(transaction->getNdbError());
191   }
192   ndb.closeTransaction(transaction);
193 }
194 
195 /*
196  Reads the row with id = 17
197  Retrieves an prints value of the [VAR]CHAR/BINARY using array_adapter
198  */
do_read(Ndb & ndb)199 static void do_read(Ndb& ndb)
200 {
201   const NdbDictionary::Dictionary* dict= ndb.getDictionary();
202   const NdbDictionary::Table* table= dict->getTable("api_array_using_adapter");
203 
204   if (table == NULL) APIERROR(dict->getNdbError());
205 
206   NdbTransaction *transaction= ndb.startTransaction();
207   if (transaction == NULL) APIERROR(ndb.getNdbError());
208 
209   NdbOperation *operation= transaction->getNdbOperation(table);
210   if (operation == NULL) APIERROR(transaction->getNdbError());
211 
212   operation->readTuple(NdbOperation::LM_Read);
213   operation->equal("ATTR1", 17);
214 
215   vector<NdbRecAttr*> attr;
216   const int column_count= table->getNoOfColumns();
217   attr.reserve(column_count);
218 
219   for (int i= 1; i < column_count; i++)
220   {
221     attr[i] = operation->getValue(i, NULL);
222     if (attr[i] == NULL) APIERROR(transaction->getNdbError());
223   }
224 
225   if(transaction->execute( NdbTransaction::Commit ) == -1)
226     APIERROR(transaction->getNdbError());
227 
228   /* Now use an array adapter to read the data from columns */
229   const ReadOnlyArrayAdapter attr_adapter;
230   ReadOnlyArrayAdapter::ErrorType error;
231 
232   /* print the fetched data */
233   cout << "Row ID : 17\n";
234   for (int i= 1; i < column_count; i++)
235   {
236     if (attr[i] != NULL)
237     {
238       NdbDictionary::Column::Type column_type = attr[i]->getType();
239       cout << "Column id: " << i
240            << ", name: " << attr[i]->getColumn()->getName()
241            << ", size: " << attr[i]->get_size_in_bytes()
242            << ", type: " << column_type_to_string(attr[i]->getType());
243       if(attr_adapter.is_binary_array_type(column_type))
244       {
245         /* if column is [VAR]BINARY, get the byte array and print their sum */
246         const char* data_ptr;
247         size_t data_length;
248         attr_adapter.get_byte_array(attr[i], data_ptr,
249                                     data_length, error);
250         if(error == ReadOnlyArrayAdapter::Success)
251         {
252           int sum = 0;
253           for (size_t j = 0; j < data_length; j++)
254             sum += (int)(data_ptr[j]);
255           cout << ", stored bytes length: " << data_length
256                << ", sum of byte array: " << sum << endl;
257         }
258         else
259           cout << ", error fetching value." << endl;
260       }
261       else
262       {
263         /* if the column is [VAR]CHAR, retrieve the string and print */
264         std::string value= attr_adapter.get_string(attr[i], error);
265         if(error == ReadOnlyArrayAdapter::Success)
266         {
267           cout << ", stored string length: " << value.length()
268                << ", value: " << value
269                << endl;
270         }
271         else
272           cout << ", error fetching value." << endl;
273       }
274     }
275   }
276 
277   ndb.closeTransaction(transaction);
278 }
279 
run_application(Ndb_cluster_connection & cluster_connection,const char * database_name)280 static void run_application(Ndb_cluster_connection &cluster_connection,
281                             const char* database_name)
282 {
283   /********************************************
284    * Connect to database via NdbApi           *
285    ********************************************/
286   // Object representing the database
287   Ndb ndb( &cluster_connection, database_name);
288   if (ndb.init()) APIERROR(ndb.getNdbError());
289 
290   /*
291    * Do different operations on database
292    */
293   do_insert(ndb);
294   do_read(ndb);
295 }
296 
main(int argc,char ** argv)297 int main(int argc, char** argv)
298 {
299   if (argc != 3)
300   {
301     std::cout << "Arguments are <connect_string cluster> <database_name>.\n";
302     exit(-1);
303   }
304   /* ndb_init must be called first */
305   ndb_init();
306   {
307     /* connect to cluster */
308     const char *connectstring = argv[1];
309     Ndb_cluster_connection cluster_connection(connectstring);
310     if (cluster_connection.connect(30 /* retries */,
311                                    1  /* delay between retries */,
312                                    0  /* verbose */))
313     {
314       std::cout << "Cluster management server was not ready within 30 secs.\n";
315       exit(-1);
316     }
317 
318     /* Connect and wait for the storage nodes */
319     if (cluster_connection.wait_until_ready(30,10) < 0)
320     {
321       std::cout << "Cluster was not ready within 30 secs.\n";
322       exit(-1);
323     }
324 
325     /* run the application code */
326     const char* dbname = argv[2];
327     run_application(cluster_connection, dbname);
328   }
329   ndb_end(0);
330 
331   return 0;
332 }
333 
334