1 /*
2 Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 /*
27 ndbapi_blob.cpp:
28
29 Illustrates the manipulation of BLOB (actually TEXT in this example).
30
31 Shows insert, read, and update, using both inline value buffer and
32 read/write methods.
33 */
34
35
36 #include <mysql.h>
37 #include <mysqld_error.h>
38 #include <NdbApi.hpp>
39 /* Used for cout. */
40 #include <iostream>
41 #include <stdio.h>
42 #include <ctype.h>
43
44
/**
 * Helper debugging macros.
 *
 * PRINT_ERROR(code, msg)
 *   Writes the source file, line number, an error code and an error
 *   message to std::cout.
 * MYSQLERROR(mysql)
 *   Reports the last error recorded on a MYSQL handle (passed as an
 *   object, not as a pointer) and terminates the program with exit(-1).
 * APIERROR(error)
 *   Reports an object exposing `code` and `message` members and
 *   terminates the program with exit(-1).
 *
 * The terminating macros are wrapped in do { } while (0) so that each one
 * expands to exactly one statement; this keeps constructs such as
 * `if (failed) APIERROR(err); else ...` well-formed.  Arguments are
 * parenthesized so that arbitrary expressions can be passed safely.
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl
#define MYSQLERROR(mysql) do { \
  PRINT_ERROR(mysql_errno(&(mysql)),mysql_error(&(mysql))); \
  exit(-1); } while (0)
#define APIERROR(error) do { \
  PRINT_ERROR((error).code,(error).message); \
  exit(-1); } while (0)
58
/*
  Sample text used as the initial value of the my_text column; populate()
  stores it through NdbBlob::setValue() using its strlen() as the length.

  Quote taken from Project Gutenberg.
*/
const char *text_quote=
"Just at this moment, somehow or other, they began to run.\n"
"\n"
" Alice never could quite make out, in thinking it over\n"
"afterwards, how it was that they began: all she remembers is,\n"
"that they were running hand in hand, and the Queen went so fast\n"
"that it was all she could do to keep up with her: and still the\n"
"Queen kept crying 'Faster! Faster!' but Alice felt she COULD NOT\n"
"go faster, though she had not breath left to say so.\n"
"\n"
" The most curious part of the thing was, that the trees and the\n"
"other things round them never changed their places at all:\n"
"however fast they went, they never seemed to pass anything. 'I\n"
"wonder if all the things move along with us?' thought poor\n"
"puzzled Alice. And the Queen seemed to guess her thoughts, for\n"
"she cried, 'Faster! Don't try to talk!'\n"
"\n"
" Not that Alice had any idea of doing THAT. She felt as if she\n"
"would never be able to talk again, she was getting so much out of\n"
"breath: and still the Queen cried 'Faster! Faster!' and dragged\n"
"her along. 'Are we nearly there?' Alice managed to pant out at\n"
"last.\n"
"\n"
" 'Nearly there!' the Queen repeated. 'Why, we passed it ten\n"
"minutes ago! Faster!' And they ran on for a time in silence,\n"
"with the wind whistling in Alice's ears, and almost blowing her\n"
"hair off her head, she fancied.\n"
"\n"
" 'Now! Now!' cried the Queen. 'Faster! Faster!' And they\n"
"went so fast that at last they seemed to skim through the air,\n"
"hardly touching the ground with their feet, till suddenly, just\n"
"as Alice was getting quite exhausted, they stopped, and she found\n"
"herself sitting on the ground, breathless and giddy.\n"
"\n"
" The Queen propped her up against a tree, and said kindly, 'You\n"
"may rest a little now.'\n"
"\n"
" Alice looked round her in great surprise. 'Why, I do believe\n"
"we've been under this tree the whole time! Everything's just as\n"
"it was!'\n"
"\n"
" 'Of course it is,' said the Queen, 'what would you have it?'\n"
"\n"
" 'Well, in OUR country,' said Alice, still panting a little,\n"
"'you'd generally get to somewhere else--if you ran very fast\n"
"for a long time, as we've been doing.'\n"
"\n"
" 'A slow sort of country!' said the Queen. 'Now, HERE, you see,\n"
"it takes all the running YOU can do, to keep in the same place.\n"
"If you want to get somewhere else, you must run at least twice as\n"
"fast as that!'\n"
"\n"
" 'I'd rather not try, please!' said Alice. 'I'm quite content\n"
"to stay here--only I AM so hot and thirsty!'\n"
"\n"
" -- Lewis Carroll, 'Through the Looking-Glass'.";
116
117 /*
118 Function to drop table.
119 */
drop_table(MYSQL & mysql)120 void drop_table(MYSQL &mysql)
121 {
122 if (mysql_query(&mysql, "DROP TABLE api_blob"))
123 MYSQLERROR(mysql);
124 }
125
126
127 /*
128 Functions to create table.
129 */
try_create_table(MYSQL & mysql)130 int try_create_table(MYSQL &mysql)
131 {
132 return mysql_query(&mysql,
133 "CREATE TABLE"
134 " api_blob"
135 " (my_id INT UNSIGNED NOT NULL,"
136 " my_text TEXT NOT NULL,"
137 " PRIMARY KEY USING HASH (my_id))"
138 " ENGINE=NDB");
139 }
140
create_table(MYSQL & mysql)141 void create_table(MYSQL &mysql)
142 {
143 if (try_create_table(mysql))
144 {
145 if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
146 MYSQLERROR(mysql);
147 std::cout << "MySQL Cluster already has example table: api_blob. "
148 << "Dropping it..." << std::endl;
149 /******************
150 * Recreate table *
151 ******************/
152 drop_table(mysql);
153 if (try_create_table(mysql))
154 MYSQLERROR(mysql);
155 }
156 }
157
populate(Ndb * myNdb)158 int populate(Ndb *myNdb)
159 {
160 const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
161 const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
162 if (myTable == NULL)
163 APIERROR(myDict->getNdbError());
164
165 NdbTransaction *myTrans= myNdb->startTransaction();
166 if (myTrans == NULL)
167 APIERROR(myNdb->getNdbError());
168
169 NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
170 if (myNdbOperation == NULL)
171 APIERROR(myTrans->getNdbError());
172 myNdbOperation->insertTuple();
173 myNdbOperation->equal("my_id", 1);
174 NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
175 if (myBlobHandle == NULL)
176 APIERROR(myNdbOperation->getNdbError());
177 myBlobHandle->setValue(text_quote, strlen(text_quote));
178
179 int check= myTrans->execute(NdbTransaction::Commit);
180 myTrans->close();
181 return check != -1;
182 }
183
184
update_key(Ndb * myNdb)185 int update_key(Ndb *myNdb)
186 {
187 /*
188 Uppercase all characters in TEXT field, using primary key operation.
189 Use piece-wise read/write to avoid loading entire data into memory
190 at once.
191 */
192 const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
193 const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
194 if (myTable == NULL)
195 APIERROR(myDict->getNdbError());
196
197 NdbTransaction *myTrans= myNdb->startTransaction();
198 if (myTrans == NULL)
199 APIERROR(myNdb->getNdbError());
200
201 NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
202 if (myNdbOperation == NULL)
203 APIERROR(myTrans->getNdbError());
204 myNdbOperation->updateTuple();
205 myNdbOperation->equal("my_id", 1);
206 NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
207 if (myBlobHandle == NULL)
208 APIERROR(myNdbOperation->getNdbError());
209
210 /* Execute NoCommit to make the blob handle active. */
211 if (-1 == myTrans->execute(NdbTransaction::NoCommit))
212 APIERROR(myTrans->getNdbError());
213
214 Uint64 length= 0;
215 if (-1 == myBlobHandle->getLength(length))
216 APIERROR(myBlobHandle->getNdbError());
217
218 /*
219 A real application should use a much larger chunk size for
220 efficiency, preferably much larger than the part size, which
221 defaults to 2000. 64000 might be a good value.
222 */
223 #define CHUNK_SIZE 100
224 int chunk;
225 char buffer[CHUNK_SIZE];
226 for (chunk= (length-1)/CHUNK_SIZE; chunk >=0; chunk--)
227 {
228 Uint64 pos= chunk*CHUNK_SIZE;
229 Uint32 chunk_length= CHUNK_SIZE;
230 if (pos + chunk_length > length)
231 chunk_length= length - pos;
232
233 /* Read from the end back, to illustrate seeking. */
234 if (-1 == myBlobHandle->setPos(pos))
235 APIERROR(myBlobHandle->getNdbError());
236 if (-1 == myBlobHandle->readData(buffer, chunk_length))
237 APIERROR(myBlobHandle->getNdbError());
238 int res= myTrans->execute(NdbTransaction::NoCommit);
239 if (-1 == res)
240 APIERROR(myTrans->getNdbError());
241
242 /* Uppercase everything. */
243 for (Uint64 j= 0; j < chunk_length; j++)
244 buffer[j]= toupper(buffer[j]);
245
246 if (-1 == myBlobHandle->setPos(pos))
247 APIERROR(myBlobHandle->getNdbError());
248 if (-1 == myBlobHandle->writeData(buffer, chunk_length))
249 APIERROR(myBlobHandle->getNdbError());
250 /* Commit on the final update. */
251 if (-1 == myTrans->execute(chunk ?
252 NdbTransaction::NoCommit :
253 NdbTransaction::Commit))
254 APIERROR(myTrans->getNdbError());
255 }
256
257 myNdb->closeTransaction(myTrans);
258
259 return 1;
260 }
261
262
update_scan(Ndb * myNdb)263 int update_scan(Ndb *myNdb)
264 {
265 /*
266 Lowercase all characters in TEXT field, using a scan with
267 updateCurrentTuple().
268 */
269 char buffer[10000];
270
271 const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
272 const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
273 if (myTable == NULL)
274 APIERROR(myDict->getNdbError());
275
276 NdbTransaction *myTrans= myNdb->startTransaction();
277 if (myTrans == NULL)
278 APIERROR(myNdb->getNdbError());
279
280 NdbScanOperation *myScanOp= myTrans->getNdbScanOperation(myTable);
281 if (myScanOp == NULL)
282 APIERROR(myTrans->getNdbError());
283 myScanOp->readTuples(NdbOperation::LM_Exclusive);
284 NdbBlob *myBlobHandle= myScanOp->getBlobHandle("my_text");
285 if (myBlobHandle == NULL)
286 APIERROR(myScanOp->getNdbError());
287 if (myBlobHandle->getValue(buffer, sizeof(buffer)))
288 APIERROR(myBlobHandle->getNdbError());
289
290 /* Start the scan. */
291 if (-1 == myTrans->execute(NdbTransaction::NoCommit))
292 APIERROR(myTrans->getNdbError());
293
294 int res;
295 for (;;)
296 {
297 res= myScanOp->nextResult(true);
298 if (res==1)
299 break; // Scan done.
300 else if (res)
301 APIERROR(myScanOp->getNdbError());
302
303 Uint64 length= 0;
304 if (myBlobHandle->getLength(length) == -1)
305 APIERROR(myBlobHandle->getNdbError());
306
307 /* Lowercase everything. */
308 for (Uint64 j= 0; j < length; j++)
309 buffer[j]= tolower(buffer[j]);
310
311 NdbOperation *myUpdateOp= myScanOp->updateCurrentTuple();
312 if (myUpdateOp == NULL)
313 APIERROR(myTrans->getNdbError());
314 NdbBlob *myBlobHandle2= myUpdateOp->getBlobHandle("my_text");
315 if (myBlobHandle2 == NULL)
316 APIERROR(myUpdateOp->getNdbError());
317 if (myBlobHandle2->setValue(buffer, length))
318 APIERROR(myBlobHandle2->getNdbError());
319
320 if (-1 == myTrans->execute(NdbTransaction::NoCommit))
321 APIERROR(myTrans->getNdbError());
322 }
323
324 if (-1 == myTrans->execute(NdbTransaction::Commit))
325 APIERROR(myTrans->getNdbError());
326
327 myNdb->closeTransaction(myTrans);
328
329 return 1;
330 }
331
332
333 struct ActiveHookData {
334 char buffer[10000];
335 Uint32 readLength;
336 };
337
myFetchHook(NdbBlob * myBlobHandle,void * arg)338 int myFetchHook(NdbBlob* myBlobHandle, void* arg)
339 {
340 ActiveHookData *ahd= (ActiveHookData *)arg;
341
342 ahd->readLength= sizeof(ahd->buffer) - 1;
343 return myBlobHandle->readData(ahd->buffer, ahd->readLength);
344 }
345
fetch_key(Ndb * myNdb)346 int fetch_key(Ndb *myNdb)
347 {
348 /*
349 Fetch and show the blob field, using setActiveHook().
350 */
351 const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
352 const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
353 if (myTable == NULL)
354 APIERROR(myDict->getNdbError());
355
356 NdbTransaction *myTrans= myNdb->startTransaction();
357 if (myTrans == NULL)
358 APIERROR(myNdb->getNdbError());
359
360 NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
361 if (myNdbOperation == NULL)
362 APIERROR(myTrans->getNdbError());
363 myNdbOperation->readTuple();
364 myNdbOperation->equal("my_id", 1);
365 NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
366 if (myBlobHandle == NULL)
367 APIERROR(myNdbOperation->getNdbError());
368 struct ActiveHookData ahd;
369 if (myBlobHandle->setActiveHook(myFetchHook, &ahd) == -1)
370 APIERROR(myBlobHandle->getNdbError());
371
372 /*
373 Execute Commit, but calling our callback set up in setActiveHook()
374 before actually committing.
375 */
376 if (-1 == myTrans->execute(NdbTransaction::Commit))
377 APIERROR(myTrans->getNdbError());
378 myNdb->closeTransaction(myTrans);
379
380 /* Our fetch callback will have been called during the execute(). */
381
382 ahd.buffer[ahd.readLength]= '\0';
383 std::cout << "Fetched data:" << std::endl << ahd.buffer << std::endl;
384
385 return 1;
386 }
387
388
update2_key(Ndb * myNdb)389 int update2_key(Ndb *myNdb)
390 {
391 char buffer[10000];
392
393 /* Simple setValue() update. */
394 const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
395 const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
396 if (myTable == NULL)
397 APIERROR(myDict->getNdbError());
398
399 NdbTransaction *myTrans= myNdb->startTransaction();
400 if (myTrans == NULL)
401 APIERROR(myNdb->getNdbError());
402
403 NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
404 if (myNdbOperation == NULL)
405 APIERROR(myTrans->getNdbError());
406 myNdbOperation->updateTuple();
407 myNdbOperation->equal("my_id", 1);
408 NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
409 if (myBlobHandle == NULL)
410 APIERROR(myNdbOperation->getNdbError());
411 memset(buffer, ' ', sizeof(buffer));
412 if (myBlobHandle->setValue(buffer, sizeof(buffer)) == -1)
413 APIERROR(myBlobHandle->getNdbError());
414
415 if (-1 == myTrans->execute(NdbTransaction::Commit))
416 APIERROR(myTrans->getNdbError());
417 myNdb->closeTransaction(myTrans);
418
419 return 1;
420 }
421
422
delete_key(Ndb * myNdb)423 int delete_key(Ndb *myNdb)
424 {
425 /* Deletion of blob row. */
426 const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
427 const NdbDictionary::Table *myTable= myDict->getTable("api_blob");
428 if (myTable == NULL)
429 APIERROR(myDict->getNdbError());
430
431 NdbTransaction *myTrans= myNdb->startTransaction();
432 if (myTrans == NULL)
433 APIERROR(myNdb->getNdbError());
434
435 NdbOperation *myNdbOperation= myTrans->getNdbOperation(myTable);
436 if (myNdbOperation == NULL)
437 APIERROR(myTrans->getNdbError());
438 myNdbOperation->deleteTuple();
439 myNdbOperation->equal("my_id", 1);
440
441 if (-1 == myTrans->execute(NdbTransaction::Commit))
442 APIERROR(myTrans->getNdbError());
443 myNdb->closeTransaction(myTrans);
444
445 return 1;
446 }
447
448
main(int argc,char ** argv)449 int main(int argc, char**argv)
450 {
451 if (argc != 3)
452 {
453 std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
454 exit(-1);
455 }
456 char *mysqld_sock = argv[1];
457 const char *connectstring = argv[2];
458 ndb_init();
459 MYSQL mysql;
460
461 /* Connect to mysql server and create table. */
462 {
463 if ( !mysql_init(&mysql) ) {
464 std::cout << "mysql_init failed.\n";
465 exit(-1);
466 }
467 if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
468 0, mysqld_sock, 0) )
469 MYSQLERROR(mysql);
470
471 mysql_query(&mysql, "CREATE DATABASE ndb_examples");
472 if (mysql_query(&mysql, "USE ndb_examples") != 0)
473 MYSQLERROR(mysql);
474
475 create_table(mysql);
476 }
477
478 /* Connect to ndb cluster. */
479
480 Ndb_cluster_connection cluster_connection(connectstring);
481 if (cluster_connection.connect(4, 5, 1))
482 {
483 std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
484 exit(-1);
485 }
486 /* Optionally connect and wait for the storage nodes (ndbd's). */
487 if (cluster_connection.wait_until_ready(30,0) < 0)
488 {
489 std::cout << "Cluster was not ready within 30 secs.\n";
490 exit(-1);
491 }
492
493 Ndb myNdb(&cluster_connection,"ndb_examples");
494 if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions
495 APIERROR(myNdb.getNdbError());
496 exit(-1);
497 }
498
499 if(populate(&myNdb) > 0)
500 std::cout << "populate: Success!" << std::endl;
501
502 if(update_key(&myNdb) > 0)
503 std::cout << "update_key: Success!" << std::endl;
504
505 if(update_scan(&myNdb) > 0)
506 std::cout << "update_scan: Success!" << std::endl;
507
508 if(fetch_key(&myNdb) > 0)
509 std::cout << "fetch_key: Success!" << std::endl;
510
511 if(update2_key(&myNdb) > 0)
512 std::cout << "update2_key: Success!" << std::endl;
513
514 if(delete_key(&myNdb) > 0)
515 std::cout << "delete_key: Success!" << std::endl;
516
517 return 0;
518 }
519