1 /*
2 Copyright (c) 2007, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 /*
27 ndbapi_blob_ndbrecord
28
29 Illustrates the manipulation of BLOB (actually TEXT in this example).
30 This example uses the NdbRecord style way of accessing tuples.
31
32 Shows insert, read, and update, using both inline value buffer and
33 read/write methods.
34 */
35
36 #ifdef _WIN32
37 #include <winsock2.h>
38 #endif
39 #include <mysql.h>
40 #include <mysqld_error.h>
41 #include <NdbApi.hpp>
42 /* Used for cout. */
43 #include <iostream>
44 #include <stdio.h>
45 #include <ctype.h>
46 #include <stdlib.h>
47 #include <stddef.h>
48 #include <string.h>
49
/**
 * Helper debugging macros.
 *
 * PRINT_ERROR writes the source location, an error code and a message
 * to std::cout.  MYSQLERROR and APIERROR report an error through
 * PRINT_ERROR and then terminate the process with exit(-1).
 *
 * The multi-statement macros are wrapped in do { } while (0) so that
 * "APIERROR(e);" expands to exactly one statement and can be used as
 * the unbraced body of an if that is followed by an else.  Macro
 * arguments are parenthesized so compound expressions expand as
 * intended.
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl
#define MYSQLERROR(mysql) do { \
  PRINT_ERROR(mysql_errno(&(mysql)),mysql_error(&(mysql))); \
  exit(-1); } while (0)
#define APIERROR(error) do { \
  PRINT_ERROR((error).code,(error).message); \
  exit(-1); } while (0)
63
/*
  Sample text stored in the TEXT column by this example.
  Quote taken from Project Gutenberg.  Each paragraph break is written
  as a trailing "\n\n" on the last line of the preceding paragraph.
*/
const char *text_quote =
  "Just at this moment, somehow or other, they began to run.\n\n"
  " Alice never could quite make out, in thinking it over\n"
  "afterwards, how it was that they began: all she remembers is,\n"
  "that they were running hand in hand, and the Queen went so fast\n"
  "that it was all she could do to keep up with her: and still the\n"
  "Queen kept crying 'Faster! Faster!' but Alice felt she COULD NOT\n"
  "go faster, though she had not breath left to say so.\n\n"
  " The most curious part of the thing was, that the trees and the\n"
  "other things round them never changed their places at all:\n"
  "however fast they went, they never seemed to pass anything. 'I\n"
  "wonder if all the things move along with us?' thought poor\n"
  "puzzled Alice. And the Queen seemed to guess her thoughts, for\n"
  "she cried, 'Faster! Don't try to talk!'\n\n"
  " Not that Alice had any idea of doing THAT. She felt as if she\n"
  "would never be able to talk again, she was getting so much out of\n"
  "breath: and still the Queen cried 'Faster! Faster!' and dragged\n"
  "her along. 'Are we nearly there?' Alice managed to pant out at\n"
  "last.\n\n"
  " 'Nearly there!' the Queen repeated. 'Why, we passed it ten\n"
  "minutes ago! Faster!' And they ran on for a time in silence,\n"
  "with the wind whistling in Alice's ears, and almost blowing her\n"
  "hair off her head, she fancied.\n\n"
  " 'Now! Now!' cried the Queen. 'Faster! Faster!' And they\n"
  "went so fast that at last they seemed to skim through the air,\n"
  "hardly touching the ground with their feet, till suddenly, just\n"
  "as Alice was getting quite exhausted, they stopped, and she found\n"
  "herself sitting on the ground, breathless and giddy.\n\n"
  " The Queen propped her up against a tree, and said kindly, 'You\n"
  "may rest a little now.'\n\n"
  " Alice looked round her in great surprise. 'Why, I do believe\n"
  "we've been under this tree the whole time! Everything's just as\n"
  "it was!'\n\n"
  " 'Of course it is,' said the Queen, 'what would you have it?'\n\n"
  " 'Well, in OUR country,' said Alice, still panting a little,\n"
  "'you'd generally get to somewhere else--if you ran very fast\n"
  "for a long time, as we've been doing.'\n\n"
  " 'A slow sort of country!' said the Queen. 'Now, HERE, you see,\n"
  "it takes all the running YOU can do, to keep in the same place.\n"
  "If you want to get somewhere else, you must run at least twice as\n"
  "fast as that!'\n\n"
  " 'I'd rather not try, please!' said Alice. 'I'm quite content\n"
  "to stay here--only I AM so hot and thirsty!'\n\n"
  " -- Lewis Carroll, 'Through the Looking-Glass'.";
