1 /* Copyright (c) 2004, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 /*
24
25 MySQL Federated Storage Engine
26
27 ha_federated.cc - MySQL Federated Storage Engine
28 Patrick Galbraith and Brian Aker, 2004
29
30 This is a handler which uses a foreign database as the data file, as
31 opposed to a handler like MyISAM, which uses .MYD files locally.
32
33 How this handler works
34 ----------------------------------
35 Normal database files are local and as such: You create a table called
36 'users', a file such as 'users.MYD' is created. A handler reads, inserts,
37 deletes, updates data in this file. The data is stored in particular format,
38 so to read, that data has to be parsed into fields, to write, fields have to
39 be stored in this format to write to this data file.
40
41 With MySQL Federated storage engine, there will be no local files
42 for each table's data (such as .MYD). A foreign database will store
43 the data that would normally be in this file. This will necessitate
44 the use of MySQL client API to read, delete, update, insert this
45 data. The data will have to be retrieved via an SQL call "SELECT *
46 FROM users". Then, to read this data, it will have to be retrieved
47 via mysql_fetch_row one row at a time, then converted from the
48 column in this select into the format that the handler expects.
49
50 The create table will simply create the .frm file, and within the
51 "CREATE TABLE" SQL, there SHALL be any of the following :
52
53 connection=scheme://username:password@hostname:port/database/tablename
54 connection=scheme://username@hostname/database/tablename
55 connection=scheme://username:password@hostname/database/tablename
56 connection=scheme://username@hostname:port/database/tablename
57
58 - OR -
59
60 As of 5.1 (See worklog #3031), federated now allows you to use a non-url
61 format, taking advantage of mysql.servers:
62
63 connection="connection_one"
64 connection="connection_one/table_foo"
65
66 An example would be:
67
68 connection=mysql://username:password@hostname:port/database/tablename
69
70 or, if we had:
71
72 create server 'server_one' foreign data wrapper 'mysql' options
73 (HOST '127.0.0.1',
74 DATABASE 'db1',
75 USER 'root',
76 PASSWORD '',
77 PORT 3306,
78 SOCKET '',
79 OWNER 'root');
80
81 CREATE TABLE federated.t1 (
82 `id` int(20) NOT NULL,
83 `name` varchar(64) NOT NULL default ''
84 )
85 ENGINE="FEDERATED" DEFAULT CHARSET=latin1
86 CONNECTION='server_one';
87
88 So, this will have been the equivalent of
89
90 CONNECTION="mysql://root@127.0.0.1:3306/db1/t1"
91
92 Then, we can also change the server to point to a new schema:
93
94 ALTER SERVER 'server_one' options(DATABASE 'db2');
95
96 All subsequent calls will now be against db2.t1! Guess what? You don't
97 have to perform an alter table!
98
99 This connection="connection string" is necessary for the handler to be
100 able to connect to the foreign server, either by URL, or by server
101 name.
102
103
104 The basic flow is this:
105
106 SQL calls issues locally ->
107 mysql handler API (data in handler format) ->
108 mysql client API (data converted to SQL calls) ->
109 foreign database -> mysql client API ->
110 convert result sets (if any) to handler format ->
111 handler API -> results or rows affected to local
112
113 What this handler does and doesn't support
114 ------------------------------------------
115 * Tables MUST be created on the foreign server prior to any action on those
116 tables via the handler, first version. IMPORTANT: IF you MUST use the
117 federated storage engine type on the REMOTE end, MAKE SURE [ :) ] That
118 the table you connect to IS NOT a table pointing BACK to your ORIGINAL
119 table! You know and have heard the screeching of audio feedback? You
120 know putting two mirrors in front of each other how the reflection
121 continues for eternity? Well, need I say more?!
122 * There will not be support for transactions.
123 * There is no way for the handler to know if the foreign database or table
124 has changed. The reason for this is that this database has to work like a
125 data file that would never be written to by anything other than the
126 database. The integrity of the data in the local table could be breached
127 if there was any change to the foreign database.
128 * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
129 * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
130 * Prepared statements will not be used in the first implementation, it
131 remains to be seen whether the limited subset of the client API for the
132 server supports this.
133 * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
134 implementation.
135 * This will not work with the query cache.
136
137 Method calls
138
139 A two column table, with one record:
140
141 (SELECT)
142
143 "SELECT * FROM foo"
144 ha_federated::info
145 ha_federated::scan_time:
146 ha_federated::rnd_init: share->select_query SELECT * FROM foo
147 ha_federated::extra
148
149 <for every row of data retrieved>
150 ha_federated::rnd_next
151 ha_federated::convert_row_to_internal_format
152 ha_federated::rnd_next
153 </for every row of data retrieved>
154
155 ha_federated::rnd_end
156 ha_federated::extra
157 ha_federated::reset
158
159 (INSERT)
160
161 "INSERT INTO foo (id, ts) VALUES (2, now());"
162
163 ha_federated::write_row
164
165 ha_federated::reset
166
167 (UPDATE)
168
169 "UPDATE foo SET ts = now() WHERE id = 1;"
170
171 ha_federated::index_init
172 ha_federated::index_read
173 ha_federated::index_read_idx
174 ha_federated::rnd_next
175 ha_federated::convert_row_to_internal_format
176 ha_federated::update_row
177
178 ha_federated::extra
179 ha_federated::extra
180 ha_federated::extra
181 ha_federated::external_lock
182 ha_federated::reset
183
184
185 How do I use this handler?
186 --------------------------
187 First of all, you need to build this storage engine:
188
189 ./configure --with-federated-storage-engine
190 make
191
192 Next, to use this handler, it's very simple. You must
193 have two databases running, either both on the same host, or
194 on different hosts.
195
196 On the server that will be connecting to the foreign
197 host (client), you create your table as such:
198
199 CREATE TABLE test_table (
200 id int(20) NOT NULL auto_increment,
201 name varchar(32) NOT NULL default '',
202 other int(20) NOT NULL default '0',
203 PRIMARY KEY (id),
204 KEY name (name),
205 KEY other_key (other))
206 ENGINE="FEDERATED"
207 DEFAULT CHARSET=latin1
208 CONNECTION='mysql://root@127.0.0.1:9306/federated/test_federated';
209
210 Notice the "ENGINE" and "CONNECTION" fields? This is where you
211 respectively set the engine type, "FEDERATED", and the foreign
212 host information, this being the database your 'client' database
213 will connect to and use as the "data file". Obviously, the foreign
214 database is running on port 9306, so you want to start up your other
215 database so that it is indeed on port 9306, and your federated
216 database on a port other than that. In my setup, I use port 5554
217 for federated, and port 5555 for the foreign database.
218
219 Then, on the foreign database:
220
221 CREATE TABLE test_table (
222 id int(20) NOT NULL auto_increment,
223 name varchar(32) NOT NULL default '',
224 other int(20) NOT NULL default '0',
225 PRIMARY KEY (id),
226 KEY name (name),
227 KEY other_key (other))
228 ENGINE="<NAME>" <-- whatever you want, or not specify
229 DEFAULT CHARSET=latin1 ;
230
231 This table is exactly the same (and must be exactly the same),
232 except that it is not using the federated handler and does
233 not need the URL.
234
235
236 How to see the handler in action
237 --------------------------------
238
239 When developing this handler, I compiled the federated database with
240 debugging:
241
242 ./configure --with-federated-storage-engine
243 --prefix=/home/mysql/mysql-build/federated/ --with-debug
244
245 Once compiled, I did a 'make install' (not for the purpose of installing
246 the binary, but to install all the files the binary expects to see in the
247 directory I specified in the build with --prefix,
248 "/home/mysql/mysql-build/federated").
249
250 Then, I started the foreign server:
251
252 /usr/local/mysql/bin/mysqld_safe
253 --user=mysql --log=/tmp/mysqld.5555.log -P 5555
254
255 Then, I went back to the directory containing the newly compiled mysqld,
256 <builddir>/sql/, started up gdb:
257
258 gdb ./mysqld
259
260 Then, within the (gdb) prompt:
261 (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug
262
263 Next, I open several windows for each:
264
265 1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
266 2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
267 3. A window with a client open to the federated server on port 5554
268 4. A window with a client open to the foreign server on port 5555
269
270 I would create a table on the client to the foreign server on port
271 5555, and then to the federated server on port 5554. At this point,
272 I would run whatever queries I wanted to on the federated server,
273 just always remembering that whatever changes I wanted to make on
274 the table, or if I created new tables, that I would have to do that
275 on the foreign server.
276
277 Another thing to look for is 'show variables' to show you that you have
278 support for federated handler support:
279
280 show variables like '%federat%'
281
282 and:
283
284 show storage engines;
285
286 Both should display the federated storage handler.
287
288
289 Testing
290 -------
291
292 There is a test for MySQL Federated Storage Handler in ./mysql-test/t,
293 federated.test. It starts both a slave and master database using
294 the same setup that the replication tests use, with the exception that
295 it turns off replication, and sets replication to ignore the test tables.
296 After ensuring that you actually do have support for the federated storage
297 handler, numerous queries/inserts/updates/deletes are run, many derived
298 from the MyISAM tests, plus some other tests which were meant to reveal
299 any issues that would be most likely to affect this handler. All tests
300 should work! ;)
301
302 To run these tests, go into ./mysql-test (based in the directory you
303 built the server in)
304
305 ./mysql-test-run federated
306
307 To run the test, or if you want to run the test and have debug info:
308
309 ./mysql-test-run --debug federated
310
311 This will run the test in debug mode, and you can view the trace and
312 log files in the ./mysql-test/var/log directory
313
314 ls -l mysql-test/var/log/
315 -rw-r--r-- 1 patg patg 17 4 Dec 12:27 current_test
316 -rw-r--r-- 1 patg patg 692 4 Dec 12:52 manager.log
317 -rw-rw---- 1 patg patg 21246 4 Dec 12:51 master-bin.000001
318 -rw-rw---- 1 patg patg 68 4 Dec 12:28 master-bin.index
319 -rw-r--r-- 1 patg patg 1620 4 Dec 12:51 master.err
320 -rw-rw---- 1 patg patg 23179 4 Dec 12:51 master.log
321 -rw-rw---- 1 patg patg 16696550 4 Dec 12:51 master.trace
322 -rw-r--r-- 1 patg patg 0 4 Dec 12:28 mysqltest-time
323 -rw-r--r-- 1 patg patg 2024051 4 Dec 12:51 mysqltest.trace
324 -rw-rw---- 1 patg patg 94992 4 Dec 12:51 slave-bin.000001
325 -rw-rw---- 1 patg patg 67 4 Dec 12:28 slave-bin.index
326 -rw-rw---- 1 patg patg 249 4 Dec 12:52 slave-relay-bin.000003
327 -rw-rw---- 1 patg patg 73 4 Dec 12:28 slave-relay-bin.index
328 -rw-r--r-- 1 patg patg 1349 4 Dec 12:51 slave.err
329 -rw-rw---- 1 patg patg 96206 4 Dec 12:52 slave.log
330 -rw-rw---- 1 patg patg 15706355 4 Dec 12:51 slave.trace
331 -rw-r--r-- 1 patg patg 0 4 Dec 12:51 warnings
332
333 Of course, again, you can tail the trace log:
334
335 tail -f mysql-test/var/log/master.trace |grep ha_fed
336
337 As well as the slave query log:
338
339 tail -f mysql-test/var/log/slave.log
340
341 Files that comprise the test suite
342 ---------------------------------
343 mysql-test/t/federated.test
344 mysql-test/r/federated.result
345 mysql-test/r/have_federated_db.require
346 mysql-test/include/have_federated_db.inc
347
348
349 Other tidbits
350 -------------
351
352 These were the files that were modified or created for this
353 Federated handler to work, in 5.0:
354
355 ./configure.in
356 ./sql/Makefile.am
357 ./config/ac_macros/ha_federated.m4
358 ./sql/handler.cc
359 ./sql/mysqld.cc
360 ./sql/set_var.cc
361 ./sql/field.h
362 ./sql/sql_string.h
363 ./mysql-test/mysql-test-run(.sh)
364 ./mysql-test/t/federated.test
365 ./mysql-test/r/federated.result
366 ./mysql-test/r/have_federated_db.require
367 ./mysql-test/include/have_federated_db.inc
368 ./sql/ha_federated.cc
369 ./sql/ha_federated.h
370
371 In 5.1
372
373 my:~/mysql-build/mysql-5.1-bkbits patg$ ls storage/federated/
374 CMakeLists.txt Makefile.in ha_federated.h plug.in
375 Makefile SCCS libfederated.a
376 Makefile.am ha_federated.cc libfederated_a-ha_federated.o
377
378 */
379
380
381 #define MYSQL_SERVER 1
382 #include "sql_servers.h" // FOREIGN_SERVER, get_server_by_name
383 #include "sql_class.h" // SSV
384 #include "sql_analyse.h" // append_escaped
385 #include <mysql/plugin.h>
386
387 #include "ha_federated.h"
388 #include "probes_mysql.h"
389
390 #include "m_string.h"
391 #include "key.h" // key_copy
392 #include "myisam.h" // TT_USEFRM
393
394 #include <mysql/plugin.h>
395
396 #include <algorithm>
397
398 using std::min;
399 using std::max;
400
401 /* Variables for federated share methods */
402 static HASH federated_open_tables; // To track open tables
403 mysql_mutex_t federated_mutex; // To init the hash
404 static char ident_quote_char= '`'; // Character for quoting
405 // identifiers
406 static char value_quote_char= '\''; // Character for quoting
407 // literals
408 static const int bulk_padding= 64; // bytes "overhead" in packet
409
410 /* Variables used when chopping off trailing characters */
411 static const uint sizeof_trailing_comma= sizeof(", ") - 1;
412 static const uint sizeof_trailing_and= sizeof(" AND ") - 1;
413 static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;
414
415 /* Static declaration for handerton */
416 static handler *federated_create_handler(handlerton *hton,
417 TABLE_SHARE *table,
418 MEM_ROOT *mem_root);
419 static int federated_commit(handlerton *hton, THD *thd, bool all);
420 static int federated_rollback(handlerton *hton, THD *thd, bool all);
421
422 /* Federated storage engine handlerton */
423
federated_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)424 static handler *federated_create_handler(handlerton *hton,
425 TABLE_SHARE *table,
426 MEM_ROOT *mem_root)
427 {
428 return new (mem_root) ha_federated(hton, table);
429 }
430
431
432 /* Function we use in the creation of our hash to get key */
433
federated_get_key(FEDERATED_SHARE * share,size_t * length,my_bool not_used MY_ATTRIBUTE ((unused)))434 static uchar *federated_get_key(FEDERATED_SHARE *share, size_t *length,
435 my_bool not_used MY_ATTRIBUTE ((unused)))
436 {
437 *length= share->share_key_length;
438 return (uchar*) share->share_key;
439 }
440
441 static PSI_memory_key fe_key_memory_federated_share;
442
443 #ifdef HAVE_PSI_INTERFACE
444 static PSI_mutex_key fe_key_mutex_federated, fe_key_mutex_FEDERATED_SHARE_mutex;
445
446 static PSI_mutex_info all_federated_mutexes[]=
447 {
448 { &fe_key_mutex_federated, "federated", PSI_FLAG_GLOBAL},
449 { &fe_key_mutex_FEDERATED_SHARE_mutex, "FEDERATED_SHARE::mutex", 0}
450 };
451
452 static PSI_memory_info all_federated_memory[]=
453 {
454 { &fe_key_memory_federated_share, "FEDERATED_SHARE", PSI_FLAG_GLOBAL}
455 };
456
init_federated_psi_keys(void)457 static void init_federated_psi_keys(void)
458 {
459 const char* category= "federated";
460 int count;
461
462 count= array_elements(all_federated_mutexes);
463 mysql_mutex_register(category, all_federated_mutexes, count);
464
465 count= array_elements(all_federated_memory);
466 mysql_memory_register(category, all_federated_memory, count);
467 }
468 #endif /* HAVE_PSI_INTERFACE */
469
470 /*
471 Initialize the federated handler.
472
473 SYNOPSIS
474 federated_db_init()
475 p Handlerton
476
477 RETURN
478 FALSE OK
479 TRUE Error
480 */
481
federated_db_init(void * p)482 int federated_db_init(void *p)
483 {
484 DBUG_ENTER("federated_db_init");
485
486 #ifdef HAVE_PSI_INTERFACE
487 init_federated_psi_keys();
488 #endif /* HAVE_PSI_INTERFACE */
489
490 handlerton *federated_hton= (handlerton *)p;
491 federated_hton->state= SHOW_OPTION_YES;
492 federated_hton->db_type= DB_TYPE_FEDERATED_DB;
493 federated_hton->commit= federated_commit;
494 federated_hton->rollback= federated_rollback;
495 federated_hton->create= federated_create_handler;
496 federated_hton->flags= HTON_ALTER_NOT_SUPPORTED | HTON_NO_PARTITION |
497 HTON_SUPPORTS_ONLINE_BACKUPS;
498
499 /*
500 Support for transactions disabled until WL#2952 fixes it.
501 We do it like this to avoid "defined but not used" compiler warnings.
502 */
503 federated_hton->commit= 0;
504 federated_hton->rollback= 0;
505
506 if (mysql_mutex_init(fe_key_mutex_federated,
507 &federated_mutex, MY_MUTEX_INIT_FAST))
508 goto error;
509 if (!my_hash_init(&federated_open_tables, &my_charset_bin, 32, 0, 0,
510 (my_hash_get_key) federated_get_key, 0, 0,
511 fe_key_memory_federated_share))
512 {
513 DBUG_RETURN(FALSE);
514 }
515
516 mysql_mutex_destroy(&federated_mutex);
517 error:
518 DBUG_RETURN(TRUE);
519 }
520
521
522 /*
523 Release the federated handler.
524
525 SYNOPSIS
526 federated_db_end()
527
528 RETURN
529 FALSE OK
530 */
531
federated_done(void * p)532 int federated_done(void *p)
533 {
534 my_hash_free(&federated_open_tables);
535 mysql_mutex_destroy(&federated_mutex);
536
537 return 0;
538 }
539
540
541 /**
542 @brief Append identifiers to the string.
543
544 @param[in,out] string The target string.
545 @param[in] name Identifier name
546 @param[in] length Length of identifier name in bytes
547 @param[in] quote_char Quote char to use for quoting identifier.
548
549 @return Operation Status
550 @retval FALSE OK
551 @retval TRUE There was an error appending to the string.
552
553 @note This function is based upon the append_identifier() function
554 in sql_show.cc except that quoting always occurs.
555 */
556
append_ident(String * string,const char * name,size_t length,const char quote_char)557 static bool append_ident(String *string, const char *name, size_t length,
558 const char quote_char)
559 {
560 bool result= true;
561 DBUG_ENTER("append_ident");
562
563 if (quote_char)
564 {
565 string->reserve(length * 2 + 2);
566
567 if ((result= string->append("e_char, 1, system_charset_info)))
568 goto err;
569
570 uint clen= 0;
571
572 for (const char *name_end= name + length; name < name_end; name+= clen)
573 {
574 char c= *name;
575
576 if (!(clen= my_mbcharlen(system_charset_info, c)))
577 goto err;
578
579 if (clen == 1 && c == quote_char &&
580 (result= string->append("e_char, 1, system_charset_info)))
581 goto err;
582
583 if ((result= string->append(name, clen, string->charset())))
584 goto err;
585 }
586 result= string->append("e_char, 1, system_charset_info);
587 }
588 else
589 result= string->append(name, length, system_charset_info);
590
591 err:
592 DBUG_RETURN(result);
593 }
594
595
parse_url_error(FEDERATED_SHARE * share,TABLE * table,int error_num)596 static int parse_url_error(FEDERATED_SHARE *share, TABLE *table, int error_num)
597 {
598 char buf[FEDERATED_QUERY_BUFFER_SIZE];
599 size_t buf_len;
600 DBUG_ENTER("ha_federated parse_url_error");
601
602 buf_len= min<size_t>(table->s->connect_string.length,
603 FEDERATED_QUERY_BUFFER_SIZE-1);
604 strmake(buf, table->s->connect_string.str, buf_len);
605 my_error(error_num, MYF(0), buf);
606 DBUG_RETURN(error_num);
607 }
608
609 /*
610 retrieve server object which contains server meta-data
611 from the system table given a server's name, set share
612 connection parameter members
613 */
get_connection(MEM_ROOT * mem_root,FEDERATED_SHARE * share)614 int get_connection(MEM_ROOT *mem_root, FEDERATED_SHARE *share)
615 {
616 int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
617 FOREIGN_SERVER *server, server_buffer;
618 DBUG_ENTER("ha_federated::get_connection");
619
620 /*
621 get_server_by_name() clones the server if exists and allocates
622 copies of strings in the supplied mem_root
623 */
624 if (!(server=
625 get_server_by_name(mem_root, share->connection_string, &server_buffer)))
626 {
627 DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!"));
628 error_num= ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE;
629 goto error;
630 }
631 DBUG_PRINT("info", ("get_server_by_name returned server at %lx",
632 (long unsigned int) server));
633
634 /*
635 Most of these should never be empty strings, error handling will
636 need to be implemented. Also, is this the best way to set the share
637 members? Is there some allocation needed? In running this code, it works
638 except there are errors in the trace file of the share being overrun
639 at the address of the share.
640 */
641 share->server_name_length= server->server_name_length;
642 share->server_name= server->server_name;
643 share->username= server->username;
644 share->password= server->password;
645 share->database= server->db;
646 share->port= server->port > 0 && server->port < 65536 ?
647 (ushort) server->port : MYSQL_PORT;
648 share->hostname= server->host;
649 if (!(share->socket= server->socket) &&
650 !strcmp(share->hostname, my_localhost))
651 share->socket= (char *) MYSQL_UNIX_ADDR;
652 share->scheme= server->scheme;
653
654 DBUG_PRINT("info", ("share->username %s", share->username));
655 DBUG_PRINT("info", ("share->password %s", share->password));
656 DBUG_PRINT("info", ("share->hostname %s", share->hostname));
657 DBUG_PRINT("info", ("share->database %s", share->database));
658 DBUG_PRINT("info", ("share->port %d", share->port));
659 DBUG_PRINT("info", ("share->socket %s", share->socket));
660 DBUG_RETURN(0);
661
662 error:
663 my_printf_error(error_num, "server name: '%s' doesn't exist!",
664 MYF(0), share->connection_string);
665 DBUG_RETURN(error_num);
666 }
667
668 /*
669 Parse connection info from table->s->connect_string
670
671 SYNOPSIS
672 parse_url()
673 mem_root MEM_ROOT pointer for memory allocation
674 share pointer to FEDERATED share
675 table pointer to current TABLE class
676 table_create_flag determines what error to throw
677
678 DESCRIPTION
679 Populates the share with information about the connection
680 to the foreign database that will serve as the data source.
681 This string must be specified (currently) in the "CONNECTION" field,
682 listed in the CREATE TABLE statement.
683
684 This string MUST be in the format of any of these:
685
686 CONNECTION="scheme://username:password@hostname:port/database/table"
687 CONNECTION="scheme://username@hostname/database/table"
688 CONNECTION="scheme://username@hostname:port/database/table"
689 CONNECTION="scheme://username:password@hostname/database/table"
690
691 _OR_
692
693 CONNECTION="connection name"
694
695
696
697 An Example:
698
699 CREATE TABLE t1 (id int(32))
700 ENGINE="FEDERATED"
701 CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federated/testtable";
702
703 CREATE TABLE t2 (
704 id int(4) NOT NULL auto_increment,
705 name varchar(32) NOT NULL,
706 PRIMARY KEY(id)
707 ) ENGINE="FEDERATED" CONNECTION="my_conn";
708
709 ***IMPORTANT***
710 Currently, the Federated Storage Engine only supports connecting to another
711 MySQL Database ("scheme" of "mysql"). Connections using JDBC as well as
712 other connectors are in the planning stage.
713
714
715 'password' and 'port' are both optional.
716
717 RETURN VALUE
718 0 success
719 error_num particular error code
720
721 */
722
parse_url(MEM_ROOT * mem_root,FEDERATED_SHARE * share,TABLE * table,uint table_create_flag)723 static int parse_url(MEM_ROOT *mem_root, FEDERATED_SHARE *share, TABLE *table,
724 uint table_create_flag)
725 {
726 uint error_num= (table_create_flag ?
727 ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
728 ER_FOREIGN_DATA_STRING_INVALID);
729 DBUG_ENTER("ha_federated::parse_url");
730
731 share->port= 0;
732 share->socket= 0;
733 DBUG_PRINT("info", ("share at %lx", (long unsigned int) share));
734 DBUG_PRINT("info", ("Length: %u", (uint) table->s->connect_string.length));
735 DBUG_PRINT("info", ("String: '%.*s'", (int) table->s->connect_string.length,
736 table->s->connect_string.str));
737 share->connection_string= strmake_root(mem_root, table->s->connect_string.str,
738 table->s->connect_string.length);
739
740 DBUG_PRINT("info",("parse_url alloced share->connection_string %lx",
741 (long unsigned int) share->connection_string));
742
743 DBUG_PRINT("info",("share->connection_string %s",share->connection_string));
744 /*
745 No :// or @ in connection string. Must be a straight connection name of
746 either "servername" or "servername/tablename"
747 */
748 if ( (!strstr(share->connection_string, "://") &&
749 (!strchr(share->connection_string, '@'))))
750 {
751
752 DBUG_PRINT("info",
753 ("share->connection_string %s internal format \
754 share->connection_string %lx",
755 share->connection_string,
756 (long unsigned int) share->connection_string));
757
758 /* ok, so we do a little parsing, but not completely! */
759 share->parsed= FALSE;
760 /*
761 If there is a single '/' in the connection string, this means the user is
762 specifying a table name
763 */
764
765 if ((share->table_name= strchr(share->connection_string, '/')))
766 {
767 share->connection_string[share->table_name - share->connection_string]= '\0';
768 share->table_name++;
769 share->table_name_length= (uint) strlen(share->table_name);
770
771 DBUG_PRINT("info",
772 ("internal format, parsed table_name share->connection_string \
773 %s share->table_name %s",
774 share->connection_string, share->table_name));
775
776 /*
777 there better not be any more '/'s !
778 */
779 if (strchr(share->table_name, '/'))
780 goto error;
781
782 }
783 /*
784 otherwise, straight server name, use tablename of federated table
785 as remote table name
786 */
787 else
788 {
789 /*
790 connection specifies everything but, resort to
791 expecting remote and foreign table names to match
792 */
793 share->table_name= strmake_root(mem_root, table->s->table_name.str,
794 (share->table_name_length= table->s->table_name.length));
795 DBUG_PRINT("info",
796 ("internal format, default table_name share->connection_string \
797 %s share->table_name %s",
798 share->connection_string, share->table_name));
799 }
800
801 if ((error_num= get_connection(mem_root, share)))
802 goto error;
803 }
804 else
805 {
806 share->parsed= TRUE;
807 // Add a null for later termination of table name
808 share->connection_string[table->s->connect_string.length]= 0;
809 share->scheme= share->connection_string;
810 DBUG_PRINT("info",("parse_url alloced share->scheme %lx",
811 (long unsigned int) share->scheme));
812
813 /*
814 remove addition of null terminator and store length
815 for each string in share
816 */
817 if (!(share->username= strstr(share->scheme, "://")))
818 goto error;
819 share->scheme[share->username - share->scheme]= '\0';
820
821 if (strcmp(share->scheme, "mysql") != 0)
822 goto error;
823
824 share->username+= 3;
825
826 if (!(share->hostname= strchr(share->username, '@')))
827 goto error;
828
829 share->username[share->hostname - share->username]= '\0';
830 share->hostname++;
831
832 if ((share->password= strchr(share->username, ':')))
833 {
834 share->username[share->password - share->username]= '\0';
835 share->password++;
836 share->username= share->username;
837 /* make sure there isn't an extra / or @ */
838 if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
839 goto error;
840 /*
841 Found that if the string is:
842 user:@hostname:port/db/table
843 Then password is a null string, so set to NULL
844 */
845 if (share->password[0] == '\0')
846 share->password= NULL;
847 }
848 else
849 share->username= share->username;
850
851 /* make sure there isn't an extra / or @ */
852 if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
853 goto error;
854
855 if (!(share->database= strchr(share->hostname, '/')))
856 goto error;
857 share->hostname[share->database - share->hostname]= '\0';
858 share->database++;
859
860 if ((share->sport= strchr(share->hostname, ':')))
861 {
862 share->hostname[share->sport - share->hostname]= '\0';
863 share->sport++;
864 if (share->sport[0] == '\0')
865 share->sport= NULL;
866 else
867 share->port= atoi(share->sport);
868 }
869
870 if (!(share->table_name= strchr(share->database, '/')))
871 goto error;
872 share->database[share->table_name - share->database]= '\0';
873 share->table_name++;
874
875 share->table_name_length= strlen(share->table_name);
876
877 /* make sure there's not an extra / */
878 if ((strchr(share->table_name, '/')))
879 goto error;
880
881 /*
882 If hostname is omitted, we set it to NULL. According to
883 mysql_real_connect() manual:
884 The value of host may be either a hostname or an IP address.
885 If host is NULL or the string "localhost", a connection to the
886 local host is assumed.
887 */
888 if (share->hostname[0] == '\0')
889 share->hostname= NULL;
890 }
891
892 if (!share->port)
893 {
894 if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
895 share->socket= (char*) MYSQL_UNIX_ADDR;
896 else
897 share->port= MYSQL_PORT;
898 }
899
900 DBUG_PRINT("info",
901 ("scheme: %s username: %s password: %s \
902 hostname: %s port: %d db: %s tablename: %s",
903 share->scheme, share->username, share->password,
904 share->hostname, share->port, share->database,
905 share->table_name));
906
907 DBUG_RETURN(0);
908
909 error:
910 DBUG_RETURN(parse_url_error(share, table, error_num));
911 }
912
913 /*****************************************************************************
914 ** FEDERATED tables
915 *****************************************************************************/
916
ha_federated(handlerton * hton,TABLE_SHARE * table_arg)917 ha_federated::ha_federated(handlerton *hton,
918 TABLE_SHARE *table_arg)
919 :handler(hton, table_arg),
920 mysql(0), stored_result(0), results(fe_key_memory_federated_share)
921 {
922 trx_next= 0;
923 memset(&bulk_insert, 0, sizeof(bulk_insert));
924 }
925
926
927 /*
928 Convert MySQL result set row to handler internal format
929
930 SYNOPSIS
931 convert_row_to_internal_format()
932 record Byte pointer to record
933 row MySQL result set row from fetchrow()
934 result Result set to use
935
936 DESCRIPTION
937 This method simply iterates through a row returned via fetchrow with
938 values from a successful SELECT , and then stores each column's value
939 in the field object via the field object pointer (pointing to the table's
940 array of field object pointers). This is how the handler needs the data
941 to be stored to then return results back to the user
942
943 RETURN VALUE
944 0 After fields have had field values stored from record
945 */
946
convert_row_to_internal_format(uchar * record,MYSQL_ROW row,MYSQL_RES * result)947 uint ha_federated::convert_row_to_internal_format(uchar *record,
948 MYSQL_ROW row,
949 MYSQL_RES *result)
950 {
951 ulong *lengths;
952 Field **field;
953 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
954 DBUG_ENTER("ha_federated::convert_row_to_internal_format");
955
956 lengths= mysql_fetch_lengths(result);
957
958 for (field= table->field; *field; field++, row++, lengths++)
959 {
960 /*
961 index variable to move us through the row at the
962 same iterative step as the field
963 */
964 my_ptrdiff_t old_ptr;
965 old_ptr= (my_ptrdiff_t) (record - table->record[0]);
966 (*field)->move_field_offset(old_ptr);
967 if (!*row)
968 {
969 (*field)->set_null();
970 (*field)->reset();
971 }
972 else
973 {
974 if (bitmap_is_set(table->read_set, (*field)->field_index))
975 {
976 (*field)->set_notnull();
977 (*field)->store(*row, *lengths, &my_charset_bin);
978 }
979 }
980 (*field)->move_field_offset(-old_ptr);
981 }
982 dbug_tmp_restore_column_map(table->write_set, old_map);
983 DBUG_RETURN(0);
984 }
985
emit_key_part_name(String * to,KEY_PART_INFO * part)986 static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
987 {
988 DBUG_ENTER("emit_key_part_name");
989 if (append_ident(to, part->field->field_name,
990 strlen(part->field->field_name), ident_quote_char))
991 DBUG_RETURN(1); // Out of memory
992 DBUG_RETURN(0);
993 }
994
emit_key_part_element(String * to,KEY_PART_INFO * part,bool needs_quotes,bool is_like,const uchar * ptr,uint len)995 static bool emit_key_part_element(String *to, KEY_PART_INFO *part,
996 bool needs_quotes, bool is_like,
997 const uchar *ptr, uint len)
998 {
999 Field *field= part->field;
1000 DBUG_ENTER("emit_key_part_element");
1001
1002 if (needs_quotes && to->append(STRING_WITH_LEN("'")))
1003 DBUG_RETURN(1);
1004
1005 if (part->type == HA_KEYTYPE_BIT)
1006 {
1007 char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;
1008
1009 *buf++= '0';
1010 *buf++= 'x';
1011 buf= octet2hex(buf, (char*) ptr, len);
1012 if (to->append((char*) buff, (uint)(buf - buff)))
1013 DBUG_RETURN(1);
1014 }
1015 else if (part->key_part_flag & HA_BLOB_PART)
1016 {
1017 String blob;
1018 uint blob_length= uint2korr(ptr);
1019 blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
1020 blob_length, &my_charset_bin);
1021 if (append_escaped(to, &blob))
1022 DBUG_RETURN(1);
1023 }
1024 else if (part->key_part_flag & HA_VAR_LENGTH_PART)
1025 {
1026 String varchar;
1027 uint var_length= uint2korr(ptr);
1028 varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
1029 var_length, &my_charset_bin);
1030 if (append_escaped(to, &varchar))
1031 DBUG_RETURN(1);
1032 }
1033 else
1034 {
1035 char strbuff[MAX_FIELD_WIDTH];
1036 String str(strbuff, sizeof(strbuff), part->field->charset()), *res;
1037
1038 res= field->val_str(&str, ptr);
1039
1040 if (field->result_type() == STRING_RESULT)
1041 {
1042 if (append_escaped(to, res))
1043 DBUG_RETURN(1);
1044 }
1045 else if (to->append(res->ptr(), res->length()))
1046 DBUG_RETURN(1);
1047 }
1048
1049 if (is_like && to->append(STRING_WITH_LEN("%")))
1050 DBUG_RETURN(1);
1051
1052 if (needs_quotes && to->append(STRING_WITH_LEN("'")))
1053 DBUG_RETURN(1);
1054
1055 DBUG_RETURN(0);
1056 }
1057
1058 /*
1059 Create a WHERE clause based off of values in keys
1060 Note: This code was inspired by key_copy from key.cc
1061
1062 SYNOPSIS
1063 create_where_from_key ()
1064 to String object to store WHERE clause
1065 key_info KEY struct pointer
1066 key byte pointer containing key
1067 key_length length of key
1068 range_type 0 - no range, 1 - min range, 2 - max range
1069 (see enum range_operation)
1070
1071 DESCRIPTION
1072 Using iteration through all the keys via a KEY_PART_INFO pointer,
1073 This method 'extracts' the value of each key in the byte pointer
1074 *key, and for each key found, constructs an appropriate WHERE clause
1075
1076 RETURN VALUE
1077 0 After all keys have been accounted for to create the WHERE clause
1078 1 No keys found
1079
1080 Range flags Table per Timour:
1081
1082 -----------------
1083 - start_key:
1084 * ">" -> HA_READ_AFTER_KEY
1085 * ">=" -> HA_READ_KEY_OR_NEXT
1086 * "=" -> HA_READ_KEY_EXACT
1087
1088 - end_key:
1089 * "<" -> HA_READ_BEFORE_KEY
1090 * "<=" -> HA_READ_AFTER_KEY
1091
1092 records_in_range:
1093 -----------------
1094 - start_key:
1095 * ">" -> HA_READ_AFTER_KEY
1096 * ">=" -> HA_READ_KEY_EXACT
1097 * "=" -> HA_READ_KEY_EXACT
1098
1099 - end_key:
1100 * "<" -> HA_READ_BEFORE_KEY
1101 * "<=" -> HA_READ_AFTER_KEY
1102 * "=" -> HA_READ_AFTER_KEY
1103
1104 0 HA_READ_KEY_EXACT, Find first record else error
1105 1 HA_READ_KEY_OR_NEXT, Record or next record
1106 2 HA_READ_KEY_OR_PREV, Record or previous
1107 3 HA_READ_AFTER_KEY, Find next rec. after key-record
1108 4 HA_READ_BEFORE_KEY, Find next rec. before key-record
1109 5 HA_READ_PREFIX, Key which has same prefix
1110 6 HA_READ_PREFIX_LAST, Last key with the same prefix
1111 7 HA_READ_PREFIX_LAST_OR_PREV, Last or prev key with the same prefix
1112
1113 Flags that I've found:
1114
1115 id, primary key, varchar
1116
1117 id = 'ccccc'
1118 records_in_range: start_key 0 end_key 3
1119 read_range_first: start_key 0 end_key NULL
1120
1121 id > 'ccccc'
1122 records_in_range: start_key 3 end_key NULL
1123 read_range_first: start_key 3 end_key NULL
1124
1125 id < 'ccccc'
1126 records_in_range: start_key NULL end_key 4
1127 read_range_first: start_key NULL end_key 4
1128
1129 id <= 'ccccc'
1130 records_in_range: start_key NULL end_key 3
1131 read_range_first: start_key NULL end_key 3
1132
1133 id >= 'ccccc'
1134 records_in_range: start_key 0 end_key NULL
1135 read_range_first: start_key 1 end_key NULL
1136
1137 id like 'cc%cc'
1138 records_in_range: start_key 0 end_key 3
1139 read_range_first: start_key 1 end_key 3
1140
1141 id > 'aaaaa' and id < 'ccccc'
1142 records_in_range: start_key 3 end_key 4
1143 read_range_first: start_key 3 end_key 4
1144
1145 id >= 'aaaaa' and id < 'ccccc';
1146 records_in_range: start_key 0 end_key 4
1147 read_range_first: start_key 1 end_key 4
1148
1149 id >= 'aaaaa' and id <= 'ccccc';
1150 records_in_range: start_key 0 end_key 3
1151 read_range_first: start_key 1 end_key 3
1152
1153 id > 'aaaaa' and id <= 'ccccc';
1154 records_in_range: start_key 3 end_key 3
1155 read_range_first: start_key 3 end_key 3
1156
1157 numeric keys:
1158
1159 id = 4
1160 index_read_idx: start_key 0 end_key NULL
1161
1162 id > 4
1163 records_in_range: start_key 3 end_key NULL
1164 read_range_first: start_key 3 end_key NULL
1165
1166 id >= 4
1167 records_in_range: start_key 0 end_key NULL
1168 read_range_first: start_key 1 end_key NULL
1169
1170 id < 4
1171 records_in_range: start_key NULL end_key 4
1172 read_range_first: start_key NULL end_key 4
1173
1174 id <= 4
1175 records_in_range: start_key NULL end_key 3
1176 read_range_first: start_key NULL end_key 3
1177
1178 id like 4
1179 full table scan, select * from
1180
1181 id > 2 and id < 8
1182 records_in_range: start_key 3 end_key 4
1183 read_range_first: start_key 3 end_key 4
1184
1185 id >= 2 and id < 8
1186 records_in_range: start_key 0 end_key 4
1187 read_range_first: start_key 1 end_key 4
1188
1189 id >= 2 and id <= 8
1190 records_in_range: start_key 0 end_key 3
1191 read_range_first: start_key 1 end_key 3
1192
1193 id > 2 and id <= 8
1194 records_in_range: start_key 3 end_key 3
1195 read_range_first: start_key 3 end_key 3
1196
1197 multi keys (id int, name varchar, other varchar)
1198
1199 id = 1;
1200 records_in_range: start_key 0 end_key 3
1201 read_range_first: start_key 0 end_key NULL
1202
1203 id > 4;
1204 id > 2 and name = '333'; remote: id > 2
1205 id > 2 and name > '333'; remote: id > 2
1206 id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results
1207 id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result
1208 id >= 4 and name = 'eric was here' and other > 'eeee';
1209 records_in_range: start_key 3 end_key NULL
1210 read_range_first: start_key 3 end_key NULL
1211
1212 id >= 4;
1213 id >= 2 and name = '333' and other < 'ddd';
1214 remote: `id` >= 2 AND `name` >= '333';
1215 records_in_range: start_key 0 end_key NULL
1216 read_range_first: start_key 1 end_key NULL
1217
1218 id < 4;
1219 id < 3 and name = '222' and other <= 'ccc'; remote: id < 3
1220 records_in_range: start_key NULL end_key 4
1221 read_range_first: start_key NULL end_key 4
1222
1223 id <= 4;
1224 records_in_range: start_key NULL end_key 3
1225 read_range_first: start_key NULL end_key 3
1226
1227 id like 4;
1228 full table scan
1229
1230 id > 2 and id < 4;
1231 records_in_range: start_key 3 end_key 4
1232 read_range_first: start_key 3 end_key 4
1233
1234 id >= 2 and id < 4;
1235 records_in_range: start_key 0 end_key 4
1236 read_range_first: start_key 1 end_key 4
1237
1238 id >= 2 and id <= 4;
1239 records_in_range: start_key 0 end_key 3
1240 read_range_first: start_key 1 end_key 3
1241
1242 id > 2 and id <= 4;
1243 id = 6 and name = 'eric was here' and other > 'eeee';
1244 remote: (`id` > 6 AND `name` > 'eric was here' AND `other` > 'eeee')
1245 AND (`id` <= 6) AND ( AND `name` <= 'eric was here')
1246 no results
1247 records_in_range: start_key 3 end_key 3
1248 read_range_first: start_key 3 end_key 3
1249
1250 Summary:
1251
1252 * If the start key flag is 0 the max key flag shouldn't even be set,
1253 and if it is, the query produced would be invalid.
1254 * Multipart keys, even if containing some or all numeric columns,
1255 are treated the same as non-numeric keys
1256
1257 If the query is " = " (quotes or not):
1258 - records in range start key flag HA_READ_KEY_EXACT,
1259 end key flag HA_READ_AFTER_KEY (incorrect)
1260 - any other: start key flag HA_READ_KEY_OR_NEXT,
1261 end key flag HA_READ_AFTER_KEY (correct)
1262
1263 * 'like' queries (of key)
1264 - Numeric, full table scan
1265 - Non-numeric
1266 records_in_range: start_key 0 end_key 3
1267 other : start_key 1 end_key 3
1268
1269 * If the key flag is HA_READ_AFTER_KEY:
1270 if start_key, append >
1271 if end_key, append <=
1272
1273 * If create_where_key was called by records_in_range:
1274
1275 - if the key is numeric:
1276 start key flag is 0 when end key is NULL, end key flag is 3 or 4
1277 - if create_where_key was called by any other function:
1278 start key flag is 1 when end key is NULL, end key flag is 3 or 4
1279 - if the key is non-numeric, or multipart
1280 When the query is an exact match, the start key flag is 0,
1281 end key flag is 3 for what should be a no-range condition where
1282 you should have 0 and max key NULL, which it is if called by
1283 read_range_first
1284
1285 Conclusion:
1286
1287 1. Need logic to determine if a key is min or max when the flag is
1288 HA_READ_AFTER_KEY, and handle appending correct operator accordingly
1289
1290 2. Need a boolean flag to pass to create_where_from_key, used in the
1291 switch statement. Add 1 to the flag if:
1292 - start key flag is HA_READ_KEY_EXACT and the end key is NULL
1293
1294 */
1295
create_where_from_key(String * to,KEY * key_info,const key_range * start_key,const key_range * end_key,bool from_records_in_range,bool eq_range_arg)1296 bool ha_federated::create_where_from_key(String *to,
1297 KEY *key_info,
1298 const key_range *start_key,
1299 const key_range *end_key,
1300 bool from_records_in_range,
1301 bool eq_range_arg)
1302 {
1303 bool both_not_null=
1304 (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
1305 const uchar *ptr;
1306 uint remainder, length;
1307 char tmpbuff[FEDERATED_QUERY_BUFFER_SIZE];
1308 String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info);
1309 const key_range *ranges[2]= { start_key, end_key };
1310 my_bitmap_map *old_map;
1311 DBUG_ENTER("ha_federated::create_where_from_key");
1312
1313 tmp.length(0);
1314 if (start_key == NULL && end_key == NULL)
1315 DBUG_RETURN(1);
1316
1317 old_map= dbug_tmp_use_all_columns(table, table->write_set);
1318 for (uint i= 0; i <= 1; i++)
1319 {
1320 bool needs_quotes;
1321 KEY_PART_INFO *key_part;
1322 if (ranges[i] == NULL)
1323 continue;
1324
1325 if (both_not_null)
1326 {
1327 if (i > 0)
1328 tmp.append(STRING_WITH_LEN(") AND ("));
1329 else
1330 tmp.append(STRING_WITH_LEN(" ("));
1331 }
1332
1333 for (key_part= key_info->key_part,
1334 remainder= key_info->user_defined_key_parts,
1335 length= ranges[i]->length,
1336 ptr= ranges[i]->key; ;
1337 remainder--,
1338 key_part++)
1339 {
1340 Field *field= key_part->field;
1341 uint store_length= key_part->store_length;
1342 uint part_length= min(store_length, length);
1343 needs_quotes= field->str_needs_quotes();
1344 DBUG_DUMP("key, start of loop", ptr, length);
1345
1346 if (key_part->null_bit)
1347 {
1348 if (*ptr++)
1349 {
1350 /*
1351 We got "IS [NOT] NULL" condition against nullable column. We
1352 distinguish between "IS NOT NULL" and "IS NULL" by flag. For
1353 "IS NULL", flag is set to HA_READ_KEY_EXACT.
1354 */
1355 if (emit_key_part_name(&tmp, key_part) ||
1356 (ranges[i]->flag == HA_READ_KEY_EXACT ?
1357 tmp.append(STRING_WITH_LEN(" IS NULL ")) :
1358 tmp.append(STRING_WITH_LEN(" IS NOT NULL "))))
1359 goto err;
1360 /*
1361 We need to adjust pointer and length to be prepared for next
1362 key part. As well as check if this was last key part.
1363 */
1364 goto prepare_for_next_key_part;
1365 }
1366 }
1367
1368 if (tmp.append(STRING_WITH_LEN(" (")))
1369 goto err;
1370
1371 switch (ranges[i]->flag) {
1372 case HA_READ_KEY_EXACT:
1373 DBUG_PRINT("info", ("federated HA_READ_KEY_EXACT %d", i));
1374 if (store_length >= length ||
1375 !needs_quotes ||
1376 key_part->type == HA_KEYTYPE_BIT ||
1377 field->result_type() != STRING_RESULT)
1378 {
1379 if (emit_key_part_name(&tmp, key_part))
1380 goto err;
1381
1382 if (from_records_in_range)
1383 {
1384 if (tmp.append(STRING_WITH_LEN(" >= ")))
1385 goto err;
1386 }
1387 else
1388 {
1389 if (tmp.append(STRING_WITH_LEN(" = ")))
1390 goto err;
1391 }
1392
1393 if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1394 part_length))
1395 goto err;
1396 }
1397 else
1398 {
1399 /* LIKE */
1400 if (emit_key_part_name(&tmp, key_part) ||
1401 tmp.append(STRING_WITH_LEN(" LIKE ")) ||
1402 emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
1403 part_length))
1404 goto err;
1405 }
1406 break;
1407 case HA_READ_AFTER_KEY:
1408 if (eq_range_arg)
1409 {
1410 if (tmp.append("1=1")) // Dummy
1411 goto err;
1412 break;
1413 }
1414 DBUG_PRINT("info", ("federated HA_READ_AFTER_KEY %d", i));
1415 if ((store_length >= length) || (i > 0)) /* for all parts of end key*/
1416 {
1417 if (emit_key_part_name(&tmp, key_part))
1418 goto err;
1419
1420 if (i > 0) /* end key */
1421 {
1422 if (tmp.append(STRING_WITH_LEN(" <= ")))
1423 goto err;
1424 }
1425 else /* start key */
1426 {
1427 if (tmp.append(STRING_WITH_LEN(" > ")))
1428 goto err;
1429 }
1430
1431 if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1432 part_length))
1433 {
1434 goto err;
1435 }
1436 break;
1437 }
1438 // Fall through
1439 case HA_READ_KEY_OR_NEXT:
1440 DBUG_PRINT("info", ("federated HA_READ_KEY_OR_NEXT %d", i));
1441 if (emit_key_part_name(&tmp, key_part) ||
1442 tmp.append(STRING_WITH_LEN(" >= ")) ||
1443 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1444 part_length))
1445 goto err;
1446 break;
1447 case HA_READ_BEFORE_KEY:
1448 DBUG_PRINT("info", ("federated HA_READ_BEFORE_KEY %d", i));
1449 if (store_length >= length)
1450 {
1451 if (emit_key_part_name(&tmp, key_part) ||
1452 tmp.append(STRING_WITH_LEN(" < ")) ||
1453 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1454 part_length))
1455 goto err;
1456 break;
1457 }
1458 // Fall through
1459 case HA_READ_KEY_OR_PREV:
1460 DBUG_PRINT("info", ("federated HA_READ_KEY_OR_PREV %d", i));
1461 if (emit_key_part_name(&tmp, key_part) ||
1462 tmp.append(STRING_WITH_LEN(" <= ")) ||
1463 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1464 part_length))
1465 goto err;
1466 break;
1467 default:
1468 DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag));
1469 goto err;
1470 }
1471 if (tmp.append(STRING_WITH_LEN(") ")))
1472 goto err;
1473
1474 prepare_for_next_key_part:
1475 if (store_length >= length)
1476 break;
1477 DBUG_PRINT("info", ("remainder %d", remainder));
1478 assert(remainder > 1);
1479 length-= store_length;
1480 /*
1481 For nullable columns, null-byte is already skipped before, that is
1482 ptr was incremented by 1. Since store_length still counts null-byte,
1483 we need to subtract 1 from store_length.
1484 */
1485 ptr+= store_length - MY_TEST(key_part->null_bit);
1486 if (tmp.append(STRING_WITH_LEN(" AND ")))
1487 goto err;
1488
1489 DBUG_PRINT("info",
1490 ("create_where_from_key WHERE clause: %s",
1491 tmp.c_ptr_quick()));
1492 }
1493 }
1494 dbug_tmp_restore_column_map(table->write_set, old_map);
1495
1496 if (both_not_null)
1497 if (tmp.append(STRING_WITH_LEN(") ")))
1498 DBUG_RETURN(1);
1499
1500 if (to->append(STRING_WITH_LEN(" WHERE ")))
1501 DBUG_RETURN(1);
1502
1503 if (to->append(tmp))
1504 DBUG_RETURN(1);
1505
1506 DBUG_RETURN(0);
1507
1508 err:
1509 dbug_tmp_restore_column_map(table->write_set, old_map);
1510 DBUG_RETURN(1);
1511 }
1512
1513 /*
1514 Example of simple lock controls. The "share" it creates is structure we will
1515 pass to each federated handler. Do you have to have one of these? Well, you
1516 have pieces that are used for locking, and they are needed to function.
1517 */
1518
get_share(const char * table_name,TABLE * table)1519 static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
1520 {
1521 char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1522 Field **field;
1523 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
1524 FEDERATED_SHARE *share= NULL, tmp_share;
1525 MEM_ROOT mem_root;
1526 DBUG_ENTER("ha_federated.cc::get_share");
1527
1528 /*
1529 In order to use this string, we must first zero it's length,
1530 or it will contain garbage
1531 */
1532 query.length(0);
1533
1534 init_alloc_root(fe_key_memory_federated_share, &mem_root, 256, 0);
1535
1536 mysql_mutex_lock(&federated_mutex);
1537
1538 tmp_share.share_key= table_name;
1539 tmp_share.share_key_length= (uint) strlen(table_name);
1540 if (parse_url(&mem_root, &tmp_share, table, 0))
1541 goto error;
1542
1543 /* TODO: change tmp_share.scheme to LEX_STRING object */
1544 if (!(share= (FEDERATED_SHARE *) my_hash_search(&federated_open_tables,
1545 (uchar*) tmp_share.share_key,
1546 tmp_share.
1547 share_key_length)))
1548 {
1549 query.set_charset(system_charset_info);
1550 query.append(STRING_WITH_LEN("SELECT "));
1551 for (field= table->field; *field; field++)
1552 {
1553 append_ident(&query, (*field)->field_name,
1554 strlen((*field)->field_name), ident_quote_char);
1555 query.append(STRING_WITH_LEN(", "));
1556 }
1557 /* chops off trailing comma */
1558 query.length(query.length() - sizeof_trailing_comma);
1559
1560 query.append(STRING_WITH_LEN(" FROM "));
1561
1562 append_ident(&query, tmp_share.table_name,
1563 tmp_share.table_name_length, ident_quote_char);
1564
1565 if (!(share= (FEDERATED_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
1566 !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length() + 1)))
1567 goto error;
1568
1569 share->use_count= 0;
1570 share->mem_root= mem_root;
1571
1572 DBUG_PRINT("info",
1573 ("share->select_query %s", share->select_query));
1574
1575 if (my_hash_insert(&federated_open_tables, (uchar*) share))
1576 goto error;
1577 thr_lock_init(&share->lock);
1578 mysql_mutex_init(fe_key_mutex_FEDERATED_SHARE_mutex,
1579 &share->mutex, MY_MUTEX_INIT_FAST);
1580 }
1581 else
1582 free_root(&mem_root, MYF(0)); /* prevents memory leak */
1583
1584 share->use_count++;
1585 mysql_mutex_unlock(&federated_mutex);
1586
1587 DBUG_RETURN(share);
1588
1589 error:
1590 mysql_mutex_unlock(&federated_mutex);
1591 free_root(&mem_root, MYF(0));
1592 DBUG_RETURN(NULL);
1593 }
1594
1595
1596 /*
1597 Free lock controls. We call this whenever we close a table.
1598 If the table had the last reference to the share then we
1599 free memory associated with it.
1600 */
1601
free_share(FEDERATED_SHARE * share)1602 static int free_share(FEDERATED_SHARE *share)
1603 {
1604 MEM_ROOT mem_root= share->mem_root;
1605 DBUG_ENTER("free_share");
1606
1607 mysql_mutex_lock(&federated_mutex);
1608 if (!--share->use_count)
1609 {
1610 my_hash_delete(&federated_open_tables, (uchar*) share);
1611 thr_lock_delete(&share->lock);
1612 mysql_mutex_destroy(&share->mutex);
1613 free_root(&mem_root, MYF(0));
1614 }
1615 mysql_mutex_unlock(&federated_mutex);
1616
1617 DBUG_RETURN(0);
1618 }
1619
1620
records_in_range(uint inx,key_range * start_key,key_range * end_key)1621 ha_rows ha_federated::records_in_range(uint inx, key_range *start_key,
1622 key_range *end_key)
1623 {
1624 /*
1625
1626 We really want indexes to be used as often as possible, therefore
1627 we just need to hard-code the return value to a very low number to
1628 force the issue
1629
1630 */
1631 DBUG_ENTER("ha_federated::records_in_range");
1632 DBUG_RETURN(FEDERATED_RECORDS_IN_RANGE);
1633 }
1634 /*
1635 If frm_error() is called then we will use this to find out
1636 what file extensions exist for the storage engine. This is
1637 also used by the default rename_table and delete_table method
1638 in handler.cc.
1639 */
1640
bas_ext() const1641 const char **ha_federated::bas_ext() const
1642 {
1643 static const char *ext[]=
1644 {
1645 NullS
1646 };
1647 return ext;
1648 }
1649
1650
1651 /*
1652 Used for opening tables. The name will be the name of the file.
1653 A table is opened when it needs to be opened. For instance
1654 when a request comes in for a select on the table (tables are not
1655 open and closed for each request, they are cached).
1656
1657 Called from handler.cc by handler::ha_open(). The server opens
1658 all tables by calling ha_open() which then calls the handler
1659 specific open().
1660 */
1661
open(const char * name,int mode,uint test_if_locked)1662 int ha_federated::open(const char *name, int mode, uint test_if_locked)
1663 {
1664 DBUG_ENTER("ha_federated::open");
1665
1666 if (!(share= get_share(name, table)))
1667 DBUG_RETURN(1);
1668 thr_lock_data_init(&share->lock, &lock, NULL);
1669
1670 assert(mysql == NULL);
1671
1672 ref_length= sizeof(MYSQL_RES *) + sizeof(MYSQL_ROW_OFFSET);
1673 DBUG_PRINT("info", ("ref_length: %u", ref_length));
1674
1675 reset();
1676
1677 DBUG_RETURN(0);
1678 }
1679
1680
1681 /*
1682 Closes a table. We call the free_share() function to free any resources
1683 that we have allocated in the "shared" structure.
1684
1685 Called from sql_base.cc, sql_select.cc, and table.cc.
1686 In sql_select.cc it is only used to close up temporary tables or during
1687 the process where a temporary table is converted over to being a
1688 myisam table.
1689 For sql_base.cc look at close_data_tables().
1690 */
1691
close(void)1692 int ha_federated::close(void)
1693 {
1694 THD *thd= current_thd;
1695 DBUG_ENTER("ha_federated::close");
1696
1697 free_result();
1698
1699 results.clear();
1700
1701 /*
1702 Check to verify wheather the connection is still alive or not.
1703 FLUSH TABLES will quit the connection and if connection is broken,
1704 it will reconnect again and quit silently.
1705 */
1706 if (mysql && (!mysql->net.vio || !vio_is_connected(mysql->net.vio)))
1707 mysql->net.error= 2;
1708
1709 /* Disconnect from mysql */
1710 mysql_close(mysql);
1711 mysql= NULL;
1712
1713 /*
1714 mysql_close() might return an error if a remote server's gone
1715 for some reason. If that happens while removing a table from
1716 the table cache, the error will be propagated to a client even
1717 if the original query was not issued against the FEDERATED table.
1718 So, don't propagate errors from mysql_close().
1719 */
1720 if (table->in_use && thd != table->in_use)
1721 table->in_use->clear_error();
1722
1723 /*
1724 Errors from mysql_close() are silently ignored for flush tables.
1725 Close the connection silently.
1726 */
1727 if (thd && thd->lex->sql_command == SQLCOM_FLUSH)
1728 thd->clear_error();
1729
1730 DBUG_RETURN(free_share(share));
1731 }
1732
1733
1734 /**
1735 @brief Construct the INSERT statement.
1736
1737 @details This method will construct the INSERT statement and appends it to
1738 the supplied query string buffer.
1739
1740 @return
1741 @retval FALSE No error
1742 @retval TRUE Failure
1743 */
1744
append_stmt_insert(String * query)1745 bool ha_federated::append_stmt_insert(String *query)
1746 {
1747 char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1748 Field **field;
1749 size_t tmp_length;
1750 bool added_field= FALSE;
1751
1752 /* The main insert query string */
1753 String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
1754 DBUG_ENTER("ha_federated::append_stmt_insert");
1755
1756 insert_string.length(0);
1757
1758 if (replace_duplicates)
1759 insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
1760 else if (ignore_duplicates && !insert_dup_update)
1761 insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
1762 else
1763 insert_string.append(STRING_WITH_LEN("INSERT INTO "));
1764 append_ident(&insert_string, share->table_name, share->table_name_length,
1765 ident_quote_char);
1766 tmp_length= insert_string.length();
1767 insert_string.append(STRING_WITH_LEN(" ("));
1768
1769 /*
1770 loop through the field pointer array, add any fields to both the values
1771 list and the fields list that match the current query id
1772 */
1773 for (field= table->field; *field; field++)
1774 {
1775 if (bitmap_is_set(table->write_set, (*field)->field_index))
1776 {
1777 /* append the field name */
1778 append_ident(&insert_string, (*field)->field_name,
1779 strlen((*field)->field_name), ident_quote_char);
1780
1781 /* append commas between both fields and fieldnames */
1782 /*
1783 unfortunately, we can't use the logic if *(fields + 1) to
1784 make the following appends conditional as we don't know if the
1785 next field is in the write set
1786 */
1787 insert_string.append(STRING_WITH_LEN(", "));
1788 added_field= TRUE;
1789 }
1790 }
1791
1792 if (added_field)
1793 {
1794 /* Remove trailing comma. */
1795 insert_string.length(insert_string.length() - sizeof_trailing_comma);
1796 insert_string.append(STRING_WITH_LEN(") "));
1797 }
1798 else
1799 {
1800 /* If there were no fields, we don't want to add a closing paren. */
1801 insert_string.length(tmp_length);
1802 }
1803
1804 insert_string.append(STRING_WITH_LEN(" VALUES "));
1805
1806 DBUG_RETURN(query->append(insert_string));
1807 }
1808
1809
1810 /*
1811 write_row() inserts a row. No extra() hint is given currently if a bulk load
1812 is happening. buf() is a byte array of data. You can use the field
1813 information to extract the data from the native byte array type.
1814 Example of this would be:
1815 for (Field **field=table->field ; *field ; field++)
1816 {
1817 ...
1818 }
1819
1820 Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
1821 sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
1822 */
1823
write_row(uchar * buf)1824 int ha_federated::write_row(uchar *buf)
1825 {
1826 char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1827 char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
1828 Field **field;
1829 size_t tmp_length;
1830 int error= 0;
1831 bool use_bulk_insert;
1832 bool auto_increment_update_required= (table->next_number_field != NULL);
1833
1834 /* The string containing the values to be added to the insert */
1835 String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
1836 /* The actual value of the field, to be added to the values_string */
1837 String insert_field_value_string(insert_field_value_buffer,
1838 sizeof(insert_field_value_buffer),
1839 &my_charset_bin);
1840 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1841 DBUG_ENTER("ha_federated::write_row");
1842
1843 values_string.length(0);
1844 insert_field_value_string.length(0);
1845 ha_statistic_increment(&SSV::ha_write_count);
1846
1847 /*
1848 start both our field and field values strings
1849 We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE"
1850 Ignore duplicates is always true when insert_dup_update is true.
1851 When replace_duplicates == TRUE, we can safely enable multi-row insert.
1852 When performing multi-row insert, we only collect the columns values for
1853 the row. The start of the statement is only created when the first
1854 row is copied in to the bulk_insert string.
1855 */
1856 if (!(use_bulk_insert= bulk_insert.str &&
1857 (!insert_dup_update || replace_duplicates)))
1858 append_stmt_insert(&values_string);
1859
1860 values_string.append(STRING_WITH_LEN(" ("));
1861 tmp_length= values_string.length();
1862
1863 /*
1864 loop through the field pointer array, add any fields to both the values
1865 list and the fields list that is part of the write set
1866 */
1867 for (field= table->field; *field; field++)
1868 {
1869 if (bitmap_is_set(table->write_set, (*field)->field_index))
1870 {
1871 if ((*field)->is_null())
1872 values_string.append(STRING_WITH_LEN(" NULL "));
1873 else
1874 {
1875 bool needs_quote= (*field)->str_needs_quotes();
1876 (*field)->val_str(&insert_field_value_string);
1877 if (needs_quote)
1878 values_string.append(value_quote_char);
1879 insert_field_value_string.print(&values_string);
1880 if (needs_quote)
1881 values_string.append(value_quote_char);
1882
1883 insert_field_value_string.length(0);
1884 }
1885
1886 /* append commas between both fields and fieldnames */
1887 /*
1888 unfortunately, we can't use the logic if *(fields + 1) to
1889 make the following appends conditional as we don't know if the
1890 next field is in the write set
1891 */
1892 values_string.append(STRING_WITH_LEN(", "));
1893 }
1894 }
1895 dbug_tmp_restore_column_map(table->read_set, old_map);
1896
1897 /*
1898 if there were no fields, we don't want to add a closing paren
1899 AND, we don't want to chop off the last char '('
1900 insert will be "INSERT INTO t1 VALUES ();"
1901 */
1902 if (values_string.length() > tmp_length)
1903 {
1904 /* chops off trailing comma */
1905 values_string.length(values_string.length() - sizeof_trailing_comma);
1906 }
1907 /* we always want to append this, even if there aren't any fields */
1908 values_string.append(STRING_WITH_LEN(") "));
1909
1910 if (use_bulk_insert)
1911 {
1912 /*
1913 Send the current bulk insert out if appending the current row would
1914 cause the statement to overflow the packet size, otherwise set
1915 auto_increment_update_required to FALSE as no query was executed.
1916 */
1917 if (bulk_insert.length + values_string.length() + bulk_padding >
1918 mysql->net.max_packet_size && bulk_insert.length)
1919 {
1920 error= real_query(bulk_insert.str, bulk_insert.length);
1921 bulk_insert.length= 0;
1922 }
1923 else
1924 auto_increment_update_required= FALSE;
1925
1926 if (bulk_insert.length == 0)
1927 {
1928 char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1929 String insert_string(insert_buffer, sizeof(insert_buffer),
1930 &my_charset_bin);
1931 insert_string.length(0);
1932 append_stmt_insert(&insert_string);
1933 dynstr_append_mem(&bulk_insert, insert_string.ptr(),
1934 insert_string.length());
1935 }
1936 else
1937 dynstr_append_mem(&bulk_insert, ",", 1);
1938
1939 dynstr_append_mem(&bulk_insert, values_string.ptr(),
1940 values_string.length());
1941 }
1942 else
1943 {
1944 error= real_query(values_string.ptr(), values_string.length());
1945 }
1946
1947 if (error)
1948 {
1949 DBUG_RETURN(stash_remote_error());
1950 }
1951 /*
1952 If the table we've just written a record to contains an auto_increment
1953 field, then store the last_insert_id() value from the foreign server
1954 */
1955 if (auto_increment_update_required)
1956 {
1957 update_auto_increment();
1958
1959 /* mysql_insert() uses this for protocol return value */
1960 table->next_number_field->store(stats.auto_increment_value, 1);
1961 }
1962
1963 DBUG_RETURN(0);
1964 }
1965
1966
1967 /**
1968 @brief Prepares the storage engine for bulk inserts.
1969
1970 @param[in] rows estimated number of rows in bulk insert
1971 or 0 if unknown.
1972
1973 @details Initializes memory structures required for bulk insert.
1974 */
1975
start_bulk_insert(ha_rows rows)1976 void ha_federated::start_bulk_insert(ha_rows rows)
1977 {
1978 uint page_size;
1979 DBUG_ENTER("ha_federated::start_bulk_insert");
1980
1981 dynstr_free(&bulk_insert);
1982
1983 /**
1984 We don't bother with bulk-insert semantics when the estimated rows == 1
1985 The rows value will be 0 if the server does not know how many rows
1986 would be inserted. This can occur when performing INSERT...SELECT
1987 */
1988
1989 if (rows == 1)
1990 DBUG_VOID_RETURN;
1991
1992 /*
1993 Make sure we have an open connection so that we know the
1994 maximum packet size.
1995 */
1996 if (!mysql && real_connect())
1997 DBUG_VOID_RETURN;
1998
1999 page_size= (uint) my_getpagesize();
2000
2001 if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
2002 DBUG_VOID_RETURN;
2003
2004 bulk_insert.length= 0;
2005 DBUG_VOID_RETURN;
2006 }
2007
2008
2009 /**
2010 @brief End bulk insert.
2011
2012 @details This method will send any remaining rows to the remote server.
2013 Finally, it will deinitialize the bulk insert data structure.
2014
2015 @return Operation status
2016 @retval 0 No error
2017 @retval != 0 Error occured at remote server. Also sets my_errno.
2018 */
2019
end_bulk_insert()2020 int ha_federated::end_bulk_insert()
2021 {
2022 int error= 0;
2023 DBUG_ENTER("ha_federated::end_bulk_insert");
2024
2025 if (bulk_insert.str && bulk_insert.length)
2026 {
2027 if (real_query(bulk_insert.str, bulk_insert.length))
2028 error= stash_remote_error();
2029 else
2030 if (table->next_number_field)
2031 update_auto_increment();
2032 }
2033
2034 dynstr_free(&bulk_insert);
2035
2036 set_my_errno(error);
2037 DBUG_RETURN(error);
2038 }
2039
2040
2041 /*
2042 ha_federated::update_auto_increment
2043
2044 This method ensures that last_insert_id() works properly. What it simply does
2045 is calls last_insert_id() on the foreign database immediately after insert
2046 (if the table has an auto_increment field) and sets the insert id via
2047 thd->insert_id(ID)).
2048 */
update_auto_increment(void)2049 void ha_federated::update_auto_increment(void)
2050 {
2051 THD *thd= current_thd;
2052 DBUG_ENTER("ha_federated::update_auto_increment");
2053
2054 ha_federated::info(HA_STATUS_AUTO);
2055 thd->first_successful_insert_id_in_cur_stmt=
2056 stats.auto_increment_value;
2057 DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value));
2058
2059 DBUG_VOID_RETURN;
2060 }
2061
optimize(THD * thd,HA_CHECK_OPT * check_opt)2062 int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
2063 {
2064 char query_buffer[STRING_BUFFER_USUAL_SIZE];
2065 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2066 DBUG_ENTER("ha_federated::optimize");
2067
2068 query.length(0);
2069
2070 query.set_charset(system_charset_info);
2071 query.append(STRING_WITH_LEN("OPTIMIZE TABLE "));
2072 append_ident(&query, share->table_name, share->table_name_length,
2073 ident_quote_char);
2074
2075 if (real_query(query.ptr(), query.length()))
2076 {
2077 DBUG_RETURN(stash_remote_error());
2078 }
2079
2080 DBUG_RETURN(0);
2081 }
2082
2083
repair(THD * thd,HA_CHECK_OPT * check_opt)2084 int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
2085 {
2086 char query_buffer[STRING_BUFFER_USUAL_SIZE];
2087 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2088 DBUG_ENTER("ha_federated::repair");
2089
2090 query.length(0);
2091
2092 query.set_charset(system_charset_info);
2093 query.append(STRING_WITH_LEN("REPAIR TABLE "));
2094 append_ident(&query, share->table_name, share->table_name_length,
2095 ident_quote_char);
2096 if (check_opt->flags & T_QUICK)
2097 query.append(STRING_WITH_LEN(" QUICK"));
2098 if (check_opt->flags & T_EXTEND)
2099 query.append(STRING_WITH_LEN(" EXTENDED"));
2100 if (check_opt->sql_flags & TT_USEFRM)
2101 query.append(STRING_WITH_LEN(" USE_FRM"));
2102
2103 if (real_query(query.ptr(), query.length()))
2104 {
2105 DBUG_RETURN(stash_remote_error());
2106 }
2107
2108 DBUG_RETURN(0);
2109 }
2110
2111
2112 /*
2113 Yes, update_row() does what you expect, it updates a row. old_data will have
2114 the previous row record in it, while new_data will have the newest data in
2115 it.
2116
2117 Keep in mind that the server can do updates based on ordering if an ORDER BY
2118 clause was used. Consecutive ordering is not guaranteed.
2119
2120 Currently new_data will not have an updated AUTO_INCREMENT record. You can
2121 do this for federated by doing the following:
2122
2123 if (table->next_number_field && record == table->record[0])
2124 update_auto_increment();
2125
2126 Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
2127 */
2128
update_row(const uchar * old_data,uchar * new_data)2129 int ha_federated::update_row(const uchar *old_data, uchar *new_data)
2130 {
2131 /*
2132 This used to control how the query was built. If there was a
2133 primary key, the query would be built such that there was a where
2134 clause with only that column as the condition. This is flawed,
2135 because if we have a multi-part primary key, it would only use the
2136 first part! We don't need to do this anyway, because
2137 read_range_first will retrieve the correct record, which is what
2138 is used to build the WHERE clause. We can however use this to
2139 append a LIMIT to the end if there is NOT a primary key. Why do
2140 this? Because we only are updating one record, and LIMIT enforces
2141 this.
2142 */
2143 bool has_a_primary_key= MY_TEST(table->s->primary_key != MAX_KEY);
2144
2145 /*
2146 buffers for following strings
2147 */
2148 char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
2149 char update_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2150 char where_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2151
2152 /* Work area for field values */
2153 String field_value(field_value_buffer, sizeof(field_value_buffer),
2154 &my_charset_bin);
2155 /* stores the update query */
2156 String update_string(update_buffer,
2157 sizeof(update_buffer),
2158 &my_charset_bin);
2159 /* stores the WHERE clause */
2160 String where_string(where_buffer,
2161 sizeof(where_buffer),
2162 &my_charset_bin);
2163 uchar *record= table->record[0];
2164 DBUG_ENTER("ha_federated::update_row");
2165 /*
2166 set string lengths to 0 to avoid misc chars in string
2167 */
2168 field_value.length(0);
2169 update_string.length(0);
2170 where_string.length(0);
2171
2172 if (ignore_duplicates)
2173 update_string.append(STRING_WITH_LEN("UPDATE IGNORE "));
2174 else
2175 update_string.append(STRING_WITH_LEN("UPDATE "));
2176 append_ident(&update_string, share->table_name,
2177 share->table_name_length, ident_quote_char);
2178 update_string.append(STRING_WITH_LEN(" SET "));
2179
2180 /*
2181 In this loop, we want to match column names to values being inserted
2182 (while building INSERT statement).
2183
2184 Iterate through table->field (new data) and share->old_field (old_data)
2185 using the same index to create an SQL UPDATE statement. New data is
2186 used to create SET field=value and old data is used to create WHERE
2187 field=oldvalue
2188 */
2189
2190 for (Field **field= table->field; *field; field++)
2191 {
2192 if (bitmap_is_set(table->write_set, (*field)->field_index))
2193 {
2194 size_t field_name_length= strlen((*field)->field_name);
2195 append_ident(&update_string, (*field)->field_name, field_name_length,
2196 ident_quote_char);
2197 update_string.append(STRING_WITH_LEN(" = "));
2198
2199 if ((*field)->is_null())
2200 update_string.append(STRING_WITH_LEN(" NULL "));
2201 else
2202 {
2203 /* otherwise = */
2204 my_bitmap_map *old_map= tmp_use_all_columns(table, table->read_set);
2205 bool needs_quote= (*field)->str_needs_quotes();
2206 (*field)->val_str(&field_value);
2207 if (needs_quote)
2208 update_string.append(value_quote_char);
2209 field_value.print(&update_string);
2210 if (needs_quote)
2211 update_string.append(value_quote_char);
2212 field_value.length(0);
2213 tmp_restore_column_map(table->read_set, old_map);
2214 }
2215 update_string.append(STRING_WITH_LEN(", "));
2216 }
2217
2218 if (bitmap_is_set(table->read_set, (*field)->field_index))
2219 {
2220 size_t field_name_length= strlen((*field)->field_name);
2221 append_ident(&where_string, (*field)->field_name, field_name_length,
2222 ident_quote_char);
2223 if ((*field)->is_null_in_record(old_data))
2224 where_string.append(STRING_WITH_LEN(" IS NULL "));
2225 else
2226 {
2227 bool needs_quote= (*field)->str_needs_quotes();
2228 where_string.append(STRING_WITH_LEN(" = "));
2229 (*field)->val_str(&field_value,
2230 (old_data + (*field)->offset(record)));
2231 if (needs_quote)
2232 where_string.append(value_quote_char);
2233 field_value.print(&where_string);
2234 if (needs_quote)
2235 where_string.append(value_quote_char);
2236 field_value.length(0);
2237 }
2238 where_string.append(STRING_WITH_LEN(" AND "));
2239 }
2240 }
2241
2242 /* Remove last ', '. This works as there must be at least on updated field */
2243 update_string.length(update_string.length() - sizeof_trailing_comma);
2244
2245 if (where_string.length())
2246 {
2247 /* chop off trailing AND */
2248 where_string.length(where_string.length() - sizeof_trailing_and);
2249 update_string.append(STRING_WITH_LEN(" WHERE "));
2250 update_string.append(where_string);
2251 }
2252
2253 /*
2254 If this table has not a primary key, then we could possibly
2255 update multiple rows. We want to make sure to only update one!
2256 */
2257 if (!has_a_primary_key)
2258 update_string.append(STRING_WITH_LEN(" LIMIT 1"));
2259
2260 if (real_query(update_string.ptr(), update_string.length()))
2261 {
2262 DBUG_RETURN(stash_remote_error());
2263 }
2264 DBUG_RETURN(0);
2265 }
2266
2267 /*
2268 This will delete a row. 'buf' will contain a copy of the row to be =deleted.
2269 The server will call this right after the current row has been called (from
2270 either a previous rnd_next() or index call).
2271 If you keep a pointer to the last row or can access a primary key it will
2272 make doing the deletion quite a bit easier.
2273 Keep in mind that the server does no guarentee consecutive deletions.
2274 ORDER BY clauses can be used.
2275
2276 Called in sql_acl.cc and sql_udf.cc to manage internal table information.
2277 Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
2278 it is used for removing duplicates while in insert it is used for REPLACE
2279 calls.
2280 */
2281
delete_row(const uchar * buf)2282 int ha_federated::delete_row(const uchar *buf)
2283 {
2284 char delete_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2285 char data_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2286 String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
2287 String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
2288 uint found= 0;
2289 DBUG_ENTER("ha_federated::delete_row");
2290
2291 delete_string.length(0);
2292 if (ignore_duplicates)
2293 delete_string.append(STRING_WITH_LEN("DELETE IGNORE FROM "));
2294 else
2295 delete_string.append(STRING_WITH_LEN("DELETE FROM "));
2296 append_ident(&delete_string, share->table_name,
2297 share->table_name_length, ident_quote_char);
2298 delete_string.append(STRING_WITH_LEN(" WHERE "));
2299
2300 for (Field **field= table->field; *field; field++)
2301 {
2302 Field *cur_field= *field;
2303 found++;
2304 if (bitmap_is_set(table->read_set, cur_field->field_index))
2305 {
2306 append_ident(&delete_string, (*field)->field_name,
2307 strlen((*field)->field_name), ident_quote_char);
2308 data_string.length(0);
2309 if (cur_field->is_null())
2310 {
2311 delete_string.append(STRING_WITH_LEN(" IS NULL "));
2312 }
2313 else
2314 {
2315 bool needs_quote= cur_field->str_needs_quotes();
2316 delete_string.append(STRING_WITH_LEN(" = "));
2317 cur_field->val_str(&data_string);
2318 if (needs_quote)
2319 delete_string.append(value_quote_char);
2320 data_string.print(&delete_string);
2321 if (needs_quote)
2322 delete_string.append(value_quote_char);
2323 }
2324 delete_string.append(STRING_WITH_LEN(" AND "));
2325 }
2326 }
2327
2328 // Remove trailing AND
2329 delete_string.length(delete_string.length() - sizeof_trailing_and);
2330 if (!found)
2331 delete_string.length(delete_string.length() - sizeof_trailing_where);
2332
2333 delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
2334 DBUG_PRINT("info",
2335 ("Delete sql: %s", delete_string.c_ptr_quick()));
2336 if (real_query(delete_string.ptr(), delete_string.length()))
2337 {
2338 DBUG_RETURN(stash_remote_error());
2339 }
2340 stats.deleted+= (ha_rows) mysql->affected_rows;
2341 stats.records-= (ha_rows) mysql->affected_rows;
2342 DBUG_PRINT("info",
2343 ("rows deleted %ld rows deleted for all time %ld",
2344 (long) mysql->affected_rows, (long) stats.deleted));
2345
2346 DBUG_RETURN(0);
2347 }
2348
index_read_idx_map(uchar * buf,uint index,const uchar * key,key_part_map keypart_map,enum ha_rkey_function find_flag)2349 int ha_federated::index_read_idx_map(uchar *buf, uint index, const uchar *key,
2350 key_part_map keypart_map,
2351 enum ha_rkey_function find_flag)
2352 {
2353 int error= index_init(index, 0);
2354 if (error)
2355 return error;
2356 error= index_read_map(buf, key, keypart_map, find_flag);
2357 if(!error && stored_result)
2358 {
2359 uchar *dummy_arg=NULL;
2360 position(dummy_arg);
2361 }
2362 int error1= index_end();
2363 return error ? error : error1;
2364 }
2365
2366 /*
2367 Positions an index cursor to the index specified in the handle. Fetches the
2368 row if available. If the key value is null, begin at the first key of the
2369 index. This method, which is called in the case of an SQL statement having
2370 a WHERE clause on a non-primary key index, simply calls index_read_idx.
2371 */
2372
index_read(uchar * buf,const uchar * key,uint key_len,ha_rkey_function find_flag)2373 int ha_federated::index_read(uchar *buf, const uchar *key,
2374 uint key_len, ha_rkey_function find_flag)
2375 {
2376 int rc;
2377 DBUG_ENTER("ha_federated::index_read");
2378
2379 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2380 free_result();
2381 rc= index_read_idx_with_result_set(buf, active_index, key,
2382 key_len, find_flag,
2383 &stored_result);
2384 MYSQL_INDEX_READ_ROW_DONE(rc);
2385 DBUG_RETURN(rc);
2386 }
2387
2388
2389 /*
2390 Positions an index cursor to the index specified in key. Fetches the
2391 row if any. This is only used to read whole keys.
2392
2393 This method is called via index_read in the case of a WHERE clause using
2394 a primary key index OR is called DIRECTLY when the WHERE clause
2395 uses a PRIMARY KEY index.
2396
2397 NOTES
2398 This uses an internal result set that is deleted before function
2399 returns. We need to be able to be calable from ha_rnd_pos()
2400 */
2401
index_read_idx(uchar * buf,uint index,const uchar * key,uint key_len,enum ha_rkey_function find_flag)2402 int ha_federated::index_read_idx(uchar *buf, uint index, const uchar *key,
2403 uint key_len, enum ha_rkey_function find_flag)
2404 {
2405 int retval;
2406 MYSQL_RES *mysql_result;
2407 DBUG_ENTER("ha_federated::index_read_idx");
2408
2409 if ((retval= index_read_idx_with_result_set(buf, index, key,
2410 key_len, find_flag,
2411 &mysql_result)))
2412 DBUG_RETURN(retval);
2413 mysql_free_result(mysql_result);
2414 results.pop_back();
2415 DBUG_RETURN(0);
2416 }
2417
2418
2419 /*
2420 Create result set for rows matching query and return first row
2421
2422 RESULT
2423 0 ok In this case *result will contain the result set
2424 table->status == 0
2425 # error In this case *result will contain 0
2426 table->status == STATUS_NOT_FOUND
2427 */
2428
index_read_idx_with_result_set(uchar * buf,uint index,const uchar * key,uint key_len,ha_rkey_function find_flag,MYSQL_RES ** result)2429 int ha_federated::index_read_idx_with_result_set(uchar *buf, uint index,
2430 const uchar *key,
2431 uint key_len,
2432 ha_rkey_function find_flag,
2433 MYSQL_RES **result)
2434 {
2435 int retval;
2436 char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2437 char index_value[STRING_BUFFER_USUAL_SIZE];
2438 char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2439 String index_string(index_value,
2440 sizeof(index_value),
2441 &my_charset_bin);
2442 String sql_query(sql_query_buffer,
2443 sizeof(sql_query_buffer),
2444 &my_charset_bin);
2445 key_range range;
2446 DBUG_ENTER("ha_federated::index_read_idx_with_result_set");
2447
2448 *result= 0; // In case of errors
2449 index_string.length(0);
2450 sql_query.length(0);
2451 ha_statistic_increment(&SSV::ha_read_key_count);
2452
2453 sql_query.append(share->select_query);
2454
2455 range.key= key;
2456 range.length= key_len;
2457 range.flag= find_flag;
2458 create_where_from_key(&index_string,
2459 &table->key_info[index],
2460 &range,
2461 NULL, 0, 0);
2462 sql_query.append(index_string);
2463
2464 if (real_query(sql_query.ptr(), sql_query.length()))
2465 {
2466 sprintf(error_buffer, "error: %d '%s'",
2467 mysql_errno(mysql), mysql_error(mysql));
2468 retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2469 goto error;
2470 }
2471 if (!(*result= store_result(mysql)))
2472 {
2473 retval= HA_ERR_END_OF_FILE;
2474 goto error;
2475 }
2476 if ((retval= read_next(buf, *result)))
2477 {
2478 mysql_free_result(*result);
2479 results.pop_back();
2480 *result= 0;
2481 table->status= STATUS_NOT_FOUND;
2482 DBUG_RETURN(retval);
2483 }
2484 DBUG_RETURN(0);
2485
2486 error:
2487 table->status= STATUS_NOT_FOUND;
2488 my_error(retval, MYF(0), error_buffer);
2489 DBUG_RETURN(retval);
2490 }
2491
2492
2493 /*
2494 This method is used exlusevely by filesort() to check if we
2495 can create sorting buffers of necessary size.
2496 If the handler returns more records that it declares
2497 here server can just crash on filesort().
2498 We cannot guarantee that's not going to happen with
2499 the FEDERATED engine, as we have records==0 always if the
2500 client is a VIEW, and for the table the number of
2501 records can inpredictably change during execution.
2502 So we return maximum possible value here.
2503 */
2504
estimate_rows_upper_bound()2505 ha_rows ha_federated::estimate_rows_upper_bound()
2506 {
2507 return HA_POS_ERROR;
2508 }
2509
2510
2511 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
2512
index_init(uint keynr,bool sorted)2513 int ha_federated::index_init(uint keynr, bool sorted)
2514 {
2515 DBUG_ENTER("ha_federated::index_init");
2516 DBUG_PRINT("info", ("table: '%s' key: %u", table->s->table_name.str, keynr));
2517 active_index= keynr;
2518 DBUG_RETURN(0);
2519 }
2520
2521
2522 /*
2523 Read first range
2524 */
2525
read_range_first(const key_range * start_key,const key_range * end_key,bool eq_range_arg,bool sorted)2526 int ha_federated::read_range_first(const key_range *start_key,
2527 const key_range *end_key,
2528 bool eq_range_arg, bool sorted)
2529 {
2530 char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2531 int retval;
2532 String sql_query(sql_query_buffer,
2533 sizeof(sql_query_buffer),
2534 &my_charset_bin);
2535 DBUG_ENTER("ha_federated::read_range_first");
2536 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2537
2538 assert(!(start_key == NULL && end_key == NULL));
2539
2540 sql_query.length(0);
2541 sql_query.append(share->select_query);
2542 create_where_from_key(&sql_query,
2543 &table->key_info[active_index],
2544 start_key, end_key, 0, eq_range_arg);
2545 if (real_query(sql_query.ptr(), sql_query.length()))
2546 {
2547 retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2548 goto error;
2549 }
2550 sql_query.length(0);
2551
2552 if (!(stored_result= store_result(mysql)))
2553 {
2554 retval= HA_ERR_END_OF_FILE;
2555 goto error;
2556 }
2557
2558 retval= read_next(table->record[0], stored_result);
2559 MYSQL_INDEX_READ_ROW_DONE(retval);
2560 DBUG_RETURN(retval);
2561
2562 error:
2563 table->status= STATUS_NOT_FOUND;
2564 MYSQL_INDEX_READ_ROW_DONE(retval);
2565 DBUG_RETURN(retval);
2566 }
2567
2568
read_range_next()2569 int ha_federated::read_range_next()
2570 {
2571 int retval;
2572 DBUG_ENTER("ha_federated::read_range_next");
2573 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2574 retval= rnd_next_int(table->record[0]);
2575 MYSQL_INDEX_READ_ROW_DONE(retval);
2576 DBUG_RETURN(retval);
2577 }
2578
2579
2580 /* Used to read forward through the index. */
index_next(uchar * buf)2581 int ha_federated::index_next(uchar *buf)
2582 {
2583 int retval;
2584 DBUG_ENTER("ha_federated::index_next");
2585 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2586 ha_statistic_increment(&SSV::ha_read_next_count);
2587 retval= read_next(buf, stored_result);
2588 MYSQL_INDEX_READ_ROW_DONE(retval);
2589 DBUG_RETURN(retval);
2590 }
2591
2592
2593 /*
2594 rnd_init() is called when the system wants the storage engine to do a table
2595 scan.
2596
2597 This is the method that gets data for the SELECT calls.
2598
2599 See the federated in the introduction at the top of this file to see when
2600 rnd_init() is called.
2601
2602 Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2603 sql_table.cc, and sql_update.cc.
2604 */
2605
rnd_init(bool scan)2606 int ha_federated::rnd_init(bool scan)
2607 {
2608 DBUG_ENTER("ha_federated::rnd_init");
2609 /*
2610 The use of the 'scan' flag is incredibly important for this handler
2611 to work properly, especially with updates containing WHERE clauses
2612 using indexed columns.
2613
2614 When the initial query contains a WHERE clause of the query using an
2615 indexed column, it's index_read_idx that selects the exact record from
2616 the foreign database.
2617
2618 When there is NO index in the query, either due to not having a WHERE
2619 clause, or the WHERE clause is using columns that are not indexed, a
2620 'full table scan' done by rnd_init, which in this situation simply means
2621 a 'select * from ...' on the foreign table.
2622
2623 In other words, this 'scan' flag gives us the means to ensure that if
2624 there is an index involved in the query, we want index_read_idx to
2625 retrieve the exact record (scan flag is 0), and do not want rnd_init
2626 to do a 'full table scan' and wipe out that result set.
2627
2628 Prior to using this flag, the problem was most apparent with updates.
2629
2630 An initial query like 'UPDATE tablename SET anything = whatever WHERE
2631 indexedcol = someval', index_read_idx would get called, using a query
2632 constructed with a WHERE clause built from the values of index ('indexcol'
2633 in this case, having a value of 'someval'). mysql_store_result would
2634 then get called (this would be the result set we want to use).
2635
2636 After this rnd_init (from sql_update.cc) would be called, it would then
2637 unecessarily call "select * from table" on the foreign table, then call
2638 mysql_store_result, which would wipe out the correct previous result set
2639 from the previous call of index_read_idx's that had the result set
2640 containing the correct record, hence update the wrong row!
2641
2642 */
2643
2644 if (scan)
2645 {
2646 if (real_query(share->select_query, strlen(share->select_query)) ||
2647 !(stored_result= store_result(mysql)))
2648 DBUG_RETURN(stash_remote_error());
2649 }
2650 DBUG_RETURN(0);
2651 }
2652
2653
rnd_end()2654 int ha_federated::rnd_end()
2655 {
2656 DBUG_ENTER("ha_federated::rnd_end");
2657 DBUG_RETURN(index_end());
2658 }
2659
2660
index_end(void)2661 int ha_federated::index_end(void)
2662 {
2663 DBUG_ENTER("ha_federated::index_end");
2664 free_result();
2665 active_index= MAX_KEY;
2666 DBUG_RETURN(0);
2667 }
2668
2669
2670 /*
2671 This is called for each row of the table scan. When you run out of records
2672 you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
2673 The Field structure for the table is the key to getting data into buf
2674 in a manner that will allow the server to understand it.
2675
2676 Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2677 sql_table.cc, and sql_update.cc.
2678 */
2679
rnd_next(uchar * buf)2680 int ha_federated::rnd_next(uchar *buf)
2681 {
2682 int rc;
2683 DBUG_ENTER("ha_federated::rnd_next");
2684 MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
2685 TRUE);
2686 rc= rnd_next_int(buf);
2687 MYSQL_READ_ROW_DONE(rc);
2688 DBUG_RETURN(rc);
2689 }
2690
rnd_next_int(uchar * buf)2691 int ha_federated::rnd_next_int(uchar *buf)
2692 {
2693 DBUG_ENTER("ha_federated::rnd_next_int");
2694
2695 if (stored_result == 0)
2696 {
2697 /*
2698 Return value of rnd_init is not always checked (see records.cc),
2699 so we can get here _even_ if there is _no_ pre-fetched result-set!
2700 TODO: fix it. We can delete this in 5.1 when rnd_init() is checked.
2701 */
2702 DBUG_RETURN(1);
2703 }
2704 DBUG_RETURN(read_next(buf, stored_result));
2705 }
2706
2707
2708 /*
2709 ha_federated::read_next
2710
2711 reads from a result set and converts to mysql internal
2712 format
2713
2714 SYNOPSIS
2715 ha_federated::read_next()
2716 buf byte pointer to record
2717 result mysql result set
2718
2719 DESCRIPTION
2720 This method is a wrapper method that reads one record from a result
2721 set and converts it to the internal table format
2722
2723 RETURN VALUE
2724 1 error
2725 0 no error
2726 */
2727
read_next(uchar * buf,MYSQL_RES * result)2728 int ha_federated::read_next(uchar *buf, MYSQL_RES *result)
2729 {
2730 int retval;
2731 MYSQL_ROW row;
2732 DBUG_ENTER("ha_federated::read_next");
2733
2734 table->status= STATUS_NOT_FOUND; // For easier return
2735
2736 /* Save current data cursor position. */
2737 current_position= result->data_cursor;
2738
2739 /* Fetch a row, insert it back in a row format. */
2740 if (!(row= mysql_fetch_row(result)))
2741 DBUG_RETURN(HA_ERR_END_OF_FILE);
2742
2743 if (!(retval= convert_row_to_internal_format(buf, row, result)))
2744 table->status= 0;
2745
2746 DBUG_RETURN(retval);
2747 }
2748
2749
2750 /**
2751 @brief Store a reference to current row.
2752
2753 @details During a query execution we may have different result sets (RS),
2754 e.g. for different ranges. All the RS's used are stored in
2755 memory and placed in @c results dynamic array. At the end of
2756 execution all stored RS's are freed at once in the
2757 @c ha_federated::reset().
2758 So, in case of federated, a reference to current row is a
2759 stored result address and current data cursor position.
2760 As we keep all RS in memory during a query execution,
2761 we can get any record using the reference any time until
2762 @c ha_federated::reset() is called.
2763 TODO: we don't have to store all RS's rows but only those
2764 we call @c ha_federated::position() for, so we can free memory
2765 where we store other rows in the @c ha_federated::index_end().
2766
2767 @param[in] record record data (unused)
2768 */
2769
position(const uchar * record MY_ATTRIBUTE ((unused)))2770 void ha_federated::position(const uchar *record MY_ATTRIBUTE ((unused)))
2771 {
2772 DBUG_ENTER("ha_federated::position");
2773
2774 assert(stored_result);
2775
2776 position_called= TRUE;
2777 /* Store result set address. */
2778 memcpy(ref, &stored_result, sizeof(MYSQL_RES *));
2779 /* Store data cursor position. */
2780 memcpy(ref + sizeof(MYSQL_RES *), ¤t_position,
2781 sizeof(MYSQL_ROW_OFFSET));
2782 DBUG_VOID_RETURN;
2783 }
2784
2785
2786 /*
2787 This is like rnd_next, but you are given a position to use to determine the
2788 row. The position will be of the type that you stored in ref.
2789
2790 This method is required for an ORDER BY
2791
2792 Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
2793 */
2794
rnd_pos(uchar * buf,uchar * pos)2795 int ha_federated::rnd_pos(uchar *buf, uchar *pos)
2796 {
2797 MYSQL_RES *result;
2798 int ret_val;
2799 DBUG_ENTER("ha_federated::rnd_pos");
2800
2801 MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
2802 FALSE);
2803 ha_statistic_increment(&SSV::ha_read_rnd_count);
2804
2805 /* Get stored result set. */
2806 memcpy(&result, pos, sizeof(MYSQL_RES *));
2807 assert(result);
2808 /* Set data cursor position. */
2809 memcpy(&result->data_cursor, pos + sizeof(MYSQL_RES *),
2810 sizeof(MYSQL_ROW_OFFSET));
2811 /* Read a row. */
2812 ret_val= read_next(buf, result);
2813 MYSQL_READ_ROW_DONE(ret_val);
2814 DBUG_RETURN(ret_val);
2815 }
2816
2817
2818 /*
2819 ::info() is used to return information to the optimizer.
2820 Currently this table handler doesn't implement most of the fields
2821 really needed. SHOW also makes use of this data
2822 Another note, you will probably want to have the following in your
2823 code:
2824 if (records < 2)
2825 records = 2;
2826 The reason is that the server will optimize for cases of only a single
2827 record. If in a table scan you don't know the number of records
2828 it will probably be better to set records to two so you can return
2829 as many records as you need.
2830 Along with records a few more variables you may wish to set are:
2831 records
2832 deleted
2833 data_file_length
2834 index_file_length
2835 delete_length
2836 check_time
2837 Take a look at the public variables in handler.h for more information.
2838
2839 Called in:
2840 filesort.cc
2841 ha_heap.cc
2842 item_sum.cc
2843 opt_sum.cc
2844 sql_delete.cc
2845 sql_delete.cc
2846 sql_derived.cc
2847 sql_select.cc
2848 sql_select.cc
2849 sql_select.cc
2850 sql_select.cc
2851 sql_select.cc
2852 sql_show.cc
2853 sql_show.cc
2854 sql_show.cc
2855 sql_show.cc
2856 sql_table.cc
2857 sql_union.cc
2858 sql_update.cc
2859
2860 */
2861
info(uint flag)2862 int ha_federated::info(uint flag)
2863 {
2864 char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
2865 int error;
2866 uint error_code;
2867 MYSQL_RES *result= 0;
2868 MYSQL_ROW row;
2869 String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin);
2870 DBUG_ENTER("ha_federated::info");
2871
2872 error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2873 /* we want not to show table status if not needed to do so */
2874 if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
2875 {
2876 status_query_string.length(0);
2877 status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE "));
2878 append_ident(&status_query_string, share->table_name,
2879 share->table_name_length, value_quote_char);
2880
2881 if (real_query(status_query_string.ptr(), status_query_string.length()))
2882 goto error;
2883
2884 status_query_string.length(0);
2885
2886 result= mysql_store_result(mysql);
2887
2888 /*
2889 We're going to use fields num. 4, 12 and 13 of the resultset,
2890 so make sure we have these fields.
2891 */
2892 if (!result || (mysql_num_fields(result) < 14))
2893 goto error;
2894
2895 if (!mysql_num_rows(result))
2896 goto error;
2897
2898 if (!(row= mysql_fetch_row(result)))
2899 goto error;
2900
2901 /*
2902 deleted is set in ha_federated::info
2903 */
2904 /*
2905 need to figure out what this means as far as federated is concerned,
2906 since we don't have a "file"
2907
2908 data_file_length = ?
2909 index_file_length = ?
2910 delete_length = ?
2911 */
2912 if (row[4] != NULL)
2913 stats.records= (ha_rows) my_strtoll10(row[4], (char**) 0,
2914 &error);
2915 if (row[5] != NULL)
2916 stats.mean_rec_length= (ulong) my_strtoll10(row[5], (char**) 0, &error);
2917
2918 stats.data_file_length= stats.records * stats.mean_rec_length;
2919
2920 if (row[12] != NULL)
2921 stats.update_time= (ulong) my_strtoll10(row[12], (char**) 0,
2922 &error);
2923 if (row[13] != NULL)
2924 stats.check_time= (ulong) my_strtoll10(row[13], (char**) 0,
2925 &error);
2926
2927 /*
2928 size of IO operations (This is based on a good guess, no high science
2929 involved)
2930 */
2931 if (flag & HA_STATUS_CONST)
2932 stats.block_size= 4096;
2933
2934 }
2935
2936 if ((flag & HA_STATUS_AUTO) && mysql)
2937 stats.auto_increment_value= mysql->insert_id;
2938
2939 mysql_free_result(result);
2940
2941 DBUG_RETURN(0);
2942
2943 error:
2944 mysql_free_result(result);
2945 if (mysql)
2946 {
2947 my_printf_error(error_code, ": %d : %s", MYF(0),
2948 mysql_errno(mysql), mysql_error(mysql));
2949 }
2950 else
2951 if (remote_error_number != -1 /* error already reported */)
2952 {
2953 error_code= remote_error_number;
2954 my_error(error_code, MYF(0), ER(error_code));
2955 }
2956 DBUG_RETURN(error_code);
2957 }
2958
2959
2960 /**
2961 @brief Handles extra signals from MySQL server
2962
2963 @param[in] operation Hint for storage engine
2964
2965 @return Operation Status
2966 @retval 0 OK
2967 */
extra(ha_extra_function operation)2968 int ha_federated::extra(ha_extra_function operation)
2969 {
2970 DBUG_ENTER("ha_federated::extra");
2971 switch (operation) {
2972 case HA_EXTRA_IGNORE_DUP_KEY:
2973 ignore_duplicates= TRUE;
2974 break;
2975 case HA_EXTRA_NO_IGNORE_DUP_KEY:
2976 insert_dup_update= FALSE;
2977 ignore_duplicates= FALSE;
2978 break;
2979 case HA_EXTRA_WRITE_CAN_REPLACE:
2980 replace_duplicates= TRUE;
2981 break;
2982 case HA_EXTRA_WRITE_CANNOT_REPLACE:
2983 /*
2984 We use this flag to ensure that we do not create an "INSERT IGNORE"
2985 statement when inserting new rows into the remote table.
2986 */
2987 replace_duplicates= FALSE;
2988 break;
2989 case HA_EXTRA_INSERT_WITH_UPDATE:
2990 insert_dup_update= TRUE;
2991 break;
2992 default:
2993 /* do nothing */
2994 DBUG_PRINT("info",("unhandled operation: %d", (uint) operation));
2995 }
2996 DBUG_RETURN(0);
2997 }
2998
2999
3000 /**
3001 @brief Reset state of file to after 'open'.
3002
  @details This function is called after every statement for all tables
  used by that statement.
3005
3006 @return Operation status
3007 @retval 0 OK
3008 */
3009
reset(void)3010 int ha_federated::reset(void)
3011 {
3012 insert_dup_update= FALSE;
3013 ignore_duplicates= FALSE;
3014 replace_duplicates= FALSE;
3015
3016 /* Free stored result sets. */
3017 for (MYSQL_RES **result= results.begin(); result != results.end(); ++result)
3018 {
3019 mysql_free_result(*result);
3020 }
3021 results.clear();
3022
3023 return 0;
3024 }
3025
3026
3027 /*
3028 Used to delete all rows in a table. Both for cases of truncate and
3029 for cases where the optimizer realizes that all rows will be
3030 removed as a result of a SQL statement.
3031
  Called from item_sum.cc by Item_func_group_concat::clear() and
  Item_sum_count_distinct::clear().
3034 Called from sql_delete.cc by mysql_delete().
3035 Called from sql_select.cc by JOIN::reinit().
3036 Called from sql_union.cc by st_select_lex_unit::exec().
3037 */
3038
delete_all_rows()3039 int ha_federated::delete_all_rows()
3040 {
3041 char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
3042 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
3043 DBUG_ENTER("ha_federated::delete_all_rows");
3044
3045 query.length(0);
3046
3047 query.set_charset(system_charset_info);
3048 if (ignore_duplicates)
3049 query.append(STRING_WITH_LEN("DELETE IGNORE FROM "));
3050 else
3051 query.append(STRING_WITH_LEN("DELETE FROM "));
3052 append_ident(&query, share->table_name, share->table_name_length,
3053 ident_quote_char);
3054
3055 if (real_query(query.ptr(), query.length()))
3056 {
3057 DBUG_RETURN(stash_remote_error());
3058 }
3059 stats.deleted+= stats.records;
3060 stats.records= 0;
3061 DBUG_RETURN(0);
3062 }
3063
3064
3065 /*
3066 Used to manually truncate the table.
3067 */
3068
truncate()3069 int ha_federated::truncate()
3070 {
3071 char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
3072 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
3073 DBUG_ENTER("ha_federated::truncate");
3074
3075 query.length(0);
3076
3077 query.set_charset(system_charset_info);
3078 query.append(STRING_WITH_LEN("TRUNCATE TABLE "));
3079 append_ident(&query, share->table_name, share->table_name_length,
3080 ident_quote_char);
3081
3082 /*
3083 TRUNCATE won't return anything in mysql_affected_rows
3084 */
3085 if (real_query(query.ptr(), query.length()))
3086 {
3087 DBUG_RETURN(stash_remote_error());
3088 }
3089 stats.deleted+= stats.records;
3090 stats.records= 0;
3091 DBUG_RETURN(0);
3092 }
3093
3094
3095 /*
3096 The idea with handler::store_lock() is the following:
3097
3098 The statement decided which locks we should need for the table
3099 for updates/deletes/inserts we get WRITE locks, for SELECT... we get
3100 read locks.
3101
3102 Before adding the lock into the table lock handler (see thr_lock.c)
3103 mysqld calls store lock with the requested locks. Store lock can now
3104 modify a write lock to a read lock (or some other lock), ignore the
3105 lock (if we don't want to use MySQL table locks at all) or add locks
3106 for many tables (like we do when we are using a MERGE handler).
3107
  Unless LOCK TABLES is in effect, the federated handler changes WRITE locks
  to TL_WRITE_ALLOW_WRITE (which signals that we are doing WRITES, but are
  still allowing other readers and writers).
3111
3112 When releasing locks, store_lock() are also called. In this case one
3113 usually doesn't have to do anything.
3114
3115 In some exceptional cases MySQL may send a request for a TL_IGNORE;
3116 This means that we are requesting the same lock as last time and this
3117 should also be ignored. (This may happen when someone does a flush
3118 table when we have opened a part of the tables, in which case mysqld
  closes and reopens the tables and tries to get the same locks as last
  time). In the future we will probably try to remove this.
3121
3122 Called from lock.cc by get_lock_data().
3123 */
3124
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)3125 THR_LOCK_DATA **ha_federated::store_lock(THD *thd,
3126 THR_LOCK_DATA **to,
3127 enum thr_lock_type lock_type)
3128 {
3129 DBUG_ENTER("ha_federated::store_lock");
3130 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
3131 {
3132 /*
3133 Here is where we get into the guts of a row level lock.
3134 If TL_UNLOCK is set
3135 If we are not doing a LOCK TABLE or DISCARD/IMPORT
3136 TABLESPACE, then allow multiple writers
3137 */
3138
3139 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
3140 lock_type <= TL_WRITE) && !thd->in_lock_tables)
3141 lock_type= TL_WRITE_ALLOW_WRITE;
3142
3143 /*
3144 In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
3145 MySQL would use the lock TL_READ_NO_INSERT on t2, and that
3146 would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
3147 to t2. Convert the lock to a normal read lock to allow
3148 concurrent inserts to t2.
3149 */
3150
3151 if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
3152 lock_type= TL_READ;
3153
3154 lock.type= lock_type;
3155 }
3156
3157 *to++= &lock;
3158
3159 DBUG_RETURN(to);
3160 }
3161
3162 /*
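  create() performs no local setup, since we have no local files of our own;
  it only parses the connection URL into a temporary share to validate it.
  FUTURE: We should potentially connect to the foreign database and
  verify that the remote table exists.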
3165 */
3166
create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)3167 int ha_federated::create(const char *name, TABLE *table_arg,
3168 HA_CREATE_INFO *create_info)
3169 {
3170 int retval;
3171 THD *thd= current_thd;
3172 FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url
3173 DBUG_ENTER("ha_federated::create");
3174
3175 retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1);
3176
3177 DBUG_RETURN(retval);
3178
3179 }
3180
3181
real_connect()3182 int ha_federated::real_connect()
3183 {
3184 char buffer[FEDERATED_QUERY_BUFFER_SIZE];
3185 String sql_query(buffer, sizeof(buffer), &my_charset_bin);
3186 DBUG_ENTER("ha_federated::real_connect");
3187
3188 /*
3189 Bug#25679
3190 Ensure that we do not hold the LOCK_open mutex while attempting
3191 to establish Federated connection to guard against a trivial
3192 Denial of Service scenerio.
3193 */
3194 mysql_mutex_assert_not_owner(&LOCK_open);
3195
3196 assert(mysql == NULL);
3197
3198 if (!(mysql= mysql_init(NULL)))
3199 {
3200 remote_error_number= HA_ERR_OUT_OF_MEM;
3201 DBUG_RETURN(-1);
3202 }
3203
3204 /*
3205 BUG# 17044 Federated Storage Engine is not UTF8 clean
3206 Add set names to whatever charset the table is at open
3207 of table
3208 */
3209 /* this sets the csname like 'set names utf8' */
3210 mysql_options(mysql,MYSQL_SET_CHARSET_NAME,
3211 this->table->s->table_charset->csname);
3212 mysql_options4(mysql, MYSQL_OPT_CONNECT_ATTR_ADD,
3213 "program_name", "mysqld");
3214 mysql_options4(mysql, MYSQL_OPT_CONNECT_ATTR_ADD,
3215 "_client_role", "federated_storage");
3216 sql_query.length(0);
3217
3218 if (!mysql_real_connect(mysql,
3219 share->hostname,
3220 share->username,
3221 share->password,
3222 share->database,
3223 share->port,
3224 share->socket, 0))
3225 {
3226 stash_remote_error();
3227 mysql_close(mysql);
3228 mysql= NULL;
3229 my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), remote_error_buf);
3230 remote_error_number= -1;
3231 DBUG_RETURN(-1);
3232 }
3233
3234 /*
3235 We have established a connection, lets try a simple dummy query just
3236 to check that the table and expected columns are present.
3237 */
3238 sql_query.append(share->select_query);
3239 sql_query.append(STRING_WITH_LEN(" WHERE 1=0"));
3240 if (mysql_real_query(mysql, sql_query.ptr(),
3241 static_cast<ulong>(sql_query.length())))
3242 {
3243 sql_query.length(0);
3244 sql_query.append("error: ");
3245 sql_query.qs_append(mysql_errno(mysql));
3246 sql_query.append(" '");
3247 sql_query.append(mysql_error(mysql));
3248 sql_query.append("'");
3249 mysql_close(mysql);
3250 mysql= NULL;
3251 my_error(ER_FOREIGN_DATA_SOURCE_DOESNT_EXIST, MYF(0), sql_query.ptr());
3252 remote_error_number= -1;
3253 DBUG_RETURN(-1);
3254 }
3255
3256 /* Just throw away the result, no rows anyways but need to keep in sync */
3257 mysql_free_result(mysql_store_result(mysql));
3258
3259 /*
3260 Since we do not support transactions at this version, we can let the client
3261 API silently reconnect. For future versions, we will need more logic to
3262 deal with transactions
3263 */
3264
3265 mysql->reconnect= 1;
3266 DBUG_RETURN(0);
3267 }
3268
3269
real_query(const char * query,size_t length)3270 int ha_federated::real_query(const char *query, size_t length)
3271 {
3272 int rc= 0;
3273 DBUG_ENTER("ha_federated::real_query");
3274
3275 if (!mysql && (rc= real_connect()))
3276 goto end;
3277
3278 if (!query || !length)
3279 goto end;
3280
3281 rc= mysql_real_query(mysql, query, static_cast<ulong>(length));
3282
3283 end:
3284 DBUG_RETURN(rc);
3285 }
3286
3287
stash_remote_error()3288 int ha_federated::stash_remote_error()
3289 {
3290 DBUG_ENTER("ha_federated::stash_remote_error()");
3291 if (!mysql)
3292 DBUG_RETURN(remote_error_number);
3293 remote_error_number= mysql_errno(mysql);
3294 strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1);
3295 if (remote_error_number == ER_DUP_ENTRY ||
3296 remote_error_number == ER_DUP_KEY)
3297 DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
3298 if (remote_error_number == ER_NO_REFERENCED_ROW ||
3299 remote_error_number == ER_NO_REFERENCED_ROW_2)
3300 DBUG_RETURN(HA_ERR_NO_REFERENCED_ROW);
3301 DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
3302 }
3303
3304
get_error_message(int error,String * buf)3305 bool ha_federated::get_error_message(int error, String* buf)
3306 {
3307 DBUG_ENTER("ha_federated::get_error_message");
3308 DBUG_PRINT("enter", ("error: %d", error));
3309 if (error == HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM)
3310 {
3311 buf->append(STRING_WITH_LEN("Error on remote system: "));
3312 buf->qs_append(remote_error_number);
3313 buf->append(STRING_WITH_LEN(": "));
3314 buf->append(remote_error_buf);
3315
3316 remote_error_number= 0;
3317 remote_error_buf[0]= '\0';
3318 }
3319 DBUG_PRINT("exit", ("message: %s", buf->ptr()));
3320 DBUG_RETURN(FALSE);
3321 }
3322
3323
3324 /**
3325 @brief Store a result set.
3326
3327 @details Call @c mysql_store_result() to save a result set then
3328 append it to the stored results array.
3329
  @param[in] mysql_arg MySQL connection structure.
3331
3332 @return Stored result set (MYSQL_RES object).
3333 */
3334
store_result(MYSQL * mysql_arg)3335 MYSQL_RES *ha_federated::store_result(MYSQL *mysql_arg)
3336 {
3337 MYSQL_RES *result= mysql_store_result(mysql_arg);
3338 DBUG_ENTER("ha_federated::store_result");
3339 if (result)
3340 {
3341 results.push_back(result);
3342 }
3343 position_called= FALSE;
3344 DBUG_RETURN(result);
3345 }
3346
3347
free_result()3348 void ha_federated::free_result()
3349 {
3350 DBUG_ENTER("ha_federated::free_result");
3351 if (stored_result && !position_called)
3352 {
3353 mysql_free_result(stored_result);
3354 stored_result= 0;
3355 if (!results.empty())
3356 results.pop_back();
3357 }
3358 DBUG_VOID_RETURN;
3359 }
3360
3361
external_lock(THD * thd,int lock_type)3362 int ha_federated::external_lock(THD *thd, int lock_type)
3363 {
3364 int error= 0;
3365 DBUG_ENTER("ha_federated::external_lock");
3366
3367 /*
3368 Support for transactions disabled until WL#2952 fixes it.
3369 */
3370 DBUG_RETURN(error);
3371 }
3372
3373
federated_commit(handlerton * hton,THD * thd,bool all)3374 static int federated_commit(handlerton *hton, THD *thd, bool all)
3375 {
3376 int return_val= 0;
3377 ha_federated *trx= (ha_federated *) thd_get_ha_data(thd, hton);
3378 DBUG_ENTER("federated_commit");
3379
3380 if (all)
3381 {
3382 int error= 0;
3383 ha_federated *ptr, *old= NULL;
3384 for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
3385 {
3386 if (old)
3387 old->trx_next= NULL;
3388 error= ptr->connection_commit();
3389 if (error && !return_val)
3390 return_val= error;
3391 }
3392 thd_set_ha_data(thd, hton, NULL);
3393 }
3394
3395 DBUG_PRINT("info", ("error val: %d", return_val));
3396 DBUG_RETURN(return_val);
3397 }
3398
3399
federated_rollback(handlerton * hton,THD * thd,bool all)3400 static int federated_rollback(handlerton *hton, THD *thd, bool all)
3401 {
3402 int return_val= 0;
3403 ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, hton);
3404 DBUG_ENTER("federated_rollback");
3405
3406 if (all)
3407 {
3408 int error= 0;
3409 ha_federated *ptr, *old= NULL;
3410 for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
3411 {
3412 if (old)
3413 old->trx_next= NULL;
3414 error= ptr->connection_rollback();
3415 if (error && !return_val)
3416 return_val= error;
3417 }
3418 thd_set_ha_data(thd, hton, NULL);
3419 }
3420
3421 DBUG_PRINT("info", ("error val: %d", return_val));
3422 DBUG_RETURN(return_val);
3423 }
3424
connection_commit()3425 int ha_federated::connection_commit()
3426 {
3427 DBUG_ENTER("ha_federated::connection_commit");
3428 DBUG_RETURN(execute_simple_query("COMMIT", 6));
3429 }
3430
3431
connection_rollback()3432 int ha_federated::connection_rollback()
3433 {
3434 DBUG_ENTER("ha_federated::connection_rollback");
3435 DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
3436 }
3437
3438
connection_autocommit(bool state)3439 int ha_federated::connection_autocommit(bool state)
3440 {
3441 const char *text;
3442 DBUG_ENTER("ha_federated::connection_autocommit");
3443 text= (state == TRUE) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
3444 DBUG_RETURN(execute_simple_query(text, 16));
3445 }
3446
3447
execute_simple_query(const char * query,int len)3448 int ha_federated::execute_simple_query(const char *query, int len)
3449 {
3450 DBUG_ENTER("ha_federated::execute_simple_query");
3451
3452 if (mysql_real_query(mysql, query, (ulong)len))
3453 {
3454 DBUG_RETURN(stash_remote_error());
3455 }
3456 DBUG_RETURN(0);
3457 }
3458
3459 struct st_mysql_storage_engine federated_storage_engine=
3460 { MYSQL_HANDLERTON_INTERFACE_VERSION };
3461
mysql_declare_plugin(federated)3462 mysql_declare_plugin(federated)
3463 {
3464 MYSQL_STORAGE_ENGINE_PLUGIN,
3465 &federated_storage_engine,
3466 "FEDERATED",
3467 "Patrick Galbraith and Brian Aker, MySQL AB",
3468 "Federated MySQL storage engine",
3469 PLUGIN_LICENSE_GPL,
3470 federated_db_init, /* Plugin Init */
3471 federated_done, /* Plugin Deinit */
3472 0x0100 /* 1.0 */,
3473 NULL, /* status variables */
3474 NULL, /* system variables */
3475 NULL, /* config options */
3476 0, /* flags */
3477 }
3478 mysql_declare_plugin_end;
3479