1 /*
2    Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 /*
27   ndbapi_blob.cpp:
28 
29   Illustrates the manipulation of BLOB (actually TEXT in this example).
30 
31   Shows insert, read, and update, using both inline value buffer and
32   read/write methods.
33  */
34 
35 
36 #include <mysql.h>
37 #include <mysqld_error.h>
38 #include <NdbApi.hpp>
39 /* Used for cout. */
40 #include <iostream>
41 #include <stdio.h>
42 #include <ctype.h>
43 
44 
/**
 * Helper debugging macros
 *
 * PRINT_ERROR writes the source location, an error code and a message to
 * std::cout.  MYSQLERROR and APIERROR print the error held by a MYSQL
 * handle / an NDB error object and then terminate the program.
 *
 * The terminating macros are wrapped in do { } while (0) so that each one
 * expands to exactly one statement and can safely be used as the body of
 * an if/else without braces.  Macro arguments are parenthesized so that
 * arbitrary expressions can be passed.
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl
#define MYSQLERROR(mysql) do { \
  PRINT_ERROR(mysql_errno(&(mysql)),mysql_error(&(mysql))); \
  exit(-1); } while (0)
#define APIERROR(error) do { \
  PRINT_ERROR((error).code,(error).message); \
  exit(-1); } while (0)
58 
/*
  Quote taken from Project Gutenberg.

  Sample text used as the blob value: populate() passes this string and
  its strlen() to NdbBlob::setValue() for the my_text column.
*/
const char *text_quote=
"Just at this moment, somehow or other, they began to run.\n"
"\n"
"  Alice never could quite make out, in thinking it over\n"
"afterwards, how it was that they began:  all she remembers is,\n"
"that they were running hand in hand, and the Queen went so fast\n"
"that it was all she could do to keep up with her:  and still the\n"
"Queen kept crying 'Faster! Faster!' but Alice felt she COULD NOT\n"
"go faster, though she had not breath left to say so.\n"
"\n"
"  The most curious part of the thing was, that the trees and the\n"
"other things round them never changed their places at all:\n"
"however fast they went, they never seemed to pass anything.  'I\n"
"wonder if all the things move along with us?' thought poor\n"
"puzzled Alice.  And the Queen seemed to guess her thoughts, for\n"
"she cried, 'Faster!  Don't try to talk!'\n"
"\n"
"  Not that Alice had any idea of doing THAT.  She felt as if she\n"
"would never be able to talk again, she was getting so much out of\n"
"breath:  and still the Queen cried 'Faster! Faster!' and dragged\n"
"her along.  'Are we nearly there?'  Alice managed to pant out at\n"
"last.\n"
"\n"
"  'Nearly there!' the Queen repeated.  'Why, we passed it ten\n"
"minutes ago!  Faster!'  And they ran on for a time in silence,\n"
"with the wind whistling in Alice's ears, and almost blowing her\n"
"hair off her head, she fancied.\n"
"\n"
"  'Now!  Now!' cried the Queen.  'Faster!  Faster!'  And they\n"
"went so fast that at last they seemed to skim through the air,\n"
"hardly touching the ground with their feet, till suddenly, just\n"
"as Alice was getting quite exhausted, they stopped, and she found\n"
"herself sitting on the ground, breathless and giddy.\n"
"\n"
"  The Queen propped her up against a tree, and said kindly, 'You\n"
"may rest a little now.'\n"
"\n"
"  Alice looked round her in great surprise.  'Why, I do believe\n"
"we've been under this tree the whole time!  Everything's just as\n"
"it was!'\n"
"\n"
"  'Of course it is,' said the Queen, 'what would you have it?'\n"
"\n"
"  'Well, in OUR country,' said Alice, still panting a little,\n"
"'you'd generally get to somewhere else--if you ran very fast\n"
"for a long time, as we've been doing.'\n"
"\n"
"  'A slow sort of country!' said the Queen.  'Now, HERE, you see,\n"
"it takes all the running YOU can do, to keep in the same place.\n"
"If you want to get somewhere else, you must run at least twice as\n"
"fast as that!'\n"
"\n"
"  'I'd rather not try, please!' said Alice.  'I'm quite content\n"
"to stay here--only I AM so hot and thirsty!'\n"
"\n"
" -- Lewis Carroll, 'Through the Looking-Glass'.";
116 
117 /*
118   Function to drop table.
119 */
drop_table(MYSQL & mysql)120 void drop_table(MYSQL &mysql)
121 {
122   if (mysql_query(&mysql, "DROP TABLE api_blob"))
123     MYSQLERROR(mysql);
124 }
125 
126 
127 /*
128   Functions to create table.
129 */
try_create_table(MYSQL & mysql)130 int try_create_table(MYSQL &mysql)
131 {
132   return mysql_query(&mysql,
133                      "CREATE TABLE"
134                      "  api_blob"
135                      "    (my_id INT UNSIGNED NOT NULL,"
136                      "     my_text TEXT NOT NULL,"
137                      "     PRIMARY KEY USING HASH (my_id))"
138                      "  ENGINE=NDB");
139 }
140 
create_table(MYSQL & mysql)141 void create_table(MYSQL &mysql)
142 {
143   if (try_create_table(mysql))
144   {
145     if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
146       MYSQLERROR(mysql);
147     std::cout << "MySQL Cluster already has example table: api_blob. "
148               << "Dropping it..." << std::endl;
149     /******************
150      * Recreate table *
151      ******************/
152     drop_table(mysql);
153     if (try_create_table(mysql))
154       MYSQLERROR(mysql);
155   }
156 }
157 
populate(Ndb * myNdb)158 int populate(Ndb *myNdb)
159 {
160   const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
161   const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
162   if (myTable == NULL)
163     APIERROR(myDict->getNdbError());
164 
165   NdbTransaction *myTrans= myNdb->startTransaction();
166   if (myTrans == NULL)
167     APIERROR(myNdb->getNdbError());
168 
169   NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
170   if (myNdbOperation == NULL)
171     APIERROR(myTrans->getNdbError());
172   myNdbOperation->insertTuple();
173   myNdbOperation->equal("my_id", 1);
174   NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
175   if (myBlobHandle == NULL)
176     APIERROR(myNdbOperation->getNdbError());
177   myBlobHandle->setValue(text_quote, strlen(text_quote));
178 
179   int check= myTrans->execute(NdbTransaction::Commit);
180   myTrans->close();
181   return check != -1;
182 }
183 
184 
update_key(Ndb * myNdb)185 int update_key(Ndb *myNdb)
186 {
187   /*
188     Uppercase all characters in TEXT field, using primary key operation.
189     Use piece-wise read/write to avoid loading entire data into memory
190     at once.
191   */
192   const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
193   const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
194   if (myTable == NULL)
195     APIERROR(myDict->getNdbError());
196 
197   NdbTransaction *myTrans= myNdb->startTransaction();
198   if (myTrans == NULL)
199     APIERROR(myNdb->getNdbError());
200 
201   NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
202   if (myNdbOperation == NULL)
203     APIERROR(myTrans->getNdbError());
204   myNdbOperation->updateTuple();
205   myNdbOperation->equal("my_id", 1);
206   NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
207   if (myBlobHandle == NULL)
208     APIERROR(myNdbOperation->getNdbError());
209 
210   /* Execute NoCommit to make the blob handle active. */
211   if (-1 == myTrans->execute(NdbTransaction::NoCommit))
212     APIERROR(myTrans->getNdbError());
213 
214   Uint64 length= 0;
215   if (-1 == myBlobHandle->getLength(length))
216     APIERROR(myBlobHandle->getNdbError());
217 
218   /*
219     A real application should use a much larger chunk size for
220     efficiency, preferably much larger than the part size, which
221     defaults to 2000. 64000 might be a good value.
222   */
223 #define CHUNK_SIZE 100
224   int chunk;
225   char buffer[CHUNK_SIZE];
226   for (chunk= (length-1)/CHUNK_SIZE; chunk >=0; chunk--)
227   {
228     Uint64 pos= chunk*CHUNK_SIZE;
229     Uint32 chunk_length= CHUNK_SIZE;
230     if (pos + chunk_length > length)
231       chunk_length= length - pos;
232 
233     /* Read from the end back, to illustrate seeking. */
234     if (-1 == myBlobHandle->setPos(pos))
235       APIERROR(myBlobHandle->getNdbError());
236     if (-1 == myBlobHandle->readData(buffer, chunk_length))
237       APIERROR(myBlobHandle->getNdbError());
238     int res= myTrans->execute(NdbTransaction::NoCommit);
239     if (-1 == res)
240       APIERROR(myTrans->getNdbError());
241 
242     /* Uppercase everything. */
243     for (Uint64 j= 0; j < chunk_length; j++)
244       buffer[j]= toupper(buffer[j]);
245 
246     if (-1 == myBlobHandle->setPos(pos))
247       APIERROR(myBlobHandle->getNdbError());
248     if (-1 == myBlobHandle->writeData(buffer, chunk_length))
249       APIERROR(myBlobHandle->getNdbError());
250     /* Commit on the final update. */
251     if (-1 == myTrans->execute(chunk ?
252                                NdbTransaction::NoCommit :
253                                NdbTransaction::Commit))
254       APIERROR(myTrans->getNdbError());
255   }
256 
257   myNdb->closeTransaction(myTrans);
258 
259   return 1;
260 }
261 
262 
update_scan(Ndb * myNdb)263 int update_scan(Ndb *myNdb)
264 {
265   /*
266     Lowercase all characters in TEXT field, using a scan with
267     updateCurrentTuple().
268   */
269   char buffer[10000];
270 
271   const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
272   const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
273   if (myTable == NULL)
274     APIERROR(myDict->getNdbError());
275 
276   NdbTransaction *myTrans= myNdb->startTransaction();
277   if (myTrans == NULL)
278     APIERROR(myNdb->getNdbError());
279 
280   NdbScanOperation *myScanOp= myTrans->getNdbScanOperation(myTable);
281   if (myScanOp == NULL)
282     APIERROR(myTrans->getNdbError());
283   myScanOp->readTuples(NdbOperation::LM_Exclusive);
284   NdbBlob *myBlobHandle= myScanOp->getBlobHandle("my_text");
285   if (myBlobHandle == NULL)
286     APIERROR(myScanOp->getNdbError());
287   if (myBlobHandle->getValue(buffer, sizeof(buffer)))
288     APIERROR(myBlobHandle->getNdbError());
289 
290   /* Start the scan. */
291   if (-1 == myTrans->execute(NdbTransaction::NoCommit))
292     APIERROR(myTrans->getNdbError());
293 
294   int res;
295   for (;;)
296   {
297     res= myScanOp->nextResult(true);
298     if (res==1)
299       break;                                    // Scan done.
300     else if (res)
301       APIERROR(myScanOp->getNdbError());
302 
303     Uint64 length= 0;
304     if (myBlobHandle->getLength(length) == -1)
305       APIERROR(myBlobHandle->getNdbError());
306 
307     /* Lowercase everything. */
308     for (Uint64 j= 0; j < length; j++)
309       buffer[j]= tolower(buffer[j]);
310 
311     NdbOperation *myUpdateOp= myScanOp->updateCurrentTuple();
312     if (myUpdateOp == NULL)
313       APIERROR(myTrans->getNdbError());
314     NdbBlob *myBlobHandle2= myUpdateOp->getBlobHandle("my_text");
315     if (myBlobHandle2 == NULL)
316       APIERROR(myUpdateOp->getNdbError());
317     if (myBlobHandle2->setValue(buffer, length))
318       APIERROR(myBlobHandle2->getNdbError());
319 
320     if (-1 == myTrans->execute(NdbTransaction::NoCommit))
321       APIERROR(myTrans->getNdbError());
322   }
323 
324   if (-1 == myTrans->execute(NdbTransaction::Commit))
325     APIERROR(myTrans->getNdbError());
326 
327   myNdb->closeTransaction(myTrans);
328 
329   return 1;
330 }
331 
332 
/*
  State shared between fetch_key() and its blob callback myFetchHook():
  fetch_key() passes a pointer to one of these as the hook argument and
  prints the buffer after the transaction has been executed.
*/
struct ActiveHookData {
  /* Receives the blob data read by myFetchHook(). */
  char buffer[10000];
  /*
    Set by myFetchHook() to sizeof(buffer) - 1 and handed to readData();
    fetch_key() afterwards stores the terminating '\0' at this index.
    NOTE(review): presumably readData() updates it to the number of bytes
    actually read -- confirm against the NdbBlob::readData() reference.
  */
  Uint32 readLength;
};
337 
myFetchHook(NdbBlob * myBlobHandle,void * arg)338 int myFetchHook(NdbBlob* myBlobHandle, void* arg)
339 {
340   ActiveHookData *ahd= (ActiveHookData *)arg;
341 
342   ahd->readLength= sizeof(ahd->buffer) - 1;
343   return myBlobHandle->readData(ahd->buffer, ahd->readLength);
344 }
345 
fetch_key(Ndb * myNdb)346 int fetch_key(Ndb *myNdb)
347 {
348   /*
349     Fetch and show the blob field, using setActiveHook().
350   */
351   const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
352   const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
353   if (myTable == NULL)
354     APIERROR(myDict->getNdbError());
355 
356   NdbTransaction *myTrans= myNdb->startTransaction();
357   if (myTrans == NULL)
358     APIERROR(myNdb->getNdbError());
359 
360   NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
361   if (myNdbOperation == NULL)
362     APIERROR(myTrans->getNdbError());
363   myNdbOperation->readTuple();
364   myNdbOperation->equal("my_id", 1);
365   NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
366   if (myBlobHandle == NULL)
367     APIERROR(myNdbOperation->getNdbError());
368   struct ActiveHookData ahd;
369   if (myBlobHandle->setActiveHook(myFetchHook, &ahd) == -1)
370     APIERROR(myBlobHandle->getNdbError());
371 
372   /*
373     Execute Commit, but calling our callback set up in setActiveHook()
374     before actually committing.
375   */
376   if (-1 == myTrans->execute(NdbTransaction::Commit))
377     APIERROR(myTrans->getNdbError());
378   myNdb->closeTransaction(myTrans);
379 
380   /* Our fetch callback will have been called during the execute(). */
381 
382   ahd.buffer[ahd.readLength]= '\0';
383   std::cout << "Fetched data:" << std::endl << ahd.buffer << std::endl;
384 
385   return 1;
386 }
387 
388 
update2_key(Ndb * myNdb)389 int update2_key(Ndb *myNdb)
390 {
391   char buffer[10000];
392 
393   /* Simple setValue() update. */
394   const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
395   const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
396   if (myTable == NULL)
397     APIERROR(myDict->getNdbError());
398 
399   NdbTransaction *myTrans= myNdb->startTransaction();
400   if (myTrans == NULL)
401     APIERROR(myNdb->getNdbError());
402 
403   NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
404   if (myNdbOperation == NULL)
405     APIERROR(myTrans->getNdbError());
406   myNdbOperation->updateTuple();
407   myNdbOperation->equal("my_id", 1);
408   NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
409   if (myBlobHandle == NULL)
410     APIERROR(myNdbOperation->getNdbError());
411   memset(buffer, ' ', sizeof(buffer));
412   if (myBlobHandle->setValue(buffer, sizeof(buffer)) == -1)
413     APIERROR(myBlobHandle->getNdbError());
414 
415   if (-1 == myTrans->execute(NdbTransaction::Commit))
416     APIERROR(myTrans->getNdbError());
417   myNdb->closeTransaction(myTrans);
418 
419   return 1;
420 }
421 
422 
delete_key(Ndb * myNdb)423 int delete_key(Ndb *myNdb)
424 {
425   /* Deletion of blob row. */
426   const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
427   const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
428   if (myTable == NULL)
429     APIERROR(myDict->getNdbError());
430 
431   NdbTransaction *myTrans= myNdb->startTransaction();
432   if (myTrans == NULL)
433     APIERROR(myNdb->getNdbError());
434 
435   NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
436   if (myNdbOperation == NULL)
437     APIERROR(myTrans->getNdbError());
438   myNdbOperation->deleteTuple();
439   myNdbOperation->equal("my_id", 1);
440 
441   if (-1 == myTrans->execute(NdbTransaction::Commit))
442     APIERROR(myTrans->getNdbError());
443   myNdb->closeTransaction(myTrans);
444 
445   return 1;
446 }
447 
448 
main(int argc,char ** argv)449 int main(int argc, char**argv)
450 {
451   if (argc != 3)
452   {
453     std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
454     exit(-1);
455   }
456   char *mysqld_sock  = argv[1];
457   const char *connectstring = argv[2];
458   ndb_init();
459   MYSQL mysql;
460 
461   /* Connect to mysql server and create table. */
462   {
463     if ( !mysql_init(&mysql) ) {
464       std::cout << "mysql_init failed.\n";
465       exit(-1);
466     }
467     if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
468                              0, mysqld_sock, 0) )
469       MYSQLERROR(mysql);
470 
471     mysql_query(&mysql, "CREATE DATABASE ndb_examples");
472     if (mysql_query(&mysql, "USE ndb_examples") != 0)
473       MYSQLERROR(mysql);
474 
475     create_table(mysql);
476   }
477 
478   /* Connect to ndb cluster. */
479 
480   Ndb_cluster_connection cluster_connection(connectstring);
481   if (cluster_connection.connect(4, 5, 1))
482   {
483     std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
484     exit(-1);
485   }
486   /* Optionally connect and wait for the storage nodes (ndbd's). */
487   if (cluster_connection.wait_until_ready(30,0) < 0)
488   {
489     std::cout << "Cluster was not ready within 30 secs.\n";
490     exit(-1);
491   }
492 
493   Ndb myNdb(&cluster_connection,"ndb_examples");
494   if (myNdb.init(1024) == -1) {      // Set max 1024 parallel transactions
495     APIERROR(myNdb.getNdbError());
496     exit(-1);
497   }
498 
499   if(populate(&myNdb) > 0)
500     std::cout << "populate: Success!" << std::endl;
501 
502   if(update_key(&myNdb) > 0)
503     std::cout << "update_key: Success!" << std::endl;
504 
505   if(update_scan(&myNdb) > 0)
506     std::cout << "update_scan: Success!" << std::endl;
507 
508   if(fetch_key(&myNdb) > 0)
509     std::cout << "fetch_key: Success!" << std::endl;
510 
511   if(update2_key(&myNdb) > 0)
512     std::cout << "update2_key: Success!" << std::endl;
513 
514   if(delete_key(&myNdb) > 0)
515     std::cout << "delete_key: Success!" << std::endl;
516 
517   return 0;
518 }
519