121
122 /* NdbRecord objects. */
123
124 const NdbRecord *key_record; // For specifying table key
125 const NdbRecord *blob_record; // For accessing blob
126 const NdbRecord *full_record; // All columns, for insert
127
128 /* C struct representing the row layout */
129 struct MyRow
130 {
131 unsigned int myId;
132
133 /* Pointer to Blob handle for operations on the blob column
134 * Space must be left for it in the row, but a pointer to the
135 * blob handle can also be obtained via calls to
136 * NdbOperation::getBlobHandle()
137 */
138 NdbBlob* myText;
139 };
140
setup_records(Ndb * myNdb)141 static void setup_records(Ndb *myNdb)
142 {
143 NdbDictionary::RecordSpecification spec[2];
144
145 NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
146 const NdbDictionary::Table *myTable= myDict->getTable("api_blob_ndbrecord");
147 if (myTable == NULL)
148 APIERROR(myDict->getNdbError());
149 const NdbDictionary::Column *col1= myTable->getColumn("my_id");
150 if (col1 == NULL)
151 APIERROR(myDict->getNdbError());
152 const NdbDictionary::Column *col2= myTable->getColumn("my_text");
153 if (col2 == NULL)
154 APIERROR(myDict->getNdbError());
155
156 spec[0].column= col1;
157 spec[0].offset= offsetof(MyRow, myId);
158 spec[0].nullbit_byte_offset= 0;
159 spec[0].nullbit_bit_in_byte= 0;
160 spec[1].column= col2;
161 spec[1].offset= offsetof(MyRow, myText);
162 spec[1].nullbit_byte_offset= 0;
163 spec[1].nullbit_bit_in_byte= 0;
164
165 key_record= myDict->createRecord(myTable, &spec[0], 1, sizeof(spec[0]));
166 if (key_record == NULL)
167 APIERROR(myDict->getNdbError());
168 blob_record= myDict->createRecord(myTable, &spec[1], 1, sizeof(spec[0]));
169 if (blob_record == NULL)
170 APIERROR(myDict->getNdbError());
171 full_record= myDict->createRecord(myTable, &spec[0], 2, sizeof(spec[0]));
172 if (full_record == NULL)
173 APIERROR(myDict->getNdbError());
174 }
175
176 /*
177 Function to drop table.
178 */
drop_table(MYSQL & mysql)179 void drop_table(MYSQL &mysql)
180 {
181 if (mysql_query(&mysql, "DROP TABLE api_blob_ndbrecord"))
182 MYSQLERROR(mysql);
183 }
184
185
186 /*
187 Functions to create table.
188 */
try_create_table(MYSQL & mysql)189 int try_create_table(MYSQL &mysql)
190 {
191 return mysql_query(&mysql,
192 "CREATE TABLE"
193 " api_blob_ndbrecord"
194 " (my_id INT UNSIGNED NOT NULL,"
195 " my_text TEXT NOT NULL,"
196 " PRIMARY KEY USING HASH (my_id))"
197 " ENGINE=NDB");
198 }
199
create_table(MYSQL & mysql)200 void create_table(MYSQL &mysql)
201 {
202 if (try_create_table(mysql))
203 {
204 if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
205 MYSQLERROR(mysql);
206 std::cout << "MySQL Cluster already has example table: api_blob_ndbrecord. "
207 << "Dropping it..." << std::endl;
208 /******************
209 * Recreate table *
210 ******************/
211 drop_table(mysql);
212 if (try_create_table(mysql))
213 MYSQLERROR(mysql);
214 }
215 }
216
populate(Ndb * myNdb)217 int populate(Ndb *myNdb)
218 {
219 MyRow row;
220
221 NdbTransaction *myTrans= myNdb->startTransaction();
222 if (myTrans == NULL)
223 APIERROR(myNdb->getNdbError());
224
225 row.myId= 1;
226 const NdbOperation *myNdbOperation= myTrans->insertTuple(full_record, (const char*) &row);
227 if (myNdbOperation == NULL)
228 APIERROR(myTrans->getNdbError());
229
230 NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
231 if (myBlobHandle == NULL)
232 APIERROR(myNdbOperation->getNdbError());
233 myBlobHandle->setValue(text_quote, strlen(text_quote));
234
235 int check= myTrans->execute(NdbTransaction::Commit);
236 myTrans->close();
237 return check != -1;
238 }
239
240
update_key(Ndb * myNdb)241 int update_key(Ndb *myNdb)
242 {
243 MyRow row;
244
245 /*
246 Uppercase all characters in TEXT field, using primary key operation.
247 Use piece-wise read/write to avoid loading entire data into memory
248 at once.
249 */
250
251 NdbTransaction *myTrans= myNdb->startTransaction();
252 if (myTrans == NULL)
253 APIERROR(myNdb->getNdbError());
254
255 row.myId= 1;
256
257 const NdbOperation *myNdbOperation=
258 myTrans->updateTuple(key_record,
259 (const char*) &row,
260 blob_record,
261 (const char*) &row);
262 if (myNdbOperation == NULL)
263 APIERROR(myTrans->getNdbError());
264
265 NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
266 if (myBlobHandle == NULL)
267 APIERROR(myNdbOperation->getNdbError());
268
269 /* Execute NoCommit to make the blob handle active so
270 * that we can determine the actual Blob length
271 */
272 if (-1 == myTrans->execute(NdbTransaction::NoCommit))
273 APIERROR(myTrans->getNdbError());
274
275 Uint64 length= 0;
276 if (-1 == myBlobHandle->getLength(length))
277 APIERROR(myBlobHandle->getNdbError());
278
279 /*
280 A real application should use a much larger chunk size for
281 efficiency, preferably much larger than the part size, which
282 defaults to 2000. 64000 might be a good value.
283 */
284 #define CHUNK_SIZE 100
285 int chunk;
286 char buffer[CHUNK_SIZE];
287 for (chunk= (length-1)/CHUNK_SIZE; chunk >=0; chunk--)
288 {
289 Uint64 pos= chunk*CHUNK_SIZE;
290 Uint32 chunk_length= CHUNK_SIZE;
291 if (pos + chunk_length > length)
292 chunk_length= length - pos;
293
294 /* Read from the end back, to illustrate seeking. */
295 if (-1 == myBlobHandle->setPos(pos))
296 APIERROR(myBlobHandle->getNdbError());
297 if (-1 == myBlobHandle->readData(buffer, chunk_length))
298 APIERROR(myBlobHandle->getNdbError());
299 int res= myTrans->execute(NdbTransaction::NoCommit);
300 if (-1 == res)
301 APIERROR(myTrans->getNdbError());
302
303 /* Uppercase everything. */
304 for (Uint64 j= 0; j < chunk_length; j++)
305 buffer[j]= toupper(buffer[j]);
306
307 if (-1 == myBlobHandle->setPos(pos))
308 APIERROR(myBlobHandle->getNdbError());
309 if (-1 == myBlobHandle->writeData(buffer, chunk_length))
310 APIERROR(myBlobHandle->getNdbError());
311 /* Commit on the final update. */
312 if (-1 == myTrans->execute(chunk ?
313 NdbTransaction::NoCommit :
314 NdbTransaction::Commit))
315 APIERROR(myTrans->getNdbError());
316 }
317
318 myNdb->closeTransaction(myTrans);
319
320 return 1;
321 }
322
323
update_scan(Ndb * myNdb)324 int update_scan(Ndb *myNdb)
325 {
326 /*
327 Lowercase all characters in TEXT field, using a scan with
328 updateCurrentTuple().
329 */
330 char buffer[10000];
331
332 NdbTransaction *myTrans= myNdb->startTransaction();
333 if (myTrans == NULL)
334 APIERROR(myNdb->getNdbError());
335
336 NdbScanOperation *myScanOp=
337 myTrans->scanTable(blob_record, NdbOperation::LM_Exclusive);
338 if (myScanOp == NULL)
339 APIERROR(myTrans->getNdbError());
340 NdbBlob *myBlobHandle= myScanOp->getBlobHandle("my_text");
341 if (myBlobHandle == NULL)
342 APIERROR(myScanOp->getNdbError());
343 if (myBlobHandle->getValue(buffer, sizeof(buffer)))
344 APIERROR(myBlobHandle->getNdbError());
345
346 /* Start the scan. */
347 if (-1 == myTrans->execute(NdbTransaction::NoCommit))
348 APIERROR(myTrans->getNdbError());
349
350 const MyRow *out_row;
351 int res;
352 for (;;)
353 {
354 res= myScanOp->nextResult((const char**)&out_row, true, false);
355 if (res==1)
356 break; // Scan done.
357 else if (res)
358 APIERROR(myScanOp->getNdbError());
359
360 Uint64 length= 0;
361 if (myBlobHandle->getLength(length) == -1)
362 APIERROR(myBlobHandle->getNdbError());
363
364 /* Lowercase everything. */
365 for (Uint64 j= 0; j < length; j++)
366 buffer[j]= tolower(buffer[j]);
367
368 /* 'Take over' the row locks from the scan to a separate
369 * operation for updating the tuple
370 */
371 const NdbOperation *myUpdateOp=
372 myScanOp->updateCurrentTuple(myTrans,
373 blob_record,
374 (const char*)out_row);
375 if (myUpdateOp == NULL)
376 APIERROR(myTrans->getNdbError());
377 NdbBlob *myBlobHandle2= myUpdateOp->getBlobHandle("my_text");
378 if (myBlobHandle2 == NULL)
379 APIERROR(myUpdateOp->getNdbError());
380 if (myBlobHandle2->setValue(buffer, length))
381 APIERROR(myBlobHandle2->getNdbError());
382
383 if (-1 == myTrans->execute(NdbTransaction::NoCommit))
384 APIERROR(myTrans->getNdbError());
385 }
386
387 if (-1 == myTrans->execute(NdbTransaction::Commit))
388 APIERROR(myTrans->getNdbError());
389
390 myNdb->closeTransaction(myTrans);
391
392 return 1;
393 }
394
395
396 struct ActiveHookData {
397 char buffer[10000];
398 Uint32 readLength;
399 };
400
myFetchHook(NdbBlob * myBlobHandle,void * arg)401 int myFetchHook(NdbBlob* myBlobHandle, void* arg)
402 {
403 ActiveHookData *ahd= (ActiveHookData *)arg;
404
405 ahd->readLength= sizeof(ahd->buffer) - 1;
406 return myBlobHandle->readData(ahd->buffer, ahd->readLength);
407 }
408
fetch_key(Ndb * myNdb)409 int fetch_key(Ndb *myNdb)
410 {
411 /* Fetch a blob without specifying how many bytes
412 * to read up front, in one execution using
413 * the 'ActiveHook' mechanism.
414 * The supplied ActiveHook procedure is called when
415 * the Blob handle becomes 'active'. At that point
416 * the length of the Blob can be obtained, and buffering
417 * arranged, and the data read requested.
418 */
419
420 /* Separate rows used to specify key and hold result */
421 MyRow key_row;
422 MyRow out_row;
423
424 /*
425 Fetch and show the blob field, using setActiveHook().
426 */
427
428 NdbTransaction *myTrans= myNdb->startTransaction();
429 if (myTrans == NULL)
430 APIERROR(myNdb->getNdbError());
431
432 key_row.myId= 1;
433 out_row.myText= NULL;
434 const NdbOperation *myNdbOperation=
435 myTrans->readTuple(key_record,
436 (const char*) &key_row,
437 blob_record,
438 (char*) &out_row);
439 if (myNdbOperation == NULL)
440 APIERROR(myTrans->getNdbError());
441
442 /* This time, we'll get the blob handle from the row, because
443 * we can. Alternatively, we could use the normal mechanism
444 * of calling getBlobHandle().
445 */
446 NdbBlob *myBlobHandle= out_row.myText;
447 if (myBlobHandle == NULL)
448 APIERROR(myNdbOperation->getNdbError());
449 struct ActiveHookData ahd;
450 if (myBlobHandle->setActiveHook(myFetchHook, &ahd) == -1)
451 APIERROR(myBlobHandle->getNdbError());
452
453 /*
454 Execute Commit, but calling our callback set up in setActiveHook()
455 before actually committing.
456 */
457 if (-1 == myTrans->execute(NdbTransaction::Commit))
458 APIERROR(myTrans->getNdbError());
459 myNdb->closeTransaction(myTrans);
460
461 /* Our fetch callback will have been called during the execute(). */
462
463 ahd.buffer[ahd.readLength]= '\0';
464 std::cout << "Fetched data:" << std::endl << ahd.buffer << std::endl;
465
466 return 1;
467 }
468
469
update2_key(Ndb * myNdb)470 int update2_key(Ndb *myNdb)
471 {
472 char buffer[10000];
473 MyRow row;
474
475 /* Simple setValue() update specified before the
476 * Blob handle is made active
477 */
478
479 NdbTransaction *myTrans= myNdb->startTransaction();
480 if (myTrans == NULL)
481 APIERROR(myNdb->getNdbError());
482
483 row.myId= 1;
484 const NdbOperation *myNdbOperation=
485 myTrans->updateTuple(key_record,
486 (const char*)&row,
487 blob_record,
488 (char*) &row);
489 if (myNdbOperation == NULL)
490 APIERROR(myTrans->getNdbError());
491 NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
492 if (myBlobHandle == NULL)
493 APIERROR(myNdbOperation->getNdbError());
494 memset(buffer, ' ', sizeof(buffer));
495 if (myBlobHandle->setValue(buffer, sizeof(buffer)) == -1)
496 APIERROR(myBlobHandle->getNdbError());
497
498 if (-1 == myTrans->execute(NdbTransaction::Commit))
499 APIERROR(myTrans->getNdbError());
500 myNdb->closeTransaction(myTrans);
501
502 return 1;
503 }
504
505
delete_key(Ndb * myNdb)506 int delete_key(Ndb *myNdb)
507 {
508 MyRow row;
509
510 /* Deletion of row containing blob via primary key. */
511
512 NdbTransaction *myTrans= myNdb->startTransaction();
513 if (myTrans == NULL)
514 APIERROR(myNdb->getNdbError());
515
516 row.myId= 1;
517 const NdbOperation *myNdbOperation= myTrans->deleteTuple(key_record,
518 (const char*)&row,
519 full_record);
520 if (myNdbOperation == NULL)
521 APIERROR(myTrans->getNdbError());
522
523 if (-1 == myTrans->execute(NdbTransaction::Commit))
524 APIERROR(myTrans->getNdbError());
525 myNdb->closeTransaction(myTrans);
526
527 return 1;
528 }
529
530
main(int argc,char ** argv)531 int main(int argc, char**argv)
532 {
533 if (argc != 3)
534 {
535 std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
536 exit(-1);
537 }
538 char *mysqld_sock = argv[1];
539 const char *connectstring = argv[2];
540 ndb_init();
541 MYSQL mysql;
542
543 /* Connect to mysql server and create table. */
544 {
545 if ( !mysql_init(&mysql) ) {
546 std::cout << "mysql_init failed.\n";
547 exit(-1);
548 }
549 if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
550 0, mysqld_sock, 0) )
551 MYSQLERROR(mysql);
552
553 mysql_query(&mysql, "CREATE DATABASE ndb_examples");
554 if (mysql_query(&mysql, "USE ndb_examples") != 0)
555 MYSQLERROR(mysql);
556
557 create_table(mysql);
558 }
559
560 /* Connect to ndb cluster. */
561
562 Ndb_cluster_connection cluster_connection(connectstring);
563 if (cluster_connection.connect(4, 5, 1))
564 {
565 std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
566 exit(-1);
567 }
568 /* Optionally connect and wait for the storage nodes (ndbd's). */
569 if (cluster_connection.wait_until_ready(30,0) < 0)
570 {
571 std::cout << "Cluster was not ready within 30 secs.\n";
572 exit(-1);
573 }
574
575 Ndb myNdb(&cluster_connection,"ndb_examples");
576 if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions
577 APIERROR(myNdb.getNdbError());
578 exit(-1);
579 }
580
581 setup_records(&myNdb);
582
583 if(populate(&myNdb) > 0)
584 std::cout << "populate: Success!" << std::endl;
585
586 if(update_key(&myNdb) > 0)
587 std::cout << "update_key: Success!" << std::endl;
588
589 if(update_scan(&myNdb) > 0)
590 std::cout << "update_scan: Success!" << std::endl;
591
592 if(fetch_key(&myNdb) > 0)
593 std::cout << "fetch_key: Success!" << std::endl;
594
595 if(update2_key(&myNdb) > 0)
596 std::cout << "update2_key: Success!" << std::endl;
597
598 if(delete_key(&myNdb) > 0)
599 std::cout << "delete_key: Success!" << std::endl;
600
601 return 0;
602 }
603