1 /*
2    Copyright (c) 2007, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 /*
27   ndbapi_blob_ndbrecord
28 
29   Illustrates the manipulation of BLOB (actually TEXT in this example).
30   This example uses the NdbRecord style way of accessing tuples.
31 
32   Shows insert, read, and update, using both inline value buffer and
33   read/write methods.
34  */
35 
36 #ifdef _WIN32
37 #include <winsock2.h>
38 #endif
39 #include <mysql.h>
40 #include <mysqld_error.h>
41 #include <NdbApi.hpp>
42 /* Used for cout. */
43 #include <iostream>
44 #include <stdio.h>
45 #include <ctype.h>
46 #include <stdlib.h>
47 #include <stddef.h>
48 #include <string.h>
49 
50 /**
51  * Helper debugging macros
52  */
53 #define PRINT_ERROR(code,msg) \
54   std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
55             << ", code: " << code \
56             << ", msg: " << msg << "." << std::endl
57 #define MYSQLERROR(mysql) { \
58   PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
59   exit(-1); }
60 #define APIERROR(error) { \
61   PRINT_ERROR(error.code,error.message); \
62   exit(-1); }
63 
64 /* Quote taken from Project Gutenberg. */
65 const char *text_quote=
66 "Just at this moment, somehow or other, they began to run.\n"
67 "\n"
68 "  Alice never could quite make out, in thinking it over\n"
69 "afterwards, how it was that they began:  all she remembers is,\n"
70 "that they were running hand in hand, and the Queen went so fast\n"
71 "that it was all she could do to keep up with her:  and still the\n"
72 "Queen kept crying 'Faster! Faster!' but Alice felt she COULD NOT\n"
73 "go faster, though she had not breath left to say so.\n"
74 "\n"
75 "  The most curious part of the thing was, that the trees and the\n"
76 "other things round them never changed their places at all:\n"
77 "however fast they went, they never seemed to pass anything.  'I\n"
78 "wonder if all the things move along with us?' thought poor\n"
79 "puzzled Alice.  And the Queen seemed to guess her thoughts, for\n"
80 "she cried, 'Faster!  Don't try to talk!'\n"
81 "\n"
82 "  Not that Alice had any idea of doing THAT.  She felt as if she\n"
83 "would never be able to talk again, she was getting so much out of\n"
84 "breath:  and still the Queen cried 'Faster! Faster!' and dragged\n"
85 "her along.  'Are we nearly there?'  Alice managed to pant out at\n"
86 "last.\n"
87 "\n"
88 "  'Nearly there!' the Queen repeated.  'Why, we passed it ten\n"
89 "minutes ago!  Faster!'  And they ran on for a time in silence,\n"
90 "with the wind whistling in Alice's ears, and almost blowing her\n"
91 "hair off her head, she fancied.\n"
92 "\n"
93 "  'Now!  Now!' cried the Queen.  'Faster!  Faster!'  And they\n"
94 "went so fast that at last they seemed to skim through the air,\n"
95 "hardly touching the ground with their feet, till suddenly, just\n"
96 "as Alice was getting quite exhausted, they stopped, and she found\n"
97 "herself sitting on the ground, breathless and giddy.\n"
98 "\n"
99 "  The Queen propped her up against a tree, and said kindly, 'You\n"
100 "may rest a little now.'\n"
101 "\n"
102 "  Alice looked round her in great surprise.  'Why, I do believe\n"
103 "we've been under this tree the whole time!  Everything's just as\n"
104 "it was!'\n"
105 "\n"
106 "  'Of course it is,' said the Queen, 'what would you have it?'\n"
107 "\n"
108 "  'Well, in OUR country,' said Alice, still panting a little,\n"
109 "'you'd generally get to somewhere else--if you ran very fast\n"
110 "for a long time, as we've been doing.'\n"
111 "\n"
112 "  'A slow sort of country!' said the Queen.  'Now, HERE, you see,\n"
113 "it takes all the running YOU can do, to keep in the same place.\n"
114 "If you want to get somewhere else, you must run at least twice as\n"
115 "fast as that!'\n"
116 "\n"
117 "  'I'd rather not try, please!' said Alice.  'I'm quite content\n"
118 "to stay here--only I AM so hot and thirsty!'\n"
119 "\n"
120 " -- Lewis Carroll, 'Through the Looking-Glass'.";
121 
122 /* NdbRecord objects. */
123 
124 const NdbRecord *key_record;                    // For specifying table key
125 const NdbRecord *blob_record;                   // For accessing blob
126 const NdbRecord *full_record;                   // All columns, for insert
127 
128 /* C struct representing the row layout */
129 struct MyRow
130 {
131   unsigned int myId;
132 
133   /* Pointer to Blob handle for operations on the blob column
134    * Space must be left for it in the row, but a pointer to the
135    * blob handle can also be obtained via calls to
136    * NdbOperation::getBlobHandle()
137    */
138   NdbBlob* myText;
139 };
140 
setup_records(Ndb * myNdb)141 static void setup_records(Ndb *myNdb)
142 {
143   NdbDictionary::RecordSpecification spec[2];
144 
145   NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
146   const NdbDictionary::Table *myTable= myDict->getTable("api_blob_ndbrecord");
147   if (myTable == NULL)
148     APIERROR(myDict->getNdbError());
149   const NdbDictionary::Column *col1= myTable->getColumn("my_id");
150   if (col1 == NULL)
151     APIERROR(myDict->getNdbError());
152   const NdbDictionary::Column *col2= myTable->getColumn("my_text");
153   if (col2 == NULL)
154     APIERROR(myDict->getNdbError());
155 
156   spec[0].column= col1;
157   spec[0].offset= offsetof(MyRow, myId);
158   spec[0].nullbit_byte_offset= 0;
159   spec[0].nullbit_bit_in_byte= 0;
160   spec[1].column= col2;
161   spec[1].offset= offsetof(MyRow, myText);
162   spec[1].nullbit_byte_offset= 0;
163   spec[1].nullbit_bit_in_byte= 0;
164 
165   key_record= myDict->createRecord(myTable, &spec[0], 1, sizeof(spec[0]));
166   if (key_record == NULL)
167     APIERROR(myDict->getNdbError());
168   blob_record= myDict->createRecord(myTable, &spec[1], 1, sizeof(spec[0]));
169   if (blob_record == NULL)
170     APIERROR(myDict->getNdbError());
171   full_record= myDict->createRecord(myTable, &spec[0], 2, sizeof(spec[0]));
172   if (full_record == NULL)
173     APIERROR(myDict->getNdbError());
174 }
175 
176 /*
177   Function to drop table.
178 */
drop_table(MYSQL & mysql)179 void drop_table(MYSQL &mysql)
180 {
181   if (mysql_query(&mysql, "DROP TABLE api_blob_ndbrecord"))
182     MYSQLERROR(mysql);
183 }
184 
185 
186 /*
187   Functions to create table.
188 */
try_create_table(MYSQL & mysql)189 int try_create_table(MYSQL &mysql)
190 {
191   return mysql_query(&mysql,
192                      "CREATE TABLE"
193                      "  api_blob_ndbrecord"
194                      "    (my_id INT UNSIGNED NOT NULL,"
195                      "     my_text TEXT NOT NULL,"
196                      "     PRIMARY KEY USING HASH (my_id))"
197                      "  ENGINE=NDB");
198 }
199 
create_table(MYSQL & mysql)200 void create_table(MYSQL &mysql)
201 {
202   if (try_create_table(mysql))
203   {
204     if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
205       MYSQLERROR(mysql);
206     std::cout << "MySQL Cluster already has example table: api_blob_ndbrecord. "
207               << "Dropping it..." << std::endl;
208     /******************
209      * Recreate table *
210      ******************/
211     drop_table(mysql);
212     if (try_create_table(mysql))
213       MYSQLERROR(mysql);
214   }
215 }
216 
populate(Ndb * myNdb)217 int populate(Ndb *myNdb)
218 {
219   MyRow row;
220 
221   NdbTransaction *myTrans= myNdb->startTransaction();
222   if (myTrans == NULL)
223     APIERROR(myNdb->getNdbError());
224 
225   row.myId= 1;
226   const NdbOperation *myNdbOperation= myTrans->insertTuple(full_record, (const char*) &row);
227   if (myNdbOperation == NULL)
228     APIERROR(myTrans->getNdbError());
229 
230   NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
231   if (myBlobHandle == NULL)
232     APIERROR(myNdbOperation->getNdbError());
233   myBlobHandle->setValue(text_quote, strlen(text_quote));
234 
235   int check= myTrans->execute(NdbTransaction::Commit);
236   myTrans->close();
237   return check != -1;
238 }
239 
240 
update_key(Ndb * myNdb)241 int update_key(Ndb *myNdb)
242 {
243   MyRow row;
244 
245   /*
246     Uppercase all characters in TEXT field, using primary key operation.
247     Use piece-wise read/write to avoid loading entire data into memory
248     at once.
249   */
250 
251   NdbTransaction *myTrans= myNdb->startTransaction();
252   if (myTrans == NULL)
253     APIERROR(myNdb->getNdbError());
254 
255   row.myId= 1;
256 
257   const NdbOperation *myNdbOperation=
258     myTrans->updateTuple(key_record,
259                          (const char*) &row,
260                          blob_record,
261                          (const char*) &row);
262   if (myNdbOperation == NULL)
263     APIERROR(myTrans->getNdbError());
264 
265   NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
266   if (myBlobHandle == NULL)
267     APIERROR(myNdbOperation->getNdbError());
268 
269   /* Execute NoCommit to make the blob handle active so
270    * that we can determine the actual Blob length
271    */
272   if (-1 == myTrans->execute(NdbTransaction::NoCommit))
273     APIERROR(myTrans->getNdbError());
274 
275   Uint64 length= 0;
276   if (-1 == myBlobHandle->getLength(length))
277     APIERROR(myBlobHandle->getNdbError());
278 
279   /*
280     A real application should use a much larger chunk size for
281     efficiency, preferably much larger than the part size, which
282     defaults to 2000. 64000 might be a good value.
283   */
284 #define CHUNK_SIZE 100
285   int chunk;
286   char buffer[CHUNK_SIZE];
287   for (chunk= (length-1)/CHUNK_SIZE; chunk >=0; chunk--)
288   {
289     Uint64 pos= chunk*CHUNK_SIZE;
290     Uint32 chunk_length= CHUNK_SIZE;
291     if (pos + chunk_length > length)
292       chunk_length= length - pos;
293 
294     /* Read from the end back, to illustrate seeking. */
295     if (-1 == myBlobHandle->setPos(pos))
296       APIERROR(myBlobHandle->getNdbError());
297     if (-1 == myBlobHandle->readData(buffer, chunk_length))
298       APIERROR(myBlobHandle->getNdbError());
299     int res= myTrans->execute(NdbTransaction::NoCommit);
300     if (-1 == res)
301       APIERROR(myTrans->getNdbError());
302 
303     /* Uppercase everything. */
304     for (Uint64 j= 0; j < chunk_length; j++)
305       buffer[j]= toupper(buffer[j]);
306 
307     if (-1 == myBlobHandle->setPos(pos))
308       APIERROR(myBlobHandle->getNdbError());
309     if (-1 == myBlobHandle->writeData(buffer, chunk_length))
310       APIERROR(myBlobHandle->getNdbError());
311     /* Commit on the final update. */
312     if (-1 == myTrans->execute(chunk ?
313                                NdbTransaction::NoCommit :
314                                NdbTransaction::Commit))
315       APIERROR(myTrans->getNdbError());
316   }
317 
318   myNdb->closeTransaction(myTrans);
319 
320   return 1;
321 }
322 
323 
update_scan(Ndb * myNdb)324 int update_scan(Ndb *myNdb)
325 {
326   /*
327     Lowercase all characters in TEXT field, using a scan with
328     updateCurrentTuple().
329   */
330   char buffer[10000];
331 
332   NdbTransaction *myTrans= myNdb->startTransaction();
333   if (myTrans == NULL)
334     APIERROR(myNdb->getNdbError());
335 
336   NdbScanOperation *myScanOp=
337     myTrans->scanTable(blob_record, NdbOperation::LM_Exclusive);
338   if (myScanOp == NULL)
339     APIERROR(myTrans->getNdbError());
340   NdbBlob *myBlobHandle= myScanOp->getBlobHandle("my_text");
341   if (myBlobHandle == NULL)
342     APIERROR(myScanOp->getNdbError());
343   if (myBlobHandle->getValue(buffer, sizeof(buffer)))
344     APIERROR(myBlobHandle->getNdbError());
345 
346   /* Start the scan. */
347   if (-1 == myTrans->execute(NdbTransaction::NoCommit))
348     APIERROR(myTrans->getNdbError());
349 
350   const MyRow *out_row;
351   int res;
352   for (;;)
353   {
354     res= myScanOp->nextResult((const char**)&out_row, true, false);
355     if (res==1)
356       break;                                    // Scan done.
357     else if (res)
358       APIERROR(myScanOp->getNdbError());
359 
360     Uint64 length= 0;
361     if (myBlobHandle->getLength(length) == -1)
362       APIERROR(myBlobHandle->getNdbError());
363 
364     /* Lowercase everything. */
365     for (Uint64 j= 0; j < length; j++)
366       buffer[j]= tolower(buffer[j]);
367 
368     /* 'Take over' the row locks from the scan to a separate
369      * operation for updating the tuple
370      */
371     const NdbOperation *myUpdateOp=
372       myScanOp->updateCurrentTuple(myTrans,
373                                    blob_record,
374                                    (const char*)out_row);
375     if (myUpdateOp == NULL)
376       APIERROR(myTrans->getNdbError());
377     NdbBlob *myBlobHandle2= myUpdateOp->getBlobHandle("my_text");
378     if (myBlobHandle2 == NULL)
379       APIERROR(myUpdateOp->getNdbError());
380     if (myBlobHandle2->setValue(buffer, length))
381       APIERROR(myBlobHandle2->getNdbError());
382 
383     if (-1 == myTrans->execute(NdbTransaction::NoCommit))
384       APIERROR(myTrans->getNdbError());
385   }
386 
387   if (-1 == myTrans->execute(NdbTransaction::Commit))
388     APIERROR(myTrans->getNdbError());
389 
390   myNdb->closeTransaction(myTrans);
391 
392   return 1;
393 }
394 
395 
396 struct ActiveHookData {
397   char buffer[10000];
398   Uint32 readLength;
399 };
400 
myFetchHook(NdbBlob * myBlobHandle,void * arg)401 int myFetchHook(NdbBlob* myBlobHandle, void* arg)
402 {
403   ActiveHookData *ahd= (ActiveHookData *)arg;
404 
405   ahd->readLength= sizeof(ahd->buffer) - 1;
406   return myBlobHandle->readData(ahd->buffer, ahd->readLength);
407 }
408 
fetch_key(Ndb * myNdb)409 int fetch_key(Ndb *myNdb)
410 {
411   /* Fetch a blob without specifying how many bytes
412    * to read up front, in one execution using
413    * the 'ActiveHook' mechanism.
414    * The supplied ActiveHook procedure is called when
415    * the Blob handle becomes 'active'.  At that point
416    * the length of the Blob can be obtained, and buffering
417    * arranged, and the data read requested.
418    */
419 
420   /* Separate rows used to specify key and hold result */
421   MyRow key_row;
422   MyRow out_row;
423 
424   /*
425     Fetch and show the blob field, using setActiveHook().
426   */
427 
428   NdbTransaction *myTrans= myNdb->startTransaction();
429   if (myTrans == NULL)
430     APIERROR(myNdb->getNdbError());
431 
432   key_row.myId= 1;
433   out_row.myText= NULL;
434   const NdbOperation *myNdbOperation=
435     myTrans->readTuple(key_record,
436                        (const char*) &key_row,
437                        blob_record,
438                        (char*) &out_row);
439   if (myNdbOperation == NULL)
440     APIERROR(myTrans->getNdbError());
441 
442   /* This time, we'll get the blob handle from the row, because
443    * we can.  Alternatively, we could use the normal mechanism
444    * of calling getBlobHandle().
445    */
446   NdbBlob *myBlobHandle= out_row.myText;
447   if (myBlobHandle == NULL)
448     APIERROR(myNdbOperation->getNdbError());
449   struct ActiveHookData ahd;
450   if (myBlobHandle->setActiveHook(myFetchHook, &ahd) == -1)
451     APIERROR(myBlobHandle->getNdbError());
452 
453   /*
454     Execute Commit, but calling our callback set up in setActiveHook()
455     before actually committing.
456   */
457   if (-1 == myTrans->execute(NdbTransaction::Commit))
458     APIERROR(myTrans->getNdbError());
459   myNdb->closeTransaction(myTrans);
460 
461   /* Our fetch callback will have been called during the execute(). */
462 
463   ahd.buffer[ahd.readLength]= '\0';
464   std::cout << "Fetched data:" << std::endl << ahd.buffer << std::endl;
465 
466   return 1;
467 }
468 
469 
update2_key(Ndb * myNdb)470 int update2_key(Ndb *myNdb)
471 {
472   char buffer[10000];
473   MyRow row;
474 
475   /* Simple setValue() update specified before the
476    * Blob handle is made active
477    */
478 
479   NdbTransaction *myTrans= myNdb->startTransaction();
480   if (myTrans == NULL)
481     APIERROR(myNdb->getNdbError());
482 
483   row.myId= 1;
484   const NdbOperation *myNdbOperation=
485     myTrans->updateTuple(key_record,
486                          (const char*)&row,
487                          blob_record,
488                          (char*) &row);
489   if (myNdbOperation == NULL)
490     APIERROR(myTrans->getNdbError());
491   NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
492   if (myBlobHandle == NULL)
493     APIERROR(myNdbOperation->getNdbError());
494   memset(buffer, ' ', sizeof(buffer));
495   if (myBlobHandle->setValue(buffer, sizeof(buffer)) == -1)
496     APIERROR(myBlobHandle->getNdbError());
497 
498   if (-1 == myTrans->execute(NdbTransaction::Commit))
499     APIERROR(myTrans->getNdbError());
500   myNdb->closeTransaction(myTrans);
501 
502   return 1;
503 }
504 
505 
delete_key(Ndb * myNdb)506 int delete_key(Ndb *myNdb)
507 {
508   MyRow row;
509 
510   /* Deletion of row containing blob via primary key. */
511 
512   NdbTransaction *myTrans= myNdb->startTransaction();
513   if (myTrans == NULL)
514     APIERROR(myNdb->getNdbError());
515 
516   row.myId= 1;
517   const NdbOperation *myNdbOperation= myTrans->deleteTuple(key_record,
518                                                            (const char*)&row,
519                                                            full_record);
520   if (myNdbOperation == NULL)
521     APIERROR(myTrans->getNdbError());
522 
523   if (-1 == myTrans->execute(NdbTransaction::Commit))
524     APIERROR(myTrans->getNdbError());
525   myNdb->closeTransaction(myTrans);
526 
527   return 1;
528 }
529 
530 
main(int argc,char ** argv)531 int main(int argc, char**argv)
532 {
533   if (argc != 3)
534   {
535     std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
536     exit(-1);
537   }
538   char *mysqld_sock  = argv[1];
539   const char *connectstring = argv[2];
540   ndb_init();
541   MYSQL mysql;
542 
543   /* Connect to mysql server and create table. */
544   {
545     if ( !mysql_init(&mysql) ) {
546       std::cout << "mysql_init failed.\n";
547       exit(-1);
548     }
549     if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
550                              0, mysqld_sock, 0) )
551       MYSQLERROR(mysql);
552 
553     mysql_query(&mysql, "CREATE DATABASE ndb_examples");
554     if (mysql_query(&mysql, "USE ndb_examples") != 0)
555       MYSQLERROR(mysql);
556 
557     create_table(mysql);
558   }
559 
560   /* Connect to ndb cluster. */
561 
562   Ndb_cluster_connection cluster_connection(connectstring);
563   if (cluster_connection.connect(4, 5, 1))
564   {
565     std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
566     exit(-1);
567   }
568   /* Optionally connect and wait for the storage nodes (ndbd's). */
569   if (cluster_connection.wait_until_ready(30,0) < 0)
570   {
571     std::cout << "Cluster was not ready within 30 secs.\n";
572     exit(-1);
573   }
574 
575   Ndb myNdb(&cluster_connection,"ndb_examples");
576   if (myNdb.init(1024) == -1) {      // Set max 1024 parallel transactions
577     APIERROR(myNdb.getNdbError());
578     exit(-1);
579   }
580 
581   setup_records(&myNdb);
582 
583   if(populate(&myNdb) > 0)
584     std::cout << "populate: Success!" << std::endl;
585 
586   if(update_key(&myNdb) > 0)
587     std::cout << "update_key: Success!" << std::endl;
588 
589   if(update_scan(&myNdb) > 0)
590     std::cout << "update_scan: Success!" << std::endl;
591 
592   if(fetch_key(&myNdb) > 0)
593     std::cout << "fetch_key: Success!" << std::endl;
594 
595   if(update2_key(&myNdb) > 0)
596     std::cout << "update2_key: Success!" << std::endl;
597 
598   if(delete_key(&myNdb) > 0)
599     std::cout << "delete_key: Success!" << std::endl;
600 
601   return 0;
602 }
603