1 /* Copyright (c) 2004, 2021, Oracle and/or its affiliates.
2 
3   This program is free software; you can redistribute it and/or modify
4   it under the terms of the GNU General Public License, version 2.0,
5   as published by the Free Software Foundation.
6 
7   This program is also distributed with certain software (including
8   but not limited to OpenSSL) that is licensed under separate terms,
9   as designated in a particular file or component or in included license
10   documentation.  The authors of MySQL hereby grant you an additional
11   permission to link the program and your derivative works with the
12   separately licensed software that they have included with MySQL.
13 
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License, version 2.0, for more details.
18 
19   You should have received a copy of the GNU General Public License
20   along with this program; if not, write to the Free Software
21   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /*
24 
25   MySQL Federated Storage Engine
26 
27   ha_federated.cc - MySQL Federated Storage Engine
28   Patrick Galbraith and Brian Aker, 2004
29 
30   This is a handler which uses a foreign database as the data file, as
31   opposed to a handler like MyISAM, which uses .MYD files locally.
32 
33   How this handler works
34   ----------------------------------
35   Normal database files are local and as such: You create a table called
36   'users', a file such as 'users.MYD' is created. A handler reads, inserts,
37   deletes, updates data in this file. The data is stored in particular format,
38   so to read, that data has to be parsed into fields, to write, fields have to
39   be stored in this format to write to this data file.
40 
41   With MySQL Federated storage engine, there will be no local files
42   for each table's data (such as .MYD). A foreign database will store
43   the data that would normally be in this file. This will necessitate
44   the use of MySQL client API to read, delete, update, insert this
45   data. The data will have to be retrieve via an SQL call "SELECT *
46   FROM users". Then, to read this data, it will have to be retrieved
47   via mysql_fetch_row one row at a time, then converted from the
48   column in this select into the format that the handler expects.
49 
50   The create table will simply create the .frm file, and within the
51   "CREATE TABLE" SQL, there SHALL be any of the following :
52 
53   connection=scheme://username:password@hostname:port/database/tablename
54   connection=scheme://username@hostname/database/tablename
55   connection=scheme://username:password@hostname/database/tablename
56   connection=scheme://username:password@hostname/database/tablename
57 
58   - OR -
59 
60   As of 5.1 (See worklog #3031), federated now allows you to use a non-url
61   format, taking advantage of mysql.servers:
62 
63   connection="connection_one"
64   connection="connection_one/table_foo"
65 
66   An example would be:
67 
68   connection=mysql://username:password@hostname:port/database/tablename
69 
70   or, if we had:
71 
72   create server 'server_one' foreign data wrapper 'mysql' options
73   (HOST '127.0.0.1',
74   DATABASE 'db1',
75   USER 'root',
76   PASSWORD '',
77   PORT 3306,
78   SOCKET '',
79   OWNER 'root');
80 
81   CREATE TABLE federated.t1 (
82     `id` int(20) NOT NULL,
83     `name` varchar(64) NOT NULL default ''
84     )
85   ENGINE="FEDERATED" DEFAULT CHARSET=latin1
86   CONNECTION='server_one';
87 
88   So, this will have been the equivalent of
89 
90   CONNECTION="mysql://root@127.0.0.1:3306/db1/t1"
91 
92   Then, we can also change the server to point to a new schema:
93 
94   ALTER SERVER 'server_one' options(DATABASE 'db2');
95 
96   All subsequent calls will now be against db2.t1! Guess what? You don't
97   have to perform an alter table!
98 
99   This connecton="connection string" is necessary for the handler to be
100   able to connect to the foreign server, either by URL, or by server
101   name.
102 
103 
104   The basic flow is this:
105 
106   SQL calls issues locally ->
107   mysql handler API (data in handler format) ->
108   mysql client API (data converted to SQL calls) ->
109   foreign database -> mysql client API ->
110   convert result sets (if any) to handler format ->
111   handler API -> results or rows affected to local
112 
113   What this handler does and doesn't support
114   ------------------------------------------
115   * Tables MUST be created on the foreign server prior to any action on those
116     tables via the handler, first version. IMPORTANT: IF you MUST use the
117     federated storage engine type on the REMOTE end, MAKE SURE [ :) ] That
118     the table you connect to IS NOT a table pointing BACK to your ORIGNAL
119     table! You know  and have heard the screaching of audio feedback? You
120     know putting two mirror in front of each other how the reflection
121     continues for eternity? Well, need I say more?!
122   * There will not be support for transactions.
123   * There is no way for the handler to know if the foreign database or table
124     has changed. The reason for this is that this database has to work like a
125     data file that would never be written to by anything other than the
126     database. The integrity of the data in the local table could be breached
127     if there was any change to the foreign database.
128   * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
129   * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
130   * Prepared statements will not be used in the first implementation, it
131     remains to to be seen whether the limited subset of the client API for the
132     server supports this.
133   * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
134     implementation.
135   * This will not work with the query cache.
136 
137    Method calls
138 
139    A two column table, with one record:
140 
141    (SELECT)
142 
143    "SELECT * FROM foo"
144     ha_federated::info
145     ha_federated::scan_time:
146     ha_federated::rnd_init: share->select_query SELECT * FROM foo
147     ha_federated::extra
148 
149     <for every row of data retrieved>
150     ha_federated::rnd_next
151     ha_federated::convert_row_to_internal_format
152     ha_federated::rnd_next
153     </for every row of data retrieved>
154 
155     ha_federated::rnd_end
156     ha_federated::extra
157     ha_federated::reset
158 
159     (INSERT)
160 
161     "INSERT INTO foo (id, ts) VALUES (2, now());"
162 
163     ha_federated::write_row
164 
165     ha_federated::reset
166 
167     (UPDATE)
168 
169     "UPDATE foo SET ts = now() WHERE id = 1;"
170 
171     ha_federated::index_init
172     ha_federated::index_read
173     ha_federated::index_read_idx
174     ha_federated::rnd_next
175     ha_federated::convert_row_to_internal_format
176     ha_federated::update_row
177 
178     ha_federated::extra
179     ha_federated::extra
180     ha_federated::extra
181     ha_federated::external_lock
182     ha_federated::reset
183 
184 
185     How do I use this handler?
186     --------------------------
187     First of all, you need to build this storage engine:
188 
189       ./configure --with-federated-storage-engine
190       make
191 
192     Next, to use this handler, it's very simple. You must
193     have two databases running, either both on the same host, or
194     on different hosts.
195 
196     One the server that will be connecting to the foreign
197     host (client), you create your table as such:
198 
199     CREATE TABLE test_table (
200       id     int(20) NOT NULL auto_increment,
201       name   varchar(32) NOT NULL default '',
202       other  int(20) NOT NULL default '0',
203       PRIMARY KEY  (id),
204       KEY name (name),
205       KEY other_key (other))
206        ENGINE="FEDERATED"
207        DEFAULT CHARSET=latin1
208        CONNECTION='mysql://root@127.0.0.1:9306/federated/test_federated';
209 
210    Notice the "COMMENT" and "ENGINE" field? This is where you
211    respectively set the engine type, "FEDERATED" and foreign
212    host information, this being the database your 'client' database
213    will connect to and use as the "data file". Obviously, the foreign
214    database is running on port 9306, so you want to start up your other
215    database so that it is indeed on port 9306, and your federated
216    database on a port other than that. In my setup, I use port 5554
217    for federated, and port 5555 for the foreign database.
218 
219    Then, on the foreign database:
220 
221    CREATE TABLE test_table (
222      id     int(20) NOT NULL auto_increment,
223      name   varchar(32) NOT NULL default '',
224      other  int(20) NOT NULL default '0',
225      PRIMARY KEY  (id),
226      KEY name (name),
227      KEY other_key (other))
228      ENGINE="<NAME>" <-- whatever you want, or not specify
229      DEFAULT CHARSET=latin1 ;
230 
231     This table is exactly the same (and must be exactly the same),
232     except that it is not using the federated handler and does
233     not need the URL.
234 
235 
236     How to see the handler in action
237     --------------------------------
238 
239     When developing this handler, I compiled the federated database with
240     debugging:
241 
242     ./configure --with-federated-storage-engine
243     --prefix=/home/mysql/mysql-build/federated/ --with-debug
244 
245     Once compiled, I did a 'make install' (not for the purpose of installing
246     the binary, but to install all the files the binary expects to see in the
247     diretory I specified in the build with --prefix,
248     "/home/mysql/mysql-build/federated".
249 
250     Then, I started the foreign server:
251 
252     /usr/local/mysql/bin/mysqld_safe
253     --user=mysql --log=/tmp/mysqld.5555.log -P 5555
254 
255     Then, I went back to the directory containing the newly compiled mysqld,
256     <builddir>/sql/, started up gdb:
257 
258     gdb ./mysqld
259 
260     Then, withn the (gdb) prompt:
261     (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug
262 
263     Next, I open several windows for each:
264 
265     1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
266     2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
267     3. A window with a client open to the federated server on port 5554
268     4. A window with a client open to the federated server on port 5555
269 
270     I would create a table on the client to the foreign server on port
271     5555, and then to the federated server on port 5554. At this point,
272     I would run whatever queries I wanted to on the federated server,
273     just always remembering that whatever changes I wanted to make on
274     the table, or if I created new tables, that I would have to do that
275     on the foreign server.
276 
277     Another thing to look for is 'show variables' to show you that you have
278     support for federated handler support:
279 
280     show variables like '%federat%'
281 
282     and:
283 
284     show storage engines;
285 
286     Both should display the federated storage handler.
287 
288 
289     Testing
290     -------
291 
292     There is a test for MySQL Federated Storage Handler in ./mysql-test/t,
293     federatedd.test It starts both a slave and master database using
294     the same setup that the replication tests use, with the exception that
295     it turns off replication, and sets replication to ignore the test tables.
296     After ensuring that you actually do have support for the federated storage
297     handler, numerous queries/inserts/updates/deletes are run, many derived
298     from the MyISAM tests, plus som other tests which were meant to reveal
299     any issues that would be most likely to affect this handler. All tests
300     should work! ;)
301 
302     To run these tests, go into ./mysql-test (based in the directory you
303     built the server in)
304 
305     ./mysql-test-run federated
306 
307     To run the test, or if you want to run the test and have debug info:
308 
309     ./mysql-test-run --debug federated
310 
311     This will run the test in debug mode, and you can view the trace and
312     log files in the ./mysql-test/var/log directory
313 
314     ls -l mysql-test/var/log/
315     -rw-r--r--  1 patg  patg        17  4 Dec 12:27 current_test
316     -rw-r--r--  1 patg  patg       692  4 Dec 12:52 manager.log
317     -rw-rw----  1 patg  patg     21246  4 Dec 12:51 master-bin.000001
318     -rw-rw----  1 patg  patg        68  4 Dec 12:28 master-bin.index
319     -rw-r--r--  1 patg  patg      1620  4 Dec 12:51 master.err
320     -rw-rw----  1 patg  patg     23179  4 Dec 12:51 master.log
321     -rw-rw----  1 patg  patg  16696550  4 Dec 12:51 master.trace
322     -rw-r--r--  1 patg  patg         0  4 Dec 12:28 mysqltest-time
323     -rw-r--r--  1 patg  patg   2024051  4 Dec 12:51 mysqltest.trace
324     -rw-rw----  1 patg  patg     94992  4 Dec 12:51 slave-bin.000001
325     -rw-rw----  1 patg  patg        67  4 Dec 12:28 slave-bin.index
326     -rw-rw----  1 patg  patg       249  4 Dec 12:52 slave-relay-bin.000003
327     -rw-rw----  1 patg  patg        73  4 Dec 12:28 slave-relay-bin.index
328     -rw-r--r--  1 patg  patg      1349  4 Dec 12:51 slave.err
329     -rw-rw----  1 patg  patg     96206  4 Dec 12:52 slave.log
330     -rw-rw----  1 patg  patg  15706355  4 Dec 12:51 slave.trace
331     -rw-r--r--  1 patg  patg         0  4 Dec 12:51 warnings
332 
333     Of course, again, you can tail the trace log:
334 
335     tail -f mysql-test/var/log/master.trace |grep ha_fed
336 
337     As well as the slave query log:
338 
339     tail -f mysql-test/var/log/slave.log
340 
341     Files that comprise the test suit
342     ---------------------------------
343     mysql-test/t/federated.test
344     mysql-test/r/federated.result
345     mysql-test/r/have_federated_db.require
346     mysql-test/include/have_federated_db.inc
347 
348 
349     Other tidbits
350     -------------
351 
352     These were the files that were modified or created for this
353     Federated handler to work, in 5.0:
354 
355     ./configure.in
356     ./sql/Makefile.am
357     ./config/ac_macros/ha_federated.m4
358     ./sql/handler.cc
359     ./sql/mysqld.cc
360     ./sql/set_var.cc
361     ./sql/field.h
362     ./sql/sql_string.h
363     ./mysql-test/mysql-test-run(.sh)
364     ./mysql-test/t/federated.test
365     ./mysql-test/r/federated.result
366     ./mysql-test/r/have_federated_db.require
367     ./mysql-test/include/have_federated_db.inc
368     ./sql/ha_federated.cc
369     ./sql/ha_federated.h
370 
371     In 5.1
372 
373     my:~/mysql-build/mysql-5.1-bkbits patg$ ls storage/federated/
374     CMakeLists.txt                  Makefile.in                     ha_federated.h                  plug.in
375     Makefile                        SCCS                            libfederated.a
376     Makefile.am                     ha_federated.cc                 libfederated_a-ha_federated.o
377 
378 */
379 
380 
381 #define MYSQL_SERVER 1
382 #include "sql_servers.h"         // FOREIGN_SERVER, get_server_by_name
383 #include "sql_class.h"           // SSV
384 #include "sql_analyse.h"         // append_escaped
385 #include <mysql/plugin.h>
386 
387 #include "ha_federated.h"
388 #include "probes_mysql.h"
389 
390 #include "m_string.h"
391 #include "key.h"                                // key_copy
392 #include "myisam.h"                             // TT_USEFRM
393 
394 #include <mysql/plugin.h>
395 
396 #include <algorithm>
397 
398 using std::min;
399 using std::max;
400 
401 /* Variables for federated share methods */
402 static HASH federated_open_tables;              // To track open tables
403 mysql_mutex_t federated_mutex;                // To init the hash
404 static char ident_quote_char= '`';              // Character for quoting
405                                                 // identifiers
406 static char value_quote_char= '\'';             // Character for quoting
407                                                 // literals
408 static const int bulk_padding= 64;              // bytes "overhead" in packet
409 
410 /* Variables used when chopping off trailing characters */
411 static const uint sizeof_trailing_comma= sizeof(", ") - 1;
412 static const uint sizeof_trailing_and= sizeof(" AND ") - 1;
413 static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;
414 
415 /* Static declaration for handerton */
416 static handler *federated_create_handler(handlerton *hton,
417                                          TABLE_SHARE *table,
418                                          MEM_ROOT *mem_root);
419 static int federated_commit(handlerton *hton, THD *thd, bool all);
420 static int federated_rollback(handlerton *hton, THD *thd, bool all);
421 
422 /* Federated storage engine handlerton */
423 
federated_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)424 static handler *federated_create_handler(handlerton *hton,
425                                          TABLE_SHARE *table,
426                                          MEM_ROOT *mem_root)
427 {
428   return new (mem_root) ha_federated(hton, table);
429 }
430 
431 
432 /* Function we use in the creation of our hash to get key */
433 
federated_get_key(FEDERATED_SHARE * share,size_t * length,my_bool not_used MY_ATTRIBUTE ((unused)))434 static uchar *federated_get_key(FEDERATED_SHARE *share, size_t *length,
435                                 my_bool not_used MY_ATTRIBUTE ((unused)))
436 {
437   *length= share->share_key_length;
438   return (uchar*) share->share_key;
439 }
440 
441 static PSI_memory_key fe_key_memory_federated_share;
442 
443 #ifdef HAVE_PSI_INTERFACE
444 static PSI_mutex_key fe_key_mutex_federated, fe_key_mutex_FEDERATED_SHARE_mutex;
445 
446 static PSI_mutex_info all_federated_mutexes[]=
447 {
448   { &fe_key_mutex_federated, "federated", PSI_FLAG_GLOBAL},
449   { &fe_key_mutex_FEDERATED_SHARE_mutex, "FEDERATED_SHARE::mutex", 0}
450 };
451 
452 static PSI_memory_info all_federated_memory[]=
453 {
454   { &fe_key_memory_federated_share, "FEDERATED_SHARE", PSI_FLAG_GLOBAL}
455 };
456 
init_federated_psi_keys(void)457 static void init_federated_psi_keys(void)
458 {
459   const char* category= "federated";
460   int count;
461 
462   count= array_elements(all_federated_mutexes);
463   mysql_mutex_register(category, all_federated_mutexes, count);
464 
465   count= array_elements(all_federated_memory);
466   mysql_memory_register(category, all_federated_memory, count);
467 }
468 #endif /* HAVE_PSI_INTERFACE */
469 
470 /*
471   Initialize the federated handler.
472 
473   SYNOPSIS
474     federated_db_init()
475     p		Handlerton
476 
477   RETURN
478     FALSE       OK
479     TRUE        Error
480 */
481 
federated_db_init(void * p)482 int federated_db_init(void *p)
483 {
484   DBUG_ENTER("federated_db_init");
485 
486 #ifdef HAVE_PSI_INTERFACE
487   init_federated_psi_keys();
488 #endif /* HAVE_PSI_INTERFACE */
489 
490   handlerton *federated_hton= (handlerton *)p;
491   federated_hton->state= SHOW_OPTION_YES;
492   federated_hton->db_type= DB_TYPE_FEDERATED_DB;
493   federated_hton->commit= federated_commit;
494   federated_hton->rollback= federated_rollback;
495   federated_hton->create= federated_create_handler;
496   federated_hton->flags= HTON_ALTER_NOT_SUPPORTED | HTON_NO_PARTITION |
497     HTON_SUPPORTS_ONLINE_BACKUPS;
498 
499   /*
500     Support for transactions disabled until WL#2952 fixes it.
501 	We do it like this to avoid "defined but not used" compiler warnings.
502   */
503   federated_hton->commit= 0;
504   federated_hton->rollback= 0;
505 
506   if (mysql_mutex_init(fe_key_mutex_federated,
507                        &federated_mutex, MY_MUTEX_INIT_FAST))
508     goto error;
509   if (!my_hash_init(&federated_open_tables, &my_charset_bin, 32, 0, 0,
510                     (my_hash_get_key) federated_get_key, 0, 0,
511                     fe_key_memory_federated_share))
512   {
513     DBUG_RETURN(FALSE);
514   }
515 
516   mysql_mutex_destroy(&federated_mutex);
517 error:
518   DBUG_RETURN(TRUE);
519 }
520 
521 
522 /*
523   Release the federated handler.
524 
525   SYNOPSIS
526     federated_db_end()
527 
528   RETURN
529     FALSE       OK
530 */
531 
federated_done(void * p)532 int federated_done(void *p)
533 {
534   my_hash_free(&federated_open_tables);
535   mysql_mutex_destroy(&federated_mutex);
536 
537   return 0;
538 }
539 
540 
541 /**
542   @brief Append identifiers to the string.
543 
544   @param[in,out] string	The target string.
545   @param[in] name       Identifier name
546   @param[in] length     Length of identifier name in bytes
547   @param[in] quote_char Quote char to use for quoting identifier.
548 
549   @return Operation Status
550   @retval FALSE OK
551   @retval TRUE  There was an error appending to the string.
552 
553   @note This function is based upon the append_identifier() function
554         in sql_show.cc except that quoting always occurs.
555 */
556 
append_ident(String * string,const char * name,size_t length,const char quote_char)557 static bool append_ident(String *string, const char *name, size_t length,
558                          const char quote_char)
559 {
560   bool result= true;
561   DBUG_ENTER("append_ident");
562 
563   if (quote_char)
564   {
565     string->reserve(length * 2 + 2);
566 
567     if ((result= string->append(&quote_char, 1, system_charset_info)))
568       goto err;
569 
570     uint clen= 0;
571 
572     for (const char *name_end= name + length; name < name_end; name+= clen)
573     {
574       char c= *name;
575 
576       if (!(clen= my_mbcharlen(system_charset_info, c)))
577         goto err;
578 
579       if (clen == 1 && c == quote_char &&
580           (result= string->append(&quote_char, 1, system_charset_info)))
581         goto err;
582 
583       if ((result= string->append(name, clen, string->charset())))
584         goto err;
585     }
586     result= string->append(&quote_char, 1, system_charset_info);
587   }
588   else
589     result= string->append(name, length, system_charset_info);
590 
591 err:
592   DBUG_RETURN(result);
593 }
594 
595 
parse_url_error(FEDERATED_SHARE * share,TABLE * table,int error_num)596 static int parse_url_error(FEDERATED_SHARE *share, TABLE *table, int error_num)
597 {
598   char buf[FEDERATED_QUERY_BUFFER_SIZE];
599   size_t buf_len;
600   DBUG_ENTER("ha_federated parse_url_error");
601 
602   buf_len= min<size_t>(table->s->connect_string.length,
603                        FEDERATED_QUERY_BUFFER_SIZE-1);
604   strmake(buf, table->s->connect_string.str, buf_len);
605   my_error(error_num, MYF(0), buf);
606   DBUG_RETURN(error_num);
607 }
608 
609 /*
610   retrieve server object which contains server meta-data
611   from the system table given a server's name, set share
612   connection parameter members
613 */
get_connection(MEM_ROOT * mem_root,FEDERATED_SHARE * share)614 int get_connection(MEM_ROOT *mem_root, FEDERATED_SHARE *share)
615 {
616   int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
617   FOREIGN_SERVER *server, server_buffer;
618   DBUG_ENTER("ha_federated::get_connection");
619 
620   /*
621     get_server_by_name() clones the server if exists and allocates
622 	copies of strings in the supplied mem_root
623   */
624   if (!(server=
625        get_server_by_name(mem_root, share->connection_string, &server_buffer)))
626   {
627     DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!"));
628     error_num= ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE;
629     goto error;
630   }
631   DBUG_PRINT("info", ("get_server_by_name returned server at %lx",
632                       (long unsigned int) server));
633 
634   /*
635     Most of these should never be empty strings, error handling will
636     need to be implemented. Also, is this the best way to set the share
637     members? Is there some allocation needed? In running this code, it works
638     except there are errors in the trace file of the share being overrun
639     at the address of the share.
640   */
641   share->server_name_length= server->server_name_length;
642   share->server_name= server->server_name;
643   share->username= server->username;
644   share->password= server->password;
645   share->database= server->db;
646   share->port= server->port > 0 && server->port < 65536 ?
647                (ushort) server->port : MYSQL_PORT;
648   share->hostname= server->host;
649   if (!(share->socket= server->socket) &&
650       !strcmp(share->hostname, my_localhost))
651     share->socket= (char *) MYSQL_UNIX_ADDR;
652   share->scheme= server->scheme;
653 
654   DBUG_PRINT("info", ("share->username %s", share->username));
655   DBUG_PRINT("info", ("share->password %s", share->password));
656   DBUG_PRINT("info", ("share->hostname %s", share->hostname));
657   DBUG_PRINT("info", ("share->database %s", share->database));
658   DBUG_PRINT("info", ("share->port %d", share->port));
659   DBUG_PRINT("info", ("share->socket %s", share->socket));
660   DBUG_RETURN(0);
661 
662 error:
663   my_printf_error(error_num, "server name: '%s' doesn't exist!",
664                   MYF(0), share->connection_string);
665   DBUG_RETURN(error_num);
666 }
667 
668 /*
669   Parse connection info from table->s->connect_string
670 
671   SYNOPSIS
672     parse_url()
673     mem_root            MEM_ROOT pointer for memory allocation
674     share               pointer to FEDERATED share
675     table               pointer to current TABLE class
676     table_create_flag   determines what error to throw
677 
678   DESCRIPTION
679     Populates the share with information about the connection
680     to the foreign database that will serve as the data source.
681     This string must be specified (currently) in the "CONNECTION" field,
682     listed in the CREATE TABLE statement.
683 
684     This string MUST be in the format of any of these:
685 
686     CONNECTION="scheme://username:password@hostname:port/database/table"
687     CONNECTION="scheme://username@hostname/database/table"
688     CONNECTION="scheme://username@hostname:port/database/table"
689     CONNECTION="scheme://username:password@hostname/database/table"
690 
691     _OR_
692 
693     CONNECTION="connection name"
694 
695 
696 
697   An Example:
698 
699   CREATE TABLE t1 (id int(32))
700     ENGINE="FEDERATED"
701     CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federated/testtable";
702 
703   CREATE TABLE t2 (
704     id int(4) NOT NULL auto_increment,
705     name varchar(32) NOT NULL,
706     PRIMARY KEY(id)
707     ) ENGINE="FEDERATED" CONNECTION="my_conn";
708 
709   ***IMPORTANT***
710   Currently, the Federated Storage Engine only supports connecting to another
711   MySQL Database ("scheme" of "mysql"). Connections using JDBC as well as
712   other connectors are in the planning stage.
713 
714 
715   'password' and 'port' are both optional.
716 
717   RETURN VALUE
718     0           success
719     error_num   particular error code
720 
721 */
722 
parse_url(MEM_ROOT * mem_root,FEDERATED_SHARE * share,TABLE * table,uint table_create_flag)723 static int parse_url(MEM_ROOT *mem_root, FEDERATED_SHARE *share, TABLE *table,
724                      uint table_create_flag)
725 {
726   uint error_num= (table_create_flag ?
727                    ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
728                    ER_FOREIGN_DATA_STRING_INVALID);
729   DBUG_ENTER("ha_federated::parse_url");
730 
731   share->port= 0;
732   share->socket= 0;
733   DBUG_PRINT("info", ("share at %lx", (long unsigned int) share));
734   DBUG_PRINT("info", ("Length: %u", (uint) table->s->connect_string.length));
735   DBUG_PRINT("info", ("String: '%.*s'", (int) table->s->connect_string.length,
736                       table->s->connect_string.str));
737   share->connection_string= strmake_root(mem_root, table->s->connect_string.str,
738                                        table->s->connect_string.length);
739 
740   DBUG_PRINT("info",("parse_url alloced share->connection_string %lx",
741                      (long unsigned int) share->connection_string));
742 
743   DBUG_PRINT("info",("share->connection_string %s",share->connection_string));
744   /*
745     No :// or @ in connection string. Must be a straight connection name of
746     either "servername" or "servername/tablename"
747   */
748   if ( (!strstr(share->connection_string, "://") &&
749        (!strchr(share->connection_string, '@'))))
750   {
751 
752     DBUG_PRINT("info",
753                ("share->connection_string %s internal format \
754                 share->connection_string %lx",
755                 share->connection_string,
756                 (long unsigned int) share->connection_string));
757 
758     /* ok, so we do a little parsing, but not completely! */
759     share->parsed= FALSE;
760     /*
761       If there is a single '/' in the connection string, this means the user is
762       specifying a table name
763     */
764 
765     if ((share->table_name= strchr(share->connection_string, '/')))
766     {
767       share->connection_string[share->table_name - share->connection_string]= '\0';
768       share->table_name++;
769       share->table_name_length= (uint) strlen(share->table_name);
770 
771       DBUG_PRINT("info",
772                  ("internal format, parsed table_name share->connection_string \
773                   %s share->table_name %s",
774                   share->connection_string, share->table_name));
775 
776       /*
777         there better not be any more '/'s !
778       */
779       if (strchr(share->table_name, '/'))
780         goto error;
781 
782     }
783     /*
784       otherwise, straight server name, use tablename of federated table
785       as remote table name
786     */
787     else
788     {
789       /*
790         connection specifies everything but, resort to
791         expecting remote and foreign table names to match
792       */
793       share->table_name= strmake_root(mem_root, table->s->table_name.str,
794                                       (share->table_name_length= table->s->table_name.length));
795       DBUG_PRINT("info",
796                  ("internal format, default table_name share->connection_string \
797                   %s share->table_name %s",
798                   share->connection_string, share->table_name));
799     }
800 
801     if ((error_num= get_connection(mem_root, share)))
802       goto error;
803   }
804   else
805   {
806     share->parsed= TRUE;
807     // Add a null for later termination of table name
808     share->connection_string[table->s->connect_string.length]= 0;
809     share->scheme= share->connection_string;
810     DBUG_PRINT("info",("parse_url alloced share->scheme %lx",
811                        (long unsigned int) share->scheme));
812 
813     /*
814       remove addition of null terminator and store length
815       for each string  in share
816     */
817     if (!(share->username= strstr(share->scheme, "://")))
818       goto error;
819     share->scheme[share->username - share->scheme]= '\0';
820 
821     if (strcmp(share->scheme, "mysql") != 0)
822       goto error;
823 
824     share->username+= 3;
825 
826     if (!(share->hostname= strchr(share->username, '@')))
827       goto error;
828 
829     share->username[share->hostname - share->username]= '\0';
830     share->hostname++;
831 
832     if ((share->password= strchr(share->username, ':')))
833     {
834       share->username[share->password - share->username]= '\0';
835       share->password++;
836       share->username= share->username;
837       /* make sure there isn't an extra / or @ */
838       if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
839         goto error;
840       /*
841         Found that if the string is:
842         user:@hostname:port/db/table
843         Then password is a null string, so set to NULL
844       */
845       if (share->password[0] == '\0')
846         share->password= NULL;
847     }
848     else
849       share->username= share->username;
850 
851     /* make sure there isn't an extra / or @ */
852     if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
853       goto error;
854 
855     if (!(share->database= strchr(share->hostname, '/')))
856       goto error;
857     share->hostname[share->database - share->hostname]= '\0';
858     share->database++;
859 
860     if ((share->sport= strchr(share->hostname, ':')))
861     {
862       share->hostname[share->sport - share->hostname]= '\0';
863       share->sport++;
864       if (share->sport[0] == '\0')
865         share->sport= NULL;
866       else
867         share->port= atoi(share->sport);
868     }
869 
870     if (!(share->table_name= strchr(share->database, '/')))
871       goto error;
872     share->database[share->table_name - share->database]= '\0';
873     share->table_name++;
874 
875     share->table_name_length= strlen(share->table_name);
876 
877     /* make sure there's not an extra / */
878     if ((strchr(share->table_name, '/')))
879       goto error;
880 
881     /*
882       If hostname is omitted, we set it to NULL. According to
883       mysql_real_connect() manual:
884       The value of host may be either a hostname or an IP address.
885       If host is NULL or the string "localhost", a connection to the
886       local host is assumed.
887     */
888     if (share->hostname[0] == '\0')
889       share->hostname= NULL;
890   }
891 
892   if (!share->port)
893   {
894     if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
895       share->socket= (char*) MYSQL_UNIX_ADDR;
896     else
897       share->port= MYSQL_PORT;
898   }
899 
900   DBUG_PRINT("info",
901              ("scheme: %s  username: %s  password: %s \
902                hostname: %s  port: %d  db: %s  tablename: %s",
903               share->scheme, share->username, share->password,
904               share->hostname, share->port, share->database,
905               share->table_name));
906 
907   DBUG_RETURN(0);
908 
909 error:
910   DBUG_RETURN(parse_url_error(share, table, error_num));
911 }
912 
913 /*****************************************************************************
914 ** FEDERATED tables
915 *****************************************************************************/
916 
ha_federated(handlerton * hton,TABLE_SHARE * table_arg)917 ha_federated::ha_federated(handlerton *hton,
918                            TABLE_SHARE *table_arg)
919   :handler(hton, table_arg),
920    mysql(0), stored_result(0), results(fe_key_memory_federated_share)
921 {
922   trx_next= 0;
923   memset(&bulk_insert, 0, sizeof(bulk_insert));
924 }
925 
926 
927 /*
928   Convert MySQL result set row to handler internal format
929 
930   SYNOPSIS
931     convert_row_to_internal_format()
932       record    Byte pointer to record
933       row       MySQL result set row from fetchrow()
934       result	Result set to use
935 
936   DESCRIPTION
937     This method simply iterates through a row returned via fetchrow with
938     values from a successful SELECT , and then stores each column's value
939     in the field object via the field object pointer (pointing to the table's
940     array of field object pointers). This is how the handler needs the data
941     to be stored to then return results back to the user
942 
943   RETURN VALUE
944     0   After fields have had field values stored from record
945 */
946 
convert_row_to_internal_format(uchar * record,MYSQL_ROW row,MYSQL_RES * result)947 uint ha_federated::convert_row_to_internal_format(uchar *record,
948                                                   MYSQL_ROW row,
949                                                   MYSQL_RES *result)
950 {
951   ulong *lengths;
952   Field **field;
953   my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
954   DBUG_ENTER("ha_federated::convert_row_to_internal_format");
955 
956   lengths= mysql_fetch_lengths(result);
957 
958   for (field= table->field; *field; field++, row++, lengths++)
959   {
960     /*
961       index variable to move us through the row at the
962       same iterative step as the field
963     */
964     my_ptrdiff_t old_ptr;
965     old_ptr= (my_ptrdiff_t) (record - table->record[0]);
966     (*field)->move_field_offset(old_ptr);
967     if (!*row)
968     {
969       (*field)->set_null();
970       (*field)->reset();
971     }
972     else
973     {
974       if (bitmap_is_set(table->read_set, (*field)->field_index))
975       {
976         (*field)->set_notnull();
977         (*field)->store(*row, *lengths, &my_charset_bin);
978       }
979     }
980     (*field)->move_field_offset(-old_ptr);
981   }
982   dbug_tmp_restore_column_map(table->write_set, old_map);
983   DBUG_RETURN(0);
984 }
985 
emit_key_part_name(String * to,KEY_PART_INFO * part)986 static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
987 {
988   DBUG_ENTER("emit_key_part_name");
989   if (append_ident(to, part->field->field_name,
990                    strlen(part->field->field_name), ident_quote_char))
991     DBUG_RETURN(1);                           // Out of memory
992   DBUG_RETURN(0);
993 }
994 
emit_key_part_element(String * to,KEY_PART_INFO * part,bool needs_quotes,bool is_like,const uchar * ptr,uint len)995 static bool emit_key_part_element(String *to, KEY_PART_INFO *part,
996                                   bool needs_quotes, bool is_like,
997                                   const uchar *ptr, uint len)
998 {
999   Field *field= part->field;
1000   DBUG_ENTER("emit_key_part_element");
1001 
1002   if (needs_quotes && to->append(STRING_WITH_LEN("'")))
1003     DBUG_RETURN(1);
1004 
1005   if (part->type == HA_KEYTYPE_BIT)
1006   {
1007     char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;
1008 
1009     *buf++= '0';
1010     *buf++= 'x';
1011     buf= octet2hex(buf, (char*) ptr, len);
1012     if (to->append((char*) buff, (uint)(buf - buff)))
1013       DBUG_RETURN(1);
1014   }
1015   else if (part->key_part_flag & HA_BLOB_PART)
1016   {
1017     String blob;
1018     uint blob_length= uint2korr(ptr);
1019     blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
1020                    blob_length, &my_charset_bin);
1021     if (append_escaped(to, &blob))
1022       DBUG_RETURN(1);
1023   }
1024   else if (part->key_part_flag & HA_VAR_LENGTH_PART)
1025   {
1026     String varchar;
1027     uint var_length= uint2korr(ptr);
1028     varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
1029                       var_length, &my_charset_bin);
1030     if (append_escaped(to, &varchar))
1031       DBUG_RETURN(1);
1032   }
1033   else
1034   {
1035     char strbuff[MAX_FIELD_WIDTH];
1036     String str(strbuff, sizeof(strbuff), part->field->charset()), *res;
1037 
1038     res= field->val_str(&str, ptr);
1039 
1040     if (field->result_type() == STRING_RESULT)
1041     {
1042       if (append_escaped(to, res))
1043         DBUG_RETURN(1);
1044     }
1045     else if (to->append(res->ptr(), res->length()))
1046       DBUG_RETURN(1);
1047   }
1048 
1049   if (is_like && to->append(STRING_WITH_LEN("%")))
1050     DBUG_RETURN(1);
1051 
1052   if (needs_quotes && to->append(STRING_WITH_LEN("'")))
1053     DBUG_RETURN(1);
1054 
1055   DBUG_RETURN(0);
1056 }
1057 
1058 /*
1059   Create a WHERE clause based off of values in keys
1060   Note: This code was inspired by key_copy from key.cc
1061 
1062   SYNOPSIS
1063     create_where_from_key ()
1064       to          String object to store WHERE clause
1065       key_info    KEY struct pointer
1066       key         byte pointer containing key
1067       key_length  length of key
1068       range_type  0 - no range, 1 - min range, 2 - max range
1069                   (see enum range_operation)
1070 
1071   DESCRIPTION
1072     Using iteration through all the keys via a KEY_PART_INFO pointer,
1073     This method 'extracts' the value of each key in the byte pointer
1074     *key, and for each key found, constructs an appropriate WHERE clause
1075 
1076   RETURN VALUE
1077     0   After all keys have been accounted for to create the WHERE clause
1078     1   No keys found
1079 
1080     Range flags Table per Timour:
1081 
1082    -----------------
1083    - start_key:
1084      * ">"  -> HA_READ_AFTER_KEY
1085      * ">=" -> HA_READ_KEY_OR_NEXT
1086      * "="  -> HA_READ_KEY_EXACT
1087 
1088    - end_key:
1089      * "<"  -> HA_READ_BEFORE_KEY
1090      * "<=" -> HA_READ_AFTER_KEY
1091 
1092    records_in_range:
1093    -----------------
1094    - start_key:
1095      * ">"  -> HA_READ_AFTER_KEY
1096      * ">=" -> HA_READ_KEY_EXACT
1097      * "="  -> HA_READ_KEY_EXACT
1098 
1099    - end_key:
1100      * "<"  -> HA_READ_BEFORE_KEY
1101      * "<=" -> HA_READ_AFTER_KEY
1102      * "="  -> HA_READ_AFTER_KEY
1103 
1104 0 HA_READ_KEY_EXACT,              Find first record else error
1105 1 HA_READ_KEY_OR_NEXT,            Record or next record
1106 2 HA_READ_KEY_OR_PREV,            Record or previous
1107 3 HA_READ_AFTER_KEY,              Find next rec. after key-record
1108 4 HA_READ_BEFORE_KEY,             Find next rec. before key-record
1109 5 HA_READ_PREFIX,                 Key which as same prefix
1110 6 HA_READ_PREFIX_LAST,            Last key with the same prefix
1111 7 HA_READ_PREFIX_LAST_OR_PREV,    Last or prev key with the same prefix
1112 
1113 Flags that I've found:
1114 
1115 id, primary key, varchar
1116 
1117 id = 'ccccc'
1118 records_in_range: start_key 0 end_key 3
1119 read_range_first: start_key 0 end_key NULL
1120 
1121 id > 'ccccc'
1122 records_in_range: start_key 3 end_key NULL
1123 read_range_first: start_key 3 end_key NULL
1124 
1125 id < 'ccccc'
1126 records_in_range: start_key NULL end_key 4
1127 read_range_first: start_key NULL end_key 4
1128 
1129 id <= 'ccccc'
1130 records_in_range: start_key NULL end_key 3
1131 read_range_first: start_key NULL end_key 3
1132 
1133 id >= 'ccccc'
1134 records_in_range: start_key 0 end_key NULL
1135 read_range_first: start_key 1 end_key NULL
1136 
1137 id like 'cc%cc'
1138 records_in_range: start_key 0 end_key 3
1139 read_range_first: start_key 1 end_key 3
1140 
1141 id > 'aaaaa' and id < 'ccccc'
1142 records_in_range: start_key 3 end_key 4
1143 read_range_first: start_key 3 end_key 4
1144 
1145 id >= 'aaaaa' and id < 'ccccc';
1146 records_in_range: start_key 0 end_key 4
1147 read_range_first: start_key 1 end_key 4
1148 
1149 id >= 'aaaaa' and id <= 'ccccc';
1150 records_in_range: start_key 0 end_key 3
1151 read_range_first: start_key 1 end_key 3
1152 
1153 id > 'aaaaa' and id <= 'ccccc';
1154 records_in_range: start_key 3 end_key 3
1155 read_range_first: start_key 3 end_key 3
1156 
1157 numeric keys:
1158 
1159 id = 4
1160 index_read_idx: start_key 0 end_key NULL
1161 
1162 id > 4
1163 records_in_range: start_key 3 end_key NULL
1164 read_range_first: start_key 3 end_key NULL
1165 
1166 id >= 4
1167 records_in_range: start_key 0 end_key NULL
1168 read_range_first: start_key 1 end_key NULL
1169 
1170 id < 4
1171 records_in_range: start_key NULL end_key 4
1172 read_range_first: start_key NULL end_key 4
1173 
1174 id <= 4
1175 records_in_range: start_key NULL end_key 3
1176 read_range_first: start_key NULL end_key 3
1177 
1178 id like 4
1179 full table scan, select * from
1180 
1181 id > 2 and id < 8
1182 records_in_range: start_key 3 end_key 4
1183 read_range_first: start_key 3 end_key 4
1184 
1185 id >= 2 and id < 8
1186 records_in_range: start_key 0 end_key 4
1187 read_range_first: start_key 1 end_key 4
1188 
1189 id >= 2 and id <= 8
1190 records_in_range: start_key 0 end_key 3
1191 read_range_first: start_key 1 end_key 3
1192 
1193 id > 2 and id <= 8
1194 records_in_range: start_key 3 end_key 3
1195 read_range_first: start_key 3 end_key 3
1196 
1197 multi keys (id int, name varchar, other varchar)
1198 
1199 id = 1;
1200 records_in_range: start_key 0 end_key 3
1201 read_range_first: start_key 0 end_key NULL
1202 
1203 id > 4;
1204 id > 2 and name = '333'; remote: id > 2
1205 id > 2 and name > '333'; remote: id > 2
1206 id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results
1207 id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result
1208 id >= 4 and name = 'eric was here' and other > 'eeee';
1209 records_in_range: start_key 3 end_key NULL
1210 read_range_first: start_key 3 end_key NULL
1211 
1212 id >= 4;
1213 id >= 2 and name = '333' and other < 'ddd';
1214 remote: `id`  >= 2 AND `name`  >= '333';
1215 records_in_range: start_key 0 end_key NULL
1216 read_range_first: start_key 1 end_key NULL
1217 
1218 id < 4;
1219 id < 3 and name = '222' and other <= 'ccc'; remote: id < 3
1220 records_in_range: start_key NULL end_key 4
1221 read_range_first: start_key NULL end_key 4
1222 
1223 id <= 4;
1224 records_in_range: start_key NULL end_key 3
1225 read_range_first: start_key NULL end_key 3
1226 
1227 id like 4;
1228 full table scan
1229 
1230 id  > 2 and id < 4;
1231 records_in_range: start_key 3 end_key 4
1232 read_range_first: start_key 3 end_key 4
1233 
1234 id >= 2 and id < 4;
1235 records_in_range: start_key 0 end_key 4
1236 read_range_first: start_key 1 end_key 4
1237 
1238 id >= 2 and id <= 4;
1239 records_in_range: start_key 0 end_key 3
1240 read_range_first: start_key 1 end_key 3
1241 
1242 id > 2 and id <= 4;
1243 id = 6 and name = 'eric was here' and other > 'eeee';
1244 remote: (`id`  > 6 AND `name`  > 'eric was here' AND `other`  > 'eeee')
1245 AND (`id`  <= 6) AND ( AND `name`  <= 'eric was here')
1246 no results
1247 records_in_range: start_key 3 end_key 3
1248 read_range_first: start_key 3 end_key 3
1249 
1250 Summary:
1251 
1252 * If the start key flag is 0 the max key flag shouldn't even be set,
1253   and if it is, the query produced would be invalid.
1254 * Multipart keys, even if containing some or all numeric columns,
1255   are treated the same as non-numeric keys
1256 
1257   If the query is " = " (quotes or not):
1258   - records in range start key flag HA_READ_KEY_EXACT,
1259     end key flag HA_READ_AFTER_KEY (incorrect)
1260   - any other: start key flag HA_READ_KEY_OR_NEXT,
1261     end key flag HA_READ_AFTER_KEY (correct)
1262 
1263 * 'like' queries (of key)
1264   - Numeric, full table scan
1265   - Non-numeric
1266       records_in_range: start_key 0 end_key 3
1267       other : start_key 1 end_key 3
1268 
1269 * If the key flag is HA_READ_AFTER_KEY:
1270    if start_key, append >
1271    if end_key, append <=
1272 
1273 * If create_where_key was called by records_in_range:
1274 
1275  - if the key is numeric:
1276     start key flag is 0 when end key is NULL, end key flag is 3 or 4
1277  - if create_where_key was called by any other function:
1278     start key flag is 1 when end key is NULL, end key flag is 3 or 4
1279  - if the key is non-numeric, or multipart
1280     When the query is an exact match, the start key flag is 0,
1281     end key flag is 3 for what should be a no-range condition where
1282     you should have 0 and max key NULL, which it is if called by
1283     read_range_first
1284 
1285 Conclusion:
1286 
1287 1. Need logic to determin if a key is min or max when the flag is
1288 HA_READ_AFTER_KEY, and handle appending correct operator accordingly
1289 
1290 2. Need a boolean flag to pass to create_where_from_key, used in the
1291 switch statement. Add 1 to the flag if:
1292   - start key flag is HA_READ_KEY_EXACT and the end key is NULL
1293 
1294 */
1295 
create_where_from_key(String * to,KEY * key_info,const key_range * start_key,const key_range * end_key,bool from_records_in_range,bool eq_range_arg)1296 bool ha_federated::create_where_from_key(String *to,
1297                                          KEY *key_info,
1298                                          const key_range *start_key,
1299                                          const key_range *end_key,
1300                                          bool from_records_in_range,
1301                                          bool eq_range_arg)
1302 {
1303   bool both_not_null=
1304     (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
1305   const uchar *ptr;
1306   uint remainder, length;
1307   char tmpbuff[FEDERATED_QUERY_BUFFER_SIZE];
1308   String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info);
1309   const key_range *ranges[2]= { start_key, end_key };
1310   my_bitmap_map *old_map;
1311   DBUG_ENTER("ha_federated::create_where_from_key");
1312 
1313   tmp.length(0);
1314   if (start_key == NULL && end_key == NULL)
1315     DBUG_RETURN(1);
1316 
1317   old_map= dbug_tmp_use_all_columns(table, table->write_set);
1318   for (uint i= 0; i <= 1; i++)
1319   {
1320     bool needs_quotes;
1321     KEY_PART_INFO *key_part;
1322     if (ranges[i] == NULL)
1323       continue;
1324 
1325     if (both_not_null)
1326     {
1327       if (i > 0)
1328         tmp.append(STRING_WITH_LEN(") AND ("));
1329       else
1330         tmp.append(STRING_WITH_LEN(" ("));
1331     }
1332 
1333     for (key_part= key_info->key_part,
1334          remainder= key_info->user_defined_key_parts,
1335          length= ranges[i]->length,
1336          ptr= ranges[i]->key; ;
1337          remainder--,
1338          key_part++)
1339     {
1340       Field *field= key_part->field;
1341       uint store_length= key_part->store_length;
1342       uint part_length= min(store_length, length);
1343       needs_quotes= field->str_needs_quotes();
1344       DBUG_DUMP("key, start of loop", ptr, length);
1345 
1346       if (key_part->null_bit)
1347       {
1348         if (*ptr++)
1349         {
1350           /*
1351             We got "IS [NOT] NULL" condition against nullable column. We
1352             distinguish between "IS NOT NULL" and "IS NULL" by flag. For
1353             "IS NULL", flag is set to HA_READ_KEY_EXACT.
1354           */
1355           if (emit_key_part_name(&tmp, key_part) ||
1356               (ranges[i]->flag == HA_READ_KEY_EXACT ?
1357                tmp.append(STRING_WITH_LEN(" IS NULL ")) :
1358                tmp.append(STRING_WITH_LEN(" IS NOT NULL "))))
1359             goto err;
1360           /*
1361             We need to adjust pointer and length to be prepared for next
1362             key part. As well as check if this was last key part.
1363           */
1364           goto prepare_for_next_key_part;
1365         }
1366       }
1367 
1368       if (tmp.append(STRING_WITH_LEN(" (")))
1369         goto err;
1370 
1371       switch (ranges[i]->flag) {
1372       case HA_READ_KEY_EXACT:
1373         DBUG_PRINT("info", ("federated HA_READ_KEY_EXACT %d", i));
1374         if (store_length >= length ||
1375             !needs_quotes ||
1376             key_part->type == HA_KEYTYPE_BIT ||
1377             field->result_type() != STRING_RESULT)
1378         {
1379           if (emit_key_part_name(&tmp, key_part))
1380             goto err;
1381 
1382           if (from_records_in_range)
1383           {
1384             if (tmp.append(STRING_WITH_LEN(" >= ")))
1385               goto err;
1386           }
1387           else
1388           {
1389             if (tmp.append(STRING_WITH_LEN(" = ")))
1390               goto err;
1391           }
1392 
1393           if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1394                                     part_length))
1395             goto err;
1396         }
1397         else
1398         {
1399           /* LIKE */
1400           if (emit_key_part_name(&tmp, key_part) ||
1401               tmp.append(STRING_WITH_LEN(" LIKE ")) ||
1402               emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
1403                                     part_length))
1404             goto err;
1405         }
1406         break;
1407       case HA_READ_AFTER_KEY:
1408         if (eq_range_arg)
1409         {
1410           if (tmp.append("1=1"))                // Dummy
1411             goto err;
1412           break;
1413         }
1414         DBUG_PRINT("info", ("federated HA_READ_AFTER_KEY %d", i));
1415         if ((store_length >= length) || (i > 0)) /* for all parts of end key*/
1416         {
1417           if (emit_key_part_name(&tmp, key_part))
1418             goto err;
1419 
1420           if (i > 0) /* end key */
1421           {
1422             if (tmp.append(STRING_WITH_LEN(" <= ")))
1423               goto err;
1424           }
1425           else /* start key */
1426           {
1427             if (tmp.append(STRING_WITH_LEN(" > ")))
1428               goto err;
1429           }
1430 
1431           if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1432                                     part_length))
1433           {
1434             goto err;
1435           }
1436           break;
1437         }
1438         // Fall through
1439       case HA_READ_KEY_OR_NEXT:
1440         DBUG_PRINT("info", ("federated HA_READ_KEY_OR_NEXT %d", i));
1441         if (emit_key_part_name(&tmp, key_part) ||
1442             tmp.append(STRING_WITH_LEN(" >= ")) ||
1443             emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1444               part_length))
1445           goto err;
1446         break;
1447       case HA_READ_BEFORE_KEY:
1448         DBUG_PRINT("info", ("federated HA_READ_BEFORE_KEY %d", i));
1449         if (store_length >= length)
1450         {
1451           if (emit_key_part_name(&tmp, key_part) ||
1452               tmp.append(STRING_WITH_LEN(" < ")) ||
1453               emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1454                                     part_length))
1455             goto err;
1456           break;
1457         }
1458         // Fall through
1459       case HA_READ_KEY_OR_PREV:
1460         DBUG_PRINT("info", ("federated HA_READ_KEY_OR_PREV %d", i));
1461         if (emit_key_part_name(&tmp, key_part) ||
1462             tmp.append(STRING_WITH_LEN(" <= ")) ||
1463             emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1464                                   part_length))
1465           goto err;
1466         break;
1467       default:
1468         DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag));
1469         goto err;
1470       }
1471       if (tmp.append(STRING_WITH_LEN(") ")))
1472         goto err;
1473 
1474 prepare_for_next_key_part:
1475       if (store_length >= length)
1476         break;
1477       DBUG_PRINT("info", ("remainder %d", remainder));
1478       assert(remainder > 1);
1479       length-= store_length;
1480       /*
1481         For nullable columns, null-byte is already skipped before, that is
1482         ptr was incremented by 1. Since store_length still counts null-byte,
1483         we need to subtract 1 from store_length.
1484       */
1485       ptr+= store_length - MY_TEST(key_part->null_bit);
1486       if (tmp.append(STRING_WITH_LEN(" AND ")))
1487         goto err;
1488 
1489       DBUG_PRINT("info",
1490                  ("create_where_from_key WHERE clause: %s",
1491                   tmp.c_ptr_quick()));
1492     }
1493   }
1494   dbug_tmp_restore_column_map(table->write_set, old_map);
1495 
1496   if (both_not_null)
1497     if (tmp.append(STRING_WITH_LEN(") ")))
1498       DBUG_RETURN(1);
1499 
1500   if (to->append(STRING_WITH_LEN(" WHERE ")))
1501     DBUG_RETURN(1);
1502 
1503   if (to->append(tmp))
1504     DBUG_RETURN(1);
1505 
1506   DBUG_RETURN(0);
1507 
1508 err:
1509   dbug_tmp_restore_column_map(table->write_set, old_map);
1510   DBUG_RETURN(1);
1511 }
1512 
1513 /*
1514   Example of simple lock controls. The "share" it creates is structure we will
1515   pass to each federated handler. Do you have to have one of these? Well, you
1516   have pieces that are used for locking, and they are needed to function.
1517 */
1518 
get_share(const char * table_name,TABLE * table)1519 static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
1520 {
1521   char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1522   Field **field;
1523   String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
1524   FEDERATED_SHARE *share= NULL, tmp_share;
1525   MEM_ROOT mem_root;
1526   DBUG_ENTER("ha_federated.cc::get_share");
1527 
1528   /*
1529     In order to use this string, we must first zero it's length,
1530     or it will contain garbage
1531   */
1532   query.length(0);
1533 
1534   init_alloc_root(fe_key_memory_federated_share, &mem_root, 256, 0);
1535 
1536   mysql_mutex_lock(&federated_mutex);
1537 
1538   tmp_share.share_key= table_name;
1539   tmp_share.share_key_length= (uint) strlen(table_name);
1540   if (parse_url(&mem_root, &tmp_share, table, 0))
1541     goto error;
1542 
1543   /* TODO: change tmp_share.scheme to LEX_STRING object */
1544   if (!(share= (FEDERATED_SHARE *) my_hash_search(&federated_open_tables,
1545                                                   (uchar*) tmp_share.share_key,
1546                                                   tmp_share.
1547                                                   share_key_length)))
1548   {
1549     query.set_charset(system_charset_info);
1550     query.append(STRING_WITH_LEN("SELECT "));
1551     for (field= table->field; *field; field++)
1552     {
1553       append_ident(&query, (*field)->field_name,
1554                    strlen((*field)->field_name), ident_quote_char);
1555       query.append(STRING_WITH_LEN(", "));
1556     }
1557     /* chops off trailing comma */
1558     query.length(query.length() - sizeof_trailing_comma);
1559 
1560     query.append(STRING_WITH_LEN(" FROM "));
1561 
1562     append_ident(&query, tmp_share.table_name,
1563                  tmp_share.table_name_length, ident_quote_char);
1564 
1565     if (!(share= (FEDERATED_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
1566         !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length() + 1)))
1567       goto error;
1568 
1569     share->use_count= 0;
1570     share->mem_root= mem_root;
1571 
1572     DBUG_PRINT("info",
1573                ("share->select_query %s", share->select_query));
1574 
1575     if (my_hash_insert(&federated_open_tables, (uchar*) share))
1576       goto error;
1577     thr_lock_init(&share->lock);
1578     mysql_mutex_init(fe_key_mutex_FEDERATED_SHARE_mutex,
1579                      &share->mutex, MY_MUTEX_INIT_FAST);
1580   }
1581   else
1582     free_root(&mem_root, MYF(0)); /* prevents memory leak */
1583 
1584   share->use_count++;
1585   mysql_mutex_unlock(&federated_mutex);
1586 
1587   DBUG_RETURN(share);
1588 
1589 error:
1590   mysql_mutex_unlock(&federated_mutex);
1591   free_root(&mem_root, MYF(0));
1592   DBUG_RETURN(NULL);
1593 }
1594 
1595 
1596 /*
1597   Free lock controls. We call this whenever we close a table.
1598   If the table had the last reference to the share then we
1599   free memory associated with it.
1600 */
1601 
free_share(FEDERATED_SHARE * share)1602 static int free_share(FEDERATED_SHARE *share)
1603 {
1604   MEM_ROOT mem_root= share->mem_root;
1605   DBUG_ENTER("free_share");
1606 
1607   mysql_mutex_lock(&federated_mutex);
1608   if (!--share->use_count)
1609   {
1610     my_hash_delete(&federated_open_tables, (uchar*) share);
1611     thr_lock_delete(&share->lock);
1612     mysql_mutex_destroy(&share->mutex);
1613     free_root(&mem_root, MYF(0));
1614   }
1615   mysql_mutex_unlock(&federated_mutex);
1616 
1617   DBUG_RETURN(0);
1618 }
1619 
1620 
records_in_range(uint inx,key_range * start_key,key_range * end_key)1621 ha_rows ha_federated::records_in_range(uint inx, key_range *start_key,
1622                                        key_range *end_key)
1623 {
1624   /*
1625 
1626   We really want indexes to be used as often as possible, therefore
1627   we just need to hard-code the return value to a very low number to
1628   force the issue
1629 
1630 */
1631   DBUG_ENTER("ha_federated::records_in_range");
1632   DBUG_RETURN(FEDERATED_RECORDS_IN_RANGE);
1633 }
1634 /*
1635   If frm_error() is called then we will use this to to find out
1636   what file extentions exist for the storage engine. This is
1637   also used by the default rename_table and delete_table method
1638   in handler.cc.
1639 */
1640 
bas_ext() const1641 const char **ha_federated::bas_ext() const
1642 {
1643   static const char *ext[]=
1644   {
1645     NullS
1646   };
1647   return ext;
1648 }
1649 
1650 
1651 /*
1652   Used for opening tables. The name will be the name of the file.
1653   A table is opened when it needs to be opened. For instance
1654   when a request comes in for a select on the table (tables are not
1655   open and closed for each request, they are cached).
1656 
1657   Called from handler.cc by handler::ha_open(). The server opens
1658   all tables by calling ha_open() which then calls the handler
1659   specific open().
1660 */
1661 
open(const char * name,int mode,uint test_if_locked)1662 int ha_federated::open(const char *name, int mode, uint test_if_locked)
1663 {
1664   DBUG_ENTER("ha_federated::open");
1665 
1666   if (!(share= get_share(name, table)))
1667     DBUG_RETURN(1);
1668   thr_lock_data_init(&share->lock, &lock, NULL);
1669 
1670   assert(mysql == NULL);
1671 
1672   ref_length= sizeof(MYSQL_RES *) + sizeof(MYSQL_ROW_OFFSET);
1673   DBUG_PRINT("info", ("ref_length: %u", ref_length));
1674 
1675   reset();
1676 
1677   DBUG_RETURN(0);
1678 }
1679 
1680 
1681 /*
1682   Closes a table. We call the free_share() function to free any resources
1683   that we have allocated in the "shared" structure.
1684 
1685   Called from sql_base.cc, sql_select.cc, and table.cc.
1686   In sql_select.cc it is only used to close up temporary tables or during
1687   the process where a temporary table is converted over to being a
1688   myisam table.
1689   For sql_base.cc look at close_data_tables().
1690 */
1691 
close(void)1692 int ha_federated::close(void)
1693 {
1694   THD *thd= current_thd;
1695   DBUG_ENTER("ha_federated::close");
1696 
1697   free_result();
1698 
1699   results.clear();
1700 
1701   /*
1702     Check to verify wheather the connection is still alive or not.
1703     FLUSH TABLES will quit the connection and if connection is broken,
1704     it will reconnect again and quit silently.
1705   */
1706   if (mysql && (!mysql->net.vio || !vio_is_connected(mysql->net.vio)))
1707      mysql->net.error= 2;
1708 
1709   /* Disconnect from mysql */
1710   mysql_close(mysql);
1711   mysql= NULL;
1712 
1713   /*
1714     mysql_close() might return an error if a remote server's gone
1715     for some reason. If that happens while removing a table from
1716     the table cache, the error will be propagated to a client even
1717     if the original query was not issued against the FEDERATED table.
1718     So, don't propagate errors from mysql_close().
1719   */
1720   if (table->in_use && thd != table->in_use)
1721     table->in_use->clear_error();
1722 
1723   /*
1724     Errors from mysql_close() are silently ignored for flush tables.
1725     Close the connection silently.
1726   */
1727   if (thd && thd->lex->sql_command == SQLCOM_FLUSH)
1728      thd->clear_error();
1729 
1730   DBUG_RETURN(free_share(share));
1731 }
1732 
1733 
1734 /**
1735   @brief Construct the INSERT statement.
1736 
1737   @details This method will construct the INSERT statement and appends it to
1738   the supplied query string buffer.
1739 
1740   @return
1741     @retval FALSE       No error
1742     @retval TRUE        Failure
1743 */
1744 
append_stmt_insert(String * query)1745 bool ha_federated::append_stmt_insert(String *query)
1746 {
1747   char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1748   Field **field;
1749   size_t tmp_length;
1750   bool added_field= FALSE;
1751 
1752   /* The main insert query string */
1753   String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
1754   DBUG_ENTER("ha_federated::append_stmt_insert");
1755 
1756   insert_string.length(0);
1757 
1758   if (replace_duplicates)
1759     insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
1760   else if (ignore_duplicates && !insert_dup_update)
1761     insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
1762   else
1763     insert_string.append(STRING_WITH_LEN("INSERT INTO "));
1764   append_ident(&insert_string, share->table_name, share->table_name_length,
1765                ident_quote_char);
1766   tmp_length= insert_string.length();
1767   insert_string.append(STRING_WITH_LEN(" ("));
1768 
1769   /*
1770     loop through the field pointer array, add any fields to both the values
1771     list and the fields list that match the current query id
1772   */
1773   for (field= table->field; *field; field++)
1774   {
1775     if (bitmap_is_set(table->write_set, (*field)->field_index))
1776     {
1777       /* append the field name */
1778       append_ident(&insert_string, (*field)->field_name,
1779                    strlen((*field)->field_name), ident_quote_char);
1780 
1781       /* append commas between both fields and fieldnames */
1782       /*
1783         unfortunately, we can't use the logic if *(fields + 1) to
1784         make the following appends conditional as we don't know if the
1785         next field is in the write set
1786       */
1787       insert_string.append(STRING_WITH_LEN(", "));
1788       added_field= TRUE;
1789     }
1790   }
1791 
1792   if (added_field)
1793   {
1794     /* Remove trailing comma. */
1795     insert_string.length(insert_string.length() - sizeof_trailing_comma);
1796     insert_string.append(STRING_WITH_LEN(") "));
1797   }
1798   else
1799   {
1800     /* If there were no fields, we don't want to add a closing paren. */
1801     insert_string.length(tmp_length);
1802   }
1803 
1804   insert_string.append(STRING_WITH_LEN(" VALUES "));
1805 
1806   DBUG_RETURN(query->append(insert_string));
1807 }
1808 
1809 
1810 /*
1811   write_row() inserts a row. No extra() hint is given currently if a bulk load
1812   is happeneding. buf() is a byte array of data. You can use the field
1813   information to extract the data from the native byte array type.
1814   Example of this would be:
1815   for (Field **field=table->field ; *field ; field++)
1816   {
1817     ...
1818   }
1819 
1820   Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
1821   sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
1822 */
1823 
write_row(uchar * buf)1824 int ha_federated::write_row(uchar *buf)
1825 {
1826   char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1827   char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
1828   Field **field;
1829   size_t tmp_length;
1830   int error= 0;
1831   bool use_bulk_insert;
1832   bool auto_increment_update_required= (table->next_number_field != NULL);
1833 
1834   /* The string containing the values to be added to the insert */
1835   String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
1836   /* The actual value of the field, to be added to the values_string */
1837   String insert_field_value_string(insert_field_value_buffer,
1838                                    sizeof(insert_field_value_buffer),
1839                                    &my_charset_bin);
1840   my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1841   DBUG_ENTER("ha_federated::write_row");
1842 
1843   values_string.length(0);
1844   insert_field_value_string.length(0);
1845   ha_statistic_increment(&SSV::ha_write_count);
1846 
1847   /*
1848     start both our field and field values strings
1849     We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE"
1850     Ignore duplicates is always true when insert_dup_update is true.
1851     When replace_duplicates == TRUE, we can safely enable multi-row insert.
1852     When performing multi-row insert, we only collect the columns values for
1853     the row. The start of the statement is only created when the first
1854     row is copied in to the bulk_insert string.
1855   */
1856   if (!(use_bulk_insert= bulk_insert.str &&
1857         (!insert_dup_update || replace_duplicates)))
1858     append_stmt_insert(&values_string);
1859 
1860   values_string.append(STRING_WITH_LEN(" ("));
1861   tmp_length= values_string.length();
1862 
1863   /*
1864     loop through the field pointer array, add any fields to both the values
1865     list and the fields list that is part of the write set
1866   */
1867   for (field= table->field; *field; field++)
1868   {
1869     if (bitmap_is_set(table->write_set, (*field)->field_index))
1870     {
1871       if ((*field)->is_null())
1872         values_string.append(STRING_WITH_LEN(" NULL "));
1873       else
1874       {
1875         bool needs_quote= (*field)->str_needs_quotes();
1876         (*field)->val_str(&insert_field_value_string);
1877         if (needs_quote)
1878           values_string.append(value_quote_char);
1879         insert_field_value_string.print(&values_string);
1880         if (needs_quote)
1881           values_string.append(value_quote_char);
1882 
1883         insert_field_value_string.length(0);
1884       }
1885 
1886       /* append commas between both fields and fieldnames */
1887       /*
1888         unfortunately, we can't use the logic if *(fields + 1) to
1889         make the following appends conditional as we don't know if the
1890         next field is in the write set
1891       */
1892       values_string.append(STRING_WITH_LEN(", "));
1893     }
1894   }
1895   dbug_tmp_restore_column_map(table->read_set, old_map);
1896 
1897   /*
1898     if there were no fields, we don't want to add a closing paren
1899     AND, we don't want to chop off the last char '('
1900     insert will be "INSERT INTO t1 VALUES ();"
1901   */
1902   if (values_string.length() > tmp_length)
1903   {
1904     /* chops off trailing comma */
1905     values_string.length(values_string.length() - sizeof_trailing_comma);
1906   }
1907   /* we always want to append this, even if there aren't any fields */
1908   values_string.append(STRING_WITH_LEN(") "));
1909 
1910   if (use_bulk_insert)
1911   {
1912     /*
1913       Send the current bulk insert out if appending the current row would
1914       cause the statement to overflow the packet size, otherwise set
1915       auto_increment_update_required to FALSE as no query was executed.
1916     */
1917     if (bulk_insert.length + values_string.length() + bulk_padding >
1918         mysql->net.max_packet_size && bulk_insert.length)
1919     {
1920       error= real_query(bulk_insert.str, bulk_insert.length);
1921       bulk_insert.length= 0;
1922     }
1923     else
1924       auto_increment_update_required= FALSE;
1925 
1926     if (bulk_insert.length == 0)
1927     {
1928       char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1929       String insert_string(insert_buffer, sizeof(insert_buffer),
1930                            &my_charset_bin);
1931       insert_string.length(0);
1932       append_stmt_insert(&insert_string);
1933       dynstr_append_mem(&bulk_insert, insert_string.ptr(),
1934                         insert_string.length());
1935     }
1936     else
1937       dynstr_append_mem(&bulk_insert, ",", 1);
1938 
1939     dynstr_append_mem(&bulk_insert, values_string.ptr(),
1940                       values_string.length());
1941   }
1942   else
1943   {
1944     error= real_query(values_string.ptr(), values_string.length());
1945   }
1946 
1947   if (error)
1948   {
1949     DBUG_RETURN(stash_remote_error());
1950   }
1951   /*
1952     If the table we've just written a record to contains an auto_increment
1953     field, then store the last_insert_id() value from the foreign server
1954   */
1955   if (auto_increment_update_required)
1956   {
1957     update_auto_increment();
1958 
1959     /* mysql_insert() uses this for protocol return value */
1960     table->next_number_field->store(stats.auto_increment_value, 1);
1961   }
1962 
1963   DBUG_RETURN(0);
1964 }
1965 
1966 
1967 /**
1968   @brief Prepares the storage engine for bulk inserts.
1969 
1970   @param[in] rows       estimated number of rows in bulk insert
1971                         or 0 if unknown.
1972 
1973   @details Initializes memory structures required for bulk insert.
1974 */
1975 
start_bulk_insert(ha_rows rows)1976 void ha_federated::start_bulk_insert(ha_rows rows)
1977 {
1978   uint page_size;
1979   DBUG_ENTER("ha_federated::start_bulk_insert");
1980 
1981   dynstr_free(&bulk_insert);
1982 
1983   /**
1984     We don't bother with bulk-insert semantics when the estimated rows == 1
1985     The rows value will be 0 if the server does not know how many rows
1986     would be inserted. This can occur when performing INSERT...SELECT
1987   */
1988 
1989   if (rows == 1)
1990     DBUG_VOID_RETURN;
1991 
1992   /*
1993     Make sure we have an open connection so that we know the
1994     maximum packet size.
1995   */
1996   if (!mysql && real_connect())
1997     DBUG_VOID_RETURN;
1998 
1999   page_size= (uint) my_getpagesize();
2000 
2001   if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
2002     DBUG_VOID_RETURN;
2003 
2004   bulk_insert.length= 0;
2005   DBUG_VOID_RETURN;
2006 }
2007 
2008 
2009 /**
2010   @brief End bulk insert.
2011 
2012   @details This method will send any remaining rows to the remote server.
2013   Finally, it will deinitialize the bulk insert data structure.
2014 
2015   @return Operation status
2016   @retval       0       No error
2017   @retval       != 0    Error occured at remote server. Also sets my_errno.
2018 */
2019 
end_bulk_insert()2020 int ha_federated::end_bulk_insert()
2021 {
2022   int error= 0;
2023   DBUG_ENTER("ha_federated::end_bulk_insert");
2024 
2025   if (bulk_insert.str && bulk_insert.length)
2026   {
2027     if (real_query(bulk_insert.str, bulk_insert.length))
2028       error= stash_remote_error();
2029     else
2030     if (table->next_number_field)
2031       update_auto_increment();
2032   }
2033 
2034   dynstr_free(&bulk_insert);
2035 
2036   set_my_errno(error);
2037   DBUG_RETURN(error);
2038 }
2039 
2040 
2041 /*
2042   ha_federated::update_auto_increment
2043 
2044   This method ensures that last_insert_id() works properly. What it simply does
2045   is calls last_insert_id() on the foreign database immediately after insert
2046   (if the table has an auto_increment field) and sets the insert id via
2047   thd->insert_id(ID)).
2048 */
update_auto_increment(void)2049 void ha_federated::update_auto_increment(void)
2050 {
2051   THD *thd= current_thd;
2052   DBUG_ENTER("ha_federated::update_auto_increment");
2053 
2054   ha_federated::info(HA_STATUS_AUTO);
2055   thd->first_successful_insert_id_in_cur_stmt=
2056     stats.auto_increment_value;
2057   DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value));
2058 
2059   DBUG_VOID_RETURN;
2060 }
2061 
optimize(THD * thd,HA_CHECK_OPT * check_opt)2062 int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
2063 {
2064   char query_buffer[STRING_BUFFER_USUAL_SIZE];
2065   String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2066   DBUG_ENTER("ha_federated::optimize");
2067 
2068   query.length(0);
2069 
2070   query.set_charset(system_charset_info);
2071   query.append(STRING_WITH_LEN("OPTIMIZE TABLE "));
2072   append_ident(&query, share->table_name, share->table_name_length,
2073                ident_quote_char);
2074 
2075   if (real_query(query.ptr(), query.length()))
2076   {
2077     DBUG_RETURN(stash_remote_error());
2078   }
2079 
2080   DBUG_RETURN(0);
2081 }
2082 
2083 
repair(THD * thd,HA_CHECK_OPT * check_opt)2084 int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
2085 {
2086   char query_buffer[STRING_BUFFER_USUAL_SIZE];
2087   String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2088   DBUG_ENTER("ha_federated::repair");
2089 
2090   query.length(0);
2091 
2092   query.set_charset(system_charset_info);
2093   query.append(STRING_WITH_LEN("REPAIR TABLE "));
2094   append_ident(&query, share->table_name, share->table_name_length,
2095                ident_quote_char);
2096   if (check_opt->flags & T_QUICK)
2097     query.append(STRING_WITH_LEN(" QUICK"));
2098   if (check_opt->flags & T_EXTEND)
2099     query.append(STRING_WITH_LEN(" EXTENDED"));
2100   if (check_opt->sql_flags & TT_USEFRM)
2101     query.append(STRING_WITH_LEN(" USE_FRM"));
2102 
2103   if (real_query(query.ptr(), query.length()))
2104   {
2105     DBUG_RETURN(stash_remote_error());
2106   }
2107 
2108   DBUG_RETURN(0);
2109 }
2110 
2111 
2112 /*
2113   Yes, update_row() does what you expect, it updates a row. old_data will have
2114   the previous row record in it, while new_data will have the newest data in
2115   it.
2116 
2117   Keep in mind that the server can do updates based on ordering if an ORDER BY
2118   clause was used. Consecutive ordering is not guaranteed.
2119 
2120   Currently new_data will not have an updated AUTO_INCREMENT record. You can
2121   do this for federated by doing the following:
2122 
2123     if (table->next_number_field && record == table->record[0])
2124       update_auto_increment();
2125 
2126   Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
2127 */
2128 
update_row(const uchar * old_data,uchar * new_data)2129 int ha_federated::update_row(const uchar *old_data, uchar *new_data)
2130 {
2131   /*
2132     This used to control how the query was built. If there was a
2133     primary key, the query would be built such that there was a where
2134     clause with only that column as the condition. This is flawed,
2135     because if we have a multi-part primary key, it would only use the
2136     first part! We don't need to do this anyway, because
2137     read_range_first will retrieve the correct record, which is what
2138     is used to build the WHERE clause. We can however use this to
2139     append a LIMIT to the end if there is NOT a primary key. Why do
2140     this? Because we only are updating one record, and LIMIT enforces
2141     this.
2142   */
2143   bool has_a_primary_key= MY_TEST(table->s->primary_key != MAX_KEY);
2144 
2145   /*
2146     buffers for following strings
2147   */
2148   char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
2149   char update_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2150   char where_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2151 
2152   /* Work area for field values */
2153   String field_value(field_value_buffer, sizeof(field_value_buffer),
2154                      &my_charset_bin);
2155   /* stores the update query */
2156   String update_string(update_buffer,
2157                        sizeof(update_buffer),
2158                        &my_charset_bin);
2159   /* stores the WHERE clause */
2160   String where_string(where_buffer,
2161                       sizeof(where_buffer),
2162                       &my_charset_bin);
2163   uchar *record= table->record[0];
2164   DBUG_ENTER("ha_federated::update_row");
2165   /*
2166     set string lengths to 0 to avoid misc chars in string
2167   */
2168   field_value.length(0);
2169   update_string.length(0);
2170   where_string.length(0);
2171 
2172   if (ignore_duplicates)
2173     update_string.append(STRING_WITH_LEN("UPDATE IGNORE "));
2174   else
2175     update_string.append(STRING_WITH_LEN("UPDATE "));
2176   append_ident(&update_string, share->table_name,
2177                share->table_name_length, ident_quote_char);
2178   update_string.append(STRING_WITH_LEN(" SET "));
2179 
2180   /*
2181     In this loop, we want to match column names to values being inserted
2182     (while building INSERT statement).
2183 
2184     Iterate through table->field (new data) and share->old_field (old_data)
2185     using the same index to create an SQL UPDATE statement. New data is
2186     used to create SET field=value and old data is used to create WHERE
2187     field=oldvalue
2188   */
2189 
2190   for (Field **field= table->field; *field; field++)
2191   {
2192     if (bitmap_is_set(table->write_set, (*field)->field_index))
2193     {
2194       size_t field_name_length= strlen((*field)->field_name);
2195       append_ident(&update_string, (*field)->field_name, field_name_length,
2196                    ident_quote_char);
2197       update_string.append(STRING_WITH_LEN(" = "));
2198 
2199       if ((*field)->is_null())
2200         update_string.append(STRING_WITH_LEN(" NULL "));
2201       else
2202       {
2203         /* otherwise = */
2204         my_bitmap_map *old_map= tmp_use_all_columns(table, table->read_set);
2205         bool needs_quote= (*field)->str_needs_quotes();
2206 	(*field)->val_str(&field_value);
2207         if (needs_quote)
2208           update_string.append(value_quote_char);
2209         field_value.print(&update_string);
2210         if (needs_quote)
2211           update_string.append(value_quote_char);
2212         field_value.length(0);
2213         tmp_restore_column_map(table->read_set, old_map);
2214       }
2215       update_string.append(STRING_WITH_LEN(", "));
2216     }
2217 
2218     if (bitmap_is_set(table->read_set, (*field)->field_index))
2219     {
2220       size_t field_name_length= strlen((*field)->field_name);
2221       append_ident(&where_string, (*field)->field_name, field_name_length,
2222                    ident_quote_char);
2223       if ((*field)->is_null_in_record(old_data))
2224         where_string.append(STRING_WITH_LEN(" IS NULL "));
2225       else
2226       {
2227         bool needs_quote= (*field)->str_needs_quotes();
2228         where_string.append(STRING_WITH_LEN(" = "));
2229         (*field)->val_str(&field_value,
2230                           (old_data + (*field)->offset(record)));
2231         if (needs_quote)
2232           where_string.append(value_quote_char);
2233         field_value.print(&where_string);
2234         if (needs_quote)
2235           where_string.append(value_quote_char);
2236         field_value.length(0);
2237       }
2238       where_string.append(STRING_WITH_LEN(" AND "));
2239     }
2240   }
2241 
2242   /* Remove last ', '. This works as there must be at least on updated field */
2243   update_string.length(update_string.length() - sizeof_trailing_comma);
2244 
2245   if (where_string.length())
2246   {
2247     /* chop off trailing AND */
2248     where_string.length(where_string.length() - sizeof_trailing_and);
2249     update_string.append(STRING_WITH_LEN(" WHERE "));
2250     update_string.append(where_string);
2251   }
2252 
2253   /*
2254     If this table has not a primary key, then we could possibly
2255     update multiple rows. We want to make sure to only update one!
2256   */
2257   if (!has_a_primary_key)
2258     update_string.append(STRING_WITH_LEN(" LIMIT 1"));
2259 
2260   if (real_query(update_string.ptr(), update_string.length()))
2261   {
2262     DBUG_RETURN(stash_remote_error());
2263   }
2264   DBUG_RETURN(0);
2265 }
2266 
2267 /*
2268   This will delete a row. 'buf' will contain a copy of the row to be =deleted.
2269   The server will call this right after the current row has been called (from
2270   either a previous rnd_next() or index call).
2271   If you keep a pointer to the last row or can access a primary key it will
2272   make doing the deletion quite a bit easier.
2273   Keep in mind that the server does no guarentee consecutive deletions.
2274   ORDER BY clauses can be used.
2275 
2276   Called in sql_acl.cc and sql_udf.cc to manage internal table information.
2277   Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
2278   it is used for removing duplicates while in insert it is used for REPLACE
2279   calls.
2280 */
2281 
delete_row(const uchar * buf)2282 int ha_federated::delete_row(const uchar *buf)
2283 {
2284   char delete_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2285   char data_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2286   String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
2287   String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
2288   uint found= 0;
2289   DBUG_ENTER("ha_federated::delete_row");
2290 
2291   delete_string.length(0);
2292   if (ignore_duplicates)
2293     delete_string.append(STRING_WITH_LEN("DELETE IGNORE FROM "));
2294   else
2295     delete_string.append(STRING_WITH_LEN("DELETE FROM "));
2296   append_ident(&delete_string, share->table_name,
2297                share->table_name_length, ident_quote_char);
2298   delete_string.append(STRING_WITH_LEN(" WHERE "));
2299 
2300   for (Field **field= table->field; *field; field++)
2301   {
2302     Field *cur_field= *field;
2303     found++;
2304     if (bitmap_is_set(table->read_set, cur_field->field_index))
2305     {
2306       append_ident(&delete_string, (*field)->field_name,
2307                    strlen((*field)->field_name), ident_quote_char);
2308       data_string.length(0);
2309       if (cur_field->is_null())
2310       {
2311         delete_string.append(STRING_WITH_LEN(" IS NULL "));
2312       }
2313       else
2314       {
2315         bool needs_quote= cur_field->str_needs_quotes();
2316         delete_string.append(STRING_WITH_LEN(" = "));
2317         cur_field->val_str(&data_string);
2318         if (needs_quote)
2319           delete_string.append(value_quote_char);
2320         data_string.print(&delete_string);
2321         if (needs_quote)
2322           delete_string.append(value_quote_char);
2323       }
2324       delete_string.append(STRING_WITH_LEN(" AND "));
2325     }
2326   }
2327 
2328   // Remove trailing AND
2329   delete_string.length(delete_string.length() - sizeof_trailing_and);
2330   if (!found)
2331     delete_string.length(delete_string.length() - sizeof_trailing_where);
2332 
2333   delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
2334   DBUG_PRINT("info",
2335              ("Delete sql: %s", delete_string.c_ptr_quick()));
2336   if (real_query(delete_string.ptr(), delete_string.length()))
2337   {
2338     DBUG_RETURN(stash_remote_error());
2339   }
2340   stats.deleted+= (ha_rows) mysql->affected_rows;
2341   stats.records-= (ha_rows) mysql->affected_rows;
2342   DBUG_PRINT("info",
2343              ("rows deleted %ld  rows deleted for all time %ld",
2344               (long) mysql->affected_rows, (long) stats.deleted));
2345 
2346   DBUG_RETURN(0);
2347 }
2348 
index_read_idx_map(uchar * buf,uint index,const uchar * key,key_part_map keypart_map,enum ha_rkey_function find_flag)2349 int ha_federated::index_read_idx_map(uchar *buf, uint index, const uchar *key,
2350                                 key_part_map keypart_map,
2351                                 enum ha_rkey_function find_flag)
2352 {
2353   int error= index_init(index, 0);
2354   if (error)
2355     return error;
2356   error= index_read_map(buf, key, keypart_map, find_flag);
2357   if(!error && stored_result)
2358   {
2359     uchar *dummy_arg=NULL;
2360     position(dummy_arg);
2361   }
2362   int error1= index_end();
2363   return error ?  error : error1;
2364 }
2365 
2366 /*
2367   Positions an index cursor to the index specified in the handle. Fetches the
2368   row if available. If the key value is null, begin at the first key of the
2369   index. This method, which is called in the case of an SQL statement having
2370   a WHERE clause on a non-primary key index, simply calls index_read_idx.
2371 */
2372 
index_read(uchar * buf,const uchar * key,uint key_len,ha_rkey_function find_flag)2373 int ha_federated::index_read(uchar *buf, const uchar *key,
2374                              uint key_len, ha_rkey_function find_flag)
2375 {
2376   int rc;
2377   DBUG_ENTER("ha_federated::index_read");
2378 
2379   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2380   free_result();
2381   rc= index_read_idx_with_result_set(buf, active_index, key,
2382                                      key_len, find_flag,
2383                                      &stored_result);
2384   MYSQL_INDEX_READ_ROW_DONE(rc);
2385   DBUG_RETURN(rc);
2386 }
2387 
2388 
2389 /*
2390   Positions an index cursor to the index specified in key. Fetches the
2391   row if any.  This is only used to read whole keys.
2392 
2393   This method is called via index_read in the case of a WHERE clause using
2394   a primary key index OR is called DIRECTLY when the WHERE clause
2395   uses a PRIMARY KEY index.
2396 
2397   NOTES
2398     This uses an internal result set that is deleted before function
2399     returns.  We need to be able to be calable from ha_rnd_pos()
2400 */
2401 
index_read_idx(uchar * buf,uint index,const uchar * key,uint key_len,enum ha_rkey_function find_flag)2402 int ha_federated::index_read_idx(uchar *buf, uint index, const uchar *key,
2403                                  uint key_len, enum ha_rkey_function find_flag)
2404 {
2405   int retval;
2406   MYSQL_RES *mysql_result;
2407   DBUG_ENTER("ha_federated::index_read_idx");
2408 
2409   if ((retval= index_read_idx_with_result_set(buf, index, key,
2410                                               key_len, find_flag,
2411                                               &mysql_result)))
2412     DBUG_RETURN(retval);
2413   mysql_free_result(mysql_result);
2414   results.pop_back();
2415   DBUG_RETURN(0);
2416 }
2417 
2418 
2419 /*
2420   Create result set for rows matching query and return first row
2421 
2422   RESULT
2423     0	ok     In this case *result will contain the result set
2424 	       table->status == 0
2425     #   error  In this case *result will contain 0
2426                table->status == STATUS_NOT_FOUND
2427 */
2428 
index_read_idx_with_result_set(uchar * buf,uint index,const uchar * key,uint key_len,ha_rkey_function find_flag,MYSQL_RES ** result)2429 int ha_federated::index_read_idx_with_result_set(uchar *buf, uint index,
2430                                                  const uchar *key,
2431                                                  uint key_len,
2432                                                  ha_rkey_function find_flag,
2433                                                  MYSQL_RES **result)
2434 {
2435   int retval;
2436   char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2437   char index_value[STRING_BUFFER_USUAL_SIZE];
2438   char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2439   String index_string(index_value,
2440                       sizeof(index_value),
2441                       &my_charset_bin);
2442   String sql_query(sql_query_buffer,
2443                    sizeof(sql_query_buffer),
2444                    &my_charset_bin);
2445   key_range range;
2446   DBUG_ENTER("ha_federated::index_read_idx_with_result_set");
2447 
2448   *result= 0;                                   // In case of errors
2449   index_string.length(0);
2450   sql_query.length(0);
2451   ha_statistic_increment(&SSV::ha_read_key_count);
2452 
2453   sql_query.append(share->select_query);
2454 
2455   range.key= key;
2456   range.length= key_len;
2457   range.flag= find_flag;
2458   create_where_from_key(&index_string,
2459                         &table->key_info[index],
2460                         &range,
2461                         NULL, 0, 0);
2462   sql_query.append(index_string);
2463 
2464   if (real_query(sql_query.ptr(), sql_query.length()))
2465   {
2466     sprintf(error_buffer, "error: %d '%s'",
2467             mysql_errno(mysql), mysql_error(mysql));
2468     retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2469     goto error;
2470   }
2471   if (!(*result= store_result(mysql)))
2472   {
2473     retval= HA_ERR_END_OF_FILE;
2474     goto error;
2475   }
2476   if ((retval= read_next(buf, *result)))
2477   {
2478     mysql_free_result(*result);
2479     results.pop_back();
2480     *result= 0;
2481     table->status= STATUS_NOT_FOUND;
2482     DBUG_RETURN(retval);
2483   }
2484   DBUG_RETURN(0);
2485 
2486 error:
2487   table->status= STATUS_NOT_FOUND;
2488   my_error(retval, MYF(0), error_buffer);
2489   DBUG_RETURN(retval);
2490 }
2491 
2492 
2493 /*
2494   This method is used exlusevely by filesort() to check if we
2495   can create sorting buffers of necessary size.
2496   If the handler returns more records that it declares
2497   here server can just crash on filesort().
2498   We cannot guarantee that's not going to happen with
2499   the FEDERATED engine, as we have records==0 always if the
2500   client is a VIEW, and for the table the number of
2501   records can inpredictably change during execution.
2502   So we return maximum possible value here.
2503 */
2504 
estimate_rows_upper_bound()2505 ha_rows ha_federated::estimate_rows_upper_bound()
2506 {
2507   return HA_POS_ERROR;
2508 }
2509 
2510 
2511 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
2512 
index_init(uint keynr,bool sorted)2513 int ha_federated::index_init(uint keynr, bool sorted)
2514 {
2515   DBUG_ENTER("ha_federated::index_init");
2516   DBUG_PRINT("info", ("table: '%s'  key: %u", table->s->table_name.str, keynr));
2517   active_index= keynr;
2518   DBUG_RETURN(0);
2519 }
2520 
2521 
2522 /*
2523   Read first range
2524 */
2525 
read_range_first(const key_range * start_key,const key_range * end_key,bool eq_range_arg,bool sorted)2526 int ha_federated::read_range_first(const key_range *start_key,
2527                                    const key_range *end_key,
2528                                    bool eq_range_arg, bool sorted)
2529 {
2530   char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2531   int retval;
2532   String sql_query(sql_query_buffer,
2533                    sizeof(sql_query_buffer),
2534                    &my_charset_bin);
2535   DBUG_ENTER("ha_federated::read_range_first");
2536   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2537 
2538   assert(!(start_key == NULL && end_key == NULL));
2539 
2540   sql_query.length(0);
2541   sql_query.append(share->select_query);
2542   create_where_from_key(&sql_query,
2543                         &table->key_info[active_index],
2544                         start_key, end_key, 0, eq_range_arg);
2545   if (real_query(sql_query.ptr(), sql_query.length()))
2546   {
2547     retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2548     goto error;
2549   }
2550   sql_query.length(0);
2551 
2552   if (!(stored_result= store_result(mysql)))
2553   {
2554     retval= HA_ERR_END_OF_FILE;
2555     goto error;
2556   }
2557 
2558   retval= read_next(table->record[0], stored_result);
2559   MYSQL_INDEX_READ_ROW_DONE(retval);
2560   DBUG_RETURN(retval);
2561 
2562 error:
2563   table->status= STATUS_NOT_FOUND;
2564   MYSQL_INDEX_READ_ROW_DONE(retval);
2565   DBUG_RETURN(retval);
2566 }
2567 
2568 
read_range_next()2569 int ha_federated::read_range_next()
2570 {
2571   int retval;
2572   DBUG_ENTER("ha_federated::read_range_next");
2573   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2574   retval= rnd_next_int(table->record[0]);
2575   MYSQL_INDEX_READ_ROW_DONE(retval);
2576   DBUG_RETURN(retval);
2577 }
2578 
2579 
2580 /* Used to read forward through the index.  */
index_next(uchar * buf)2581 int ha_federated::index_next(uchar *buf)
2582 {
2583   int retval;
2584   DBUG_ENTER("ha_federated::index_next");
2585   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2586   ha_statistic_increment(&SSV::ha_read_next_count);
2587   retval= read_next(buf, stored_result);
2588   MYSQL_INDEX_READ_ROW_DONE(retval);
2589   DBUG_RETURN(retval);
2590 }
2591 
2592 
2593 /*
2594   rnd_init() is called when the system wants the storage engine to do a table
2595   scan.
2596 
2597   This is the method that gets data for the SELECT calls.
2598 
2599   See the federated in the introduction at the top of this file to see when
2600   rnd_init() is called.
2601 
2602   Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2603   sql_table.cc, and sql_update.cc.
2604 */
2605 
rnd_init(bool scan)2606 int ha_federated::rnd_init(bool scan)
2607 {
2608   DBUG_ENTER("ha_federated::rnd_init");
2609   /*
2610     The use of the 'scan' flag is incredibly important for this handler
2611     to work properly, especially with updates containing WHERE clauses
2612     using indexed columns.
2613 
2614     When the initial query contains a WHERE clause of the query using an
2615     indexed column, it's index_read_idx that selects the exact record from
2616     the foreign database.
2617 
2618     When there is NO index in the query, either due to not having a WHERE
2619     clause, or the WHERE clause is using columns that are not indexed, a
2620     'full table scan' done by rnd_init, which in this situation simply means
2621     a 'select * from ...' on the foreign table.
2622 
2623     In other words, this 'scan' flag gives us the means to ensure that if
2624     there is an index involved in the query, we want index_read_idx to
2625     retrieve the exact record (scan flag is 0), and do not  want rnd_init
2626     to do a 'full table scan' and wipe out that result set.
2627 
2628     Prior to using this flag, the problem was most apparent with updates.
2629 
2630     An initial query like 'UPDATE tablename SET anything = whatever WHERE
2631     indexedcol = someval', index_read_idx would get called, using a query
2632     constructed with a WHERE clause built from the values of index ('indexcol'
2633     in this case, having a value of 'someval').  mysql_store_result would
2634     then get called (this would be the result set we want to use).
2635 
2636     After this rnd_init (from sql_update.cc) would be called, it would then
2637     unecessarily call "select * from table" on the foreign table, then call
2638     mysql_store_result, which would wipe out the correct previous result set
2639     from the previous call of index_read_idx's that had the result set
2640     containing the correct record, hence update the wrong row!
2641 
2642   */
2643 
2644   if (scan)
2645   {
2646     if (real_query(share->select_query, strlen(share->select_query)) ||
2647         !(stored_result= store_result(mysql)))
2648       DBUG_RETURN(stash_remote_error());
2649   }
2650   DBUG_RETURN(0);
2651 }
2652 
2653 
rnd_end()2654 int ha_federated::rnd_end()
2655 {
2656   DBUG_ENTER("ha_federated::rnd_end");
2657   DBUG_RETURN(index_end());
2658 }
2659 
2660 
index_end(void)2661 int ha_federated::index_end(void)
2662 {
2663   DBUG_ENTER("ha_federated::index_end");
2664   free_result();
2665   active_index= MAX_KEY;
2666   DBUG_RETURN(0);
2667 }
2668 
2669 
2670 /*
2671   This is called for each row of the table scan. When you run out of records
2672   you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
2673   The Field structure for the table is the key to getting data into buf
2674   in a manner that will allow the server to understand it.
2675 
2676   Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2677   sql_table.cc, and sql_update.cc.
2678 */
2679 
rnd_next(uchar * buf)2680 int ha_federated::rnd_next(uchar *buf)
2681 {
2682   int rc;
2683   DBUG_ENTER("ha_federated::rnd_next");
2684   MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
2685                        TRUE);
2686   rc= rnd_next_int(buf);
2687   MYSQL_READ_ROW_DONE(rc);
2688   DBUG_RETURN(rc);
2689 }
2690 
rnd_next_int(uchar * buf)2691 int ha_federated::rnd_next_int(uchar *buf)
2692 {
2693   DBUG_ENTER("ha_federated::rnd_next_int");
2694 
2695   if (stored_result == 0)
2696   {
2697     /*
2698       Return value of rnd_init is not always checked (see records.cc),
2699       so we can get here _even_ if there is _no_ pre-fetched result-set!
2700       TODO: fix it. We can delete this in 5.1 when rnd_init() is checked.
2701     */
2702     DBUG_RETURN(1);
2703   }
2704   DBUG_RETURN(read_next(buf, stored_result));
2705 }
2706 
2707 
2708 /*
2709   ha_federated::read_next
2710 
2711   reads from a result set and converts to mysql internal
2712   format
2713 
2714   SYNOPSIS
2715     ha_federated::read_next()
2716       buf       byte pointer to record
2717       result    mysql result set
2718 
2719     DESCRIPTION
2720      This method is a wrapper method that reads one record from a result
2721      set and converts it to the internal table format
2722 
2723     RETURN VALUE
2724       1    error
2725       0    no error
2726 */
2727 
read_next(uchar * buf,MYSQL_RES * result)2728 int ha_federated::read_next(uchar *buf, MYSQL_RES *result)
2729 {
2730   int retval;
2731   MYSQL_ROW row;
2732   DBUG_ENTER("ha_federated::read_next");
2733 
2734   table->status= STATUS_NOT_FOUND;              // For easier return
2735 
2736   /* Save current data cursor position. */
2737   current_position= result->data_cursor;
2738 
2739   /* Fetch a row, insert it back in a row format. */
2740   if (!(row= mysql_fetch_row(result)))
2741     DBUG_RETURN(HA_ERR_END_OF_FILE);
2742 
2743   if (!(retval= convert_row_to_internal_format(buf, row, result)))
2744     table->status= 0;
2745 
2746   DBUG_RETURN(retval);
2747 }
2748 
2749 
2750 /**
2751   @brief      Store a reference to current row.
2752 
2753   @details    During a query execution we may have different result sets (RS),
2754               e.g. for different ranges. All the RS's used are stored in
2755               memory and placed in @c results dynamic array. At the end of
2756               execution all stored RS's are freed at once in the
2757               @c ha_federated::reset().
2758               So, in case of federated, a reference to current row is a
2759               stored result address and current data cursor position.
2760               As we keep all RS in memory during a query execution,
2761               we can get any record using the reference any time until
2762               @c ha_federated::reset() is called.
2763               TODO: we don't have to store all RS's rows but only those
2764               we call @c ha_federated::position() for, so we can free memory
2765               where we store other rows in the @c ha_federated::index_end().
2766 
2767   @param[in]  record  record data (unused)
2768 */
2769 
position(const uchar * record MY_ATTRIBUTE ((unused)))2770 void ha_federated::position(const uchar *record MY_ATTRIBUTE ((unused)))
2771 {
2772   DBUG_ENTER("ha_federated::position");
2773 
2774   assert(stored_result);
2775 
2776   position_called= TRUE;
2777   /* Store result set address. */
2778   memcpy(ref, &stored_result, sizeof(MYSQL_RES *));
2779   /* Store data cursor position. */
2780   memcpy(ref + sizeof(MYSQL_RES *), &current_position,
2781                sizeof(MYSQL_ROW_OFFSET));
2782   DBUG_VOID_RETURN;
2783 }
2784 
2785 
2786 /*
2787   This is like rnd_next, but you are given a position to use to determine the
2788   row. The position will be of the type that you stored in ref.
2789 
2790   This method is required for an ORDER BY
2791 
2792   Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
2793 */
2794 
rnd_pos(uchar * buf,uchar * pos)2795 int ha_federated::rnd_pos(uchar *buf, uchar *pos)
2796 {
2797   MYSQL_RES *result;
2798   int ret_val;
2799   DBUG_ENTER("ha_federated::rnd_pos");
2800 
2801   MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
2802                        FALSE);
2803   ha_statistic_increment(&SSV::ha_read_rnd_count);
2804 
2805   /* Get stored result set. */
2806   memcpy(&result, pos, sizeof(MYSQL_RES *));
2807   assert(result);
2808   /* Set data cursor position. */
2809   memcpy(&result->data_cursor, pos + sizeof(MYSQL_RES *),
2810          sizeof(MYSQL_ROW_OFFSET));
2811   /* Read a row. */
2812   ret_val= read_next(buf, result);
2813   MYSQL_READ_ROW_DONE(ret_val);
2814   DBUG_RETURN(ret_val);
2815 }
2816 
2817 
2818 /*
2819   ::info() is used to return information to the optimizer.
2820   Currently this table handler doesn't implement most of the fields
2821   really needed. SHOW also makes use of this data
2822   Another note, you will probably want to have the following in your
2823   code:
2824   if (records < 2)
2825     records = 2;
2826   The reason is that the server will optimize for cases of only a single
2827   record. If in a table scan you don't know the number of records
2828   it will probably be better to set records to two so you can return
2829   as many records as you need.
2830   Along with records a few more variables you may wish to set are:
2831     records
2832     deleted
2833     data_file_length
2834     index_file_length
2835     delete_length
2836     check_time
2837   Take a look at the public variables in handler.h for more information.
2838 
2839   Called in:
2840     filesort.cc
2841     ha_heap.cc
2842     item_sum.cc
2843     opt_sum.cc
2844     sql_delete.cc
2845     sql_delete.cc
2846     sql_derived.cc
2847     sql_select.cc
2848     sql_select.cc
2849     sql_select.cc
2850     sql_select.cc
2851     sql_select.cc
2852     sql_show.cc
2853     sql_show.cc
2854     sql_show.cc
2855     sql_show.cc
2856     sql_table.cc
2857     sql_union.cc
2858     sql_update.cc
2859 
2860 */
2861 
info(uint flag)2862 int ha_federated::info(uint flag)
2863 {
2864   char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
2865   int error;
2866   uint error_code;
2867   MYSQL_RES *result= 0;
2868   MYSQL_ROW row;
2869   String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin);
2870   DBUG_ENTER("ha_federated::info");
2871 
2872   error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2873   /* we want not to show table status if not needed to do so */
2874   if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
2875   {
2876     status_query_string.length(0);
2877     status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE "));
2878     append_ident(&status_query_string, share->table_name,
2879                  share->table_name_length, value_quote_char);
2880 
2881     if (real_query(status_query_string.ptr(), status_query_string.length()))
2882       goto error;
2883 
2884     status_query_string.length(0);
2885 
2886     result= mysql_store_result(mysql);
2887 
2888     /*
2889       We're going to use fields num. 4, 12 and 13 of the resultset,
2890       so make sure we have these fields.
2891     */
2892     if (!result || (mysql_num_fields(result) < 14))
2893       goto error;
2894 
2895     if (!mysql_num_rows(result))
2896       goto error;
2897 
2898     if (!(row= mysql_fetch_row(result)))
2899       goto error;
2900 
2901     /*
2902       deleted is set in ha_federated::info
2903     */
2904     /*
2905       need to figure out what this means as far as federated is concerned,
2906       since we don't have a "file"
2907 
2908       data_file_length = ?
2909       index_file_length = ?
2910       delete_length = ?
2911     */
2912     if (row[4] != NULL)
2913       stats.records=   (ha_rows) my_strtoll10(row[4], (char**) 0,
2914                                                        &error);
2915     if (row[5] != NULL)
2916       stats.mean_rec_length= (ulong) my_strtoll10(row[5], (char**) 0, &error);
2917 
2918     stats.data_file_length= stats.records * stats.mean_rec_length;
2919 
2920     if (row[12] != NULL)
2921       stats.update_time=     (ulong) my_strtoll10(row[12], (char**) 0,
2922                                                       &error);
2923     if (row[13] != NULL)
2924       stats.check_time=      (ulong) my_strtoll10(row[13], (char**) 0,
2925                                                       &error);
2926 
2927     /*
2928       size of IO operations (This is based on a good guess, no high science
2929       involved)
2930     */
2931     if (flag & HA_STATUS_CONST)
2932       stats.block_size= 4096;
2933 
2934   }
2935 
2936   if ((flag & HA_STATUS_AUTO) && mysql)
2937     stats.auto_increment_value= mysql->insert_id;
2938 
2939   mysql_free_result(result);
2940 
2941   DBUG_RETURN(0);
2942 
2943 error:
2944   mysql_free_result(result);
2945   if (mysql)
2946   {
2947     my_printf_error(error_code, ": %d : %s", MYF(0),
2948                     mysql_errno(mysql), mysql_error(mysql));
2949   }
2950   else
2951   if (remote_error_number != -1 /* error already reported */)
2952   {
2953     error_code= remote_error_number;
2954     my_error(error_code, MYF(0), ER(error_code));
2955   }
2956   DBUG_RETURN(error_code);
2957 }
2958 
2959 
2960 /**
2961   @brief Handles extra signals from MySQL server
2962 
2963   @param[in] operation  Hint for storage engine
2964 
2965   @return Operation Status
2966   @retval 0     OK
2967  */
extra(ha_extra_function operation)2968 int ha_federated::extra(ha_extra_function operation)
2969 {
2970   DBUG_ENTER("ha_federated::extra");
2971   switch (operation) {
2972   case HA_EXTRA_IGNORE_DUP_KEY:
2973     ignore_duplicates= TRUE;
2974     break;
2975   case HA_EXTRA_NO_IGNORE_DUP_KEY:
2976     insert_dup_update= FALSE;
2977     ignore_duplicates= FALSE;
2978     break;
2979   case HA_EXTRA_WRITE_CAN_REPLACE:
2980     replace_duplicates= TRUE;
2981     break;
2982   case HA_EXTRA_WRITE_CANNOT_REPLACE:
2983     /*
2984       We use this flag to ensure that we do not create an "INSERT IGNORE"
2985       statement when inserting new rows into the remote table.
2986     */
2987     replace_duplicates= FALSE;
2988     break;
2989   case HA_EXTRA_INSERT_WITH_UPDATE:
2990     insert_dup_update= TRUE;
2991     break;
2992   default:
2993     /* do nothing */
2994     DBUG_PRINT("info",("unhandled operation: %d", (uint) operation));
2995   }
2996   DBUG_RETURN(0);
2997 }
2998 
2999 
3000 /**
3001   @brief Reset state of file to after 'open'.
3002 
3003   @detail This function is called after every statement for all tables
3004     used by that statement.
3005 
3006   @return Operation status
3007     @retval     0       OK
3008 */
3009 
reset(void)3010 int ha_federated::reset(void)
3011 {
3012   insert_dup_update= FALSE;
3013   ignore_duplicates= FALSE;
3014   replace_duplicates= FALSE;
3015 
3016   /* Free stored result sets. */
3017   for (MYSQL_RES **result= results.begin(); result != results.end(); ++result)
3018   {
3019     mysql_free_result(*result);
3020   }
3021   results.clear();
3022 
3023   return 0;
3024 }
3025 
3026 
3027 /*
3028   Used to delete all rows in a table. Both for cases of truncate and
3029   for cases where the optimizer realizes that all rows will be
3030   removed as a result of a SQL statement.
3031 
3032   Called from item_sum.cc by Item_func_group_concat::clear(),
3033   Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
3034   Called from sql_delete.cc by mysql_delete().
3035   Called from sql_select.cc by JOIN::reinit().
3036   Called from sql_union.cc by st_select_lex_unit::exec().
3037 */
3038 
delete_all_rows()3039 int ha_federated::delete_all_rows()
3040 {
3041   char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
3042   String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
3043   DBUG_ENTER("ha_federated::delete_all_rows");
3044 
3045   query.length(0);
3046 
3047   query.set_charset(system_charset_info);
3048   if (ignore_duplicates)
3049     query.append(STRING_WITH_LEN("DELETE IGNORE FROM "));
3050   else
3051     query.append(STRING_WITH_LEN("DELETE FROM "));
3052   append_ident(&query, share->table_name, share->table_name_length,
3053                ident_quote_char);
3054 
3055   if (real_query(query.ptr(), query.length()))
3056   {
3057     DBUG_RETURN(stash_remote_error());
3058   }
3059   stats.deleted+= stats.records;
3060   stats.records= 0;
3061   DBUG_RETURN(0);
3062 }
3063 
3064 
3065 /*
3066   Used to manually truncate the table.
3067 */
3068 
truncate()3069 int ha_federated::truncate()
3070 {
3071   char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
3072   String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
3073   DBUG_ENTER("ha_federated::truncate");
3074 
3075   query.length(0);
3076 
3077   query.set_charset(system_charset_info);
3078   query.append(STRING_WITH_LEN("TRUNCATE TABLE "));
3079   append_ident(&query, share->table_name, share->table_name_length,
3080                ident_quote_char);
3081 
3082   /*
3083     TRUNCATE won't return anything in mysql_affected_rows
3084   */
3085   if (real_query(query.ptr(), query.length()))
3086   {
3087     DBUG_RETURN(stash_remote_error());
3088   }
3089   stats.deleted+= stats.records;
3090   stats.records= 0;
3091   DBUG_RETURN(0);
3092 }
3093 
3094 
3095 /*
3096   The idea with handler::store_lock() is the following:
3097 
3098   The statement decided which locks we should need for the table
3099   for updates/deletes/inserts we get WRITE locks, for SELECT... we get
3100   read locks.
3101 
3102   Before adding the lock into the table lock handler (see thr_lock.c)
3103   mysqld calls store lock with the requested locks.  Store lock can now
3104   modify a write lock to a read lock (or some other lock), ignore the
3105   lock (if we don't want to use MySQL table locks at all) or add locks
3106   for many tables (like we do when we are using a MERGE handler).
3107 
3108   Berkeley DB for federated  changes all WRITE locks to TL_WRITE_ALLOW_WRITE
3109   (which signals that we are doing WRITES, but we are still allowing other
3110   reader's and writer's.
3111 
3112   When releasing locks, store_lock() are also called. In this case one
3113   usually doesn't have to do anything.
3114 
3115   In some exceptional cases MySQL may send a request for a TL_IGNORE;
3116   This means that we are requesting the same lock as last time and this
3117   should also be ignored. (This may happen when someone does a flush
3118   table when we have opened a part of the tables, in which case mysqld
3119   closes and reopens the tables and tries to get the same locks at last
3120   time).  In the future we will probably try to remove this.
3121 
3122   Called from lock.cc by get_lock_data().
3123 */
3124 
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)3125 THR_LOCK_DATA **ha_federated::store_lock(THD *thd,
3126                                          THR_LOCK_DATA **to,
3127                                          enum thr_lock_type lock_type)
3128 {
3129   DBUG_ENTER("ha_federated::store_lock");
3130   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
3131   {
3132     /*
3133       Here is where we get into the guts of a row level lock.
3134       If TL_UNLOCK is set
3135       If we are not doing a LOCK TABLE or DISCARD/IMPORT
3136       TABLESPACE, then allow multiple writers
3137     */
3138 
3139     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
3140          lock_type <= TL_WRITE) && !thd->in_lock_tables)
3141       lock_type= TL_WRITE_ALLOW_WRITE;
3142 
3143     /*
3144       In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
3145       MySQL would use the lock TL_READ_NO_INSERT on t2, and that
3146       would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
3147       to t2. Convert the lock to a normal read lock to allow
3148       concurrent inserts to t2.
3149     */
3150 
3151     if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
3152       lock_type= TL_READ;
3153 
3154     lock.type= lock_type;
3155   }
3156 
3157   *to++= &lock;
3158 
3159   DBUG_RETURN(to);
3160 }
3161 
3162 /*
3163   create() does nothing, since we have no local setup of our own.
3164   FUTURE: We should potentially connect to the foreign database and
3165 */
3166 
create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)3167 int ha_federated::create(const char *name, TABLE *table_arg,
3168                          HA_CREATE_INFO *create_info)
3169 {
3170   int retval;
3171   THD *thd= current_thd;
3172   FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url
3173   DBUG_ENTER("ha_federated::create");
3174 
3175   retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1);
3176 
3177   DBUG_RETURN(retval);
3178 
3179 }
3180 
3181 
real_connect()3182 int ha_federated::real_connect()
3183 {
3184   char buffer[FEDERATED_QUERY_BUFFER_SIZE];
3185   String sql_query(buffer, sizeof(buffer), &my_charset_bin);
3186   DBUG_ENTER("ha_federated::real_connect");
3187 
3188   /*
3189     Bug#25679
3190     Ensure that we do not hold the LOCK_open mutex while attempting
3191     to establish Federated connection to guard against a trivial
3192     Denial of Service scenerio.
3193   */
3194   mysql_mutex_assert_not_owner(&LOCK_open);
3195 
3196   assert(mysql == NULL);
3197 
3198   if (!(mysql= mysql_init(NULL)))
3199   {
3200     remote_error_number= HA_ERR_OUT_OF_MEM;
3201     DBUG_RETURN(-1);
3202   }
3203 
3204   /*
3205     BUG# 17044 Federated Storage Engine is not UTF8 clean
3206     Add set names to whatever charset the table is at open
3207     of table
3208   */
3209   /* this sets the csname like 'set names utf8' */
3210   mysql_options(mysql,MYSQL_SET_CHARSET_NAME,
3211                 this->table->s->table_charset->csname);
3212   mysql_options4(mysql, MYSQL_OPT_CONNECT_ATTR_ADD,
3213                 "program_name", "mysqld");
3214   mysql_options4(mysql, MYSQL_OPT_CONNECT_ATTR_ADD,
3215                 "_client_role", "federated_storage");
3216   sql_query.length(0);
3217 
3218   if (!mysql_real_connect(mysql,
3219                           share->hostname,
3220                           share->username,
3221                           share->password,
3222                           share->database,
3223                           share->port,
3224                           share->socket, 0))
3225   {
3226     stash_remote_error();
3227     mysql_close(mysql);
3228     mysql= NULL;
3229     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), remote_error_buf);
3230     remote_error_number= -1;
3231     DBUG_RETURN(-1);
3232   }
3233 
3234   /*
3235     We have established a connection, lets try a simple dummy query just
3236     to check that the table and expected columns are present.
3237   */
3238   sql_query.append(share->select_query);
3239   sql_query.append(STRING_WITH_LEN(" WHERE 1=0"));
3240   if (mysql_real_query(mysql, sql_query.ptr(),
3241                        static_cast<ulong>(sql_query.length())))
3242   {
3243     sql_query.length(0);
3244     sql_query.append("error: ");
3245     sql_query.qs_append(mysql_errno(mysql));
3246     sql_query.append("  '");
3247     sql_query.append(mysql_error(mysql));
3248     sql_query.append("'");
3249     mysql_close(mysql);
3250     mysql= NULL;
3251     my_error(ER_FOREIGN_DATA_SOURCE_DOESNT_EXIST, MYF(0), sql_query.ptr());
3252     remote_error_number= -1;
3253     DBUG_RETURN(-1);
3254   }
3255 
3256   /* Just throw away the result, no rows anyways but need to keep in sync */
3257   mysql_free_result(mysql_store_result(mysql));
3258 
3259   /*
3260     Since we do not support transactions at this version, we can let the client
3261     API silently reconnect. For future versions, we will need more logic to
3262     deal with transactions
3263   */
3264 
3265   mysql->reconnect= 1;
3266   DBUG_RETURN(0);
3267 }
3268 
3269 
real_query(const char * query,size_t length)3270 int ha_federated::real_query(const char *query, size_t length)
3271 {
3272   int rc= 0;
3273   DBUG_ENTER("ha_federated::real_query");
3274 
3275   if (!mysql && (rc= real_connect()))
3276     goto end;
3277 
3278   if (!query || !length)
3279     goto end;
3280 
3281   rc= mysql_real_query(mysql, query, static_cast<ulong>(length));
3282 
3283 end:
3284   DBUG_RETURN(rc);
3285 }
3286 
3287 
stash_remote_error()3288 int ha_federated::stash_remote_error()
3289 {
3290   DBUG_ENTER("ha_federated::stash_remote_error()");
3291   if (!mysql)
3292     DBUG_RETURN(remote_error_number);
3293   remote_error_number= mysql_errno(mysql);
3294   strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1);
3295   if (remote_error_number == ER_DUP_ENTRY ||
3296       remote_error_number == ER_DUP_KEY)
3297     DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
3298   if (remote_error_number == ER_NO_REFERENCED_ROW ||
3299       remote_error_number == ER_NO_REFERENCED_ROW_2)
3300     DBUG_RETURN(HA_ERR_NO_REFERENCED_ROW);
3301   DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
3302 }
3303 
3304 
get_error_message(int error,String * buf)3305 bool ha_federated::get_error_message(int error, String* buf)
3306 {
3307   DBUG_ENTER("ha_federated::get_error_message");
3308   DBUG_PRINT("enter", ("error: %d", error));
3309   if (error == HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM)
3310   {
3311     buf->append(STRING_WITH_LEN("Error on remote system: "));
3312     buf->qs_append(remote_error_number);
3313     buf->append(STRING_WITH_LEN(": "));
3314     buf->append(remote_error_buf);
3315 
3316     remote_error_number= 0;
3317     remote_error_buf[0]= '\0';
3318   }
3319   DBUG_PRINT("exit", ("message: %s", buf->ptr()));
3320   DBUG_RETURN(FALSE);
3321 }
3322 
3323 
3324 /**
3325   @brief      Store a result set.
3326 
3327   @details    Call @c mysql_store_result() to save a result set then
3328               append it to the stored results array.
3329 
3330   @param[in]  mysql_arg  MySLQ connection structure.
3331 
3332   @return     Stored result set (MYSQL_RES object).
3333 */
3334 
store_result(MYSQL * mysql_arg)3335 MYSQL_RES *ha_federated::store_result(MYSQL *mysql_arg)
3336 {
3337   MYSQL_RES *result= mysql_store_result(mysql_arg);
3338   DBUG_ENTER("ha_federated::store_result");
3339   if (result)
3340   {
3341     results.push_back(result);
3342   }
3343   position_called= FALSE;
3344   DBUG_RETURN(result);
3345 }
3346 
3347 
free_result()3348 void ha_federated::free_result()
3349 {
3350   DBUG_ENTER("ha_federated::free_result");
3351   if (stored_result && !position_called)
3352   {
3353     mysql_free_result(stored_result);
3354     stored_result= 0;
3355     if (!results.empty())
3356       results.pop_back();
3357   }
3358   DBUG_VOID_RETURN;
3359 }
3360 
3361 
external_lock(THD * thd,int lock_type)3362 int ha_federated::external_lock(THD *thd, int lock_type)
3363 {
3364   int error= 0;
3365   DBUG_ENTER("ha_federated::external_lock");
3366 
3367   /*
3368     Support for transactions disabled until WL#2952 fixes it.
3369   */
3370   DBUG_RETURN(error);
3371 }
3372 
3373 
federated_commit(handlerton * hton,THD * thd,bool all)3374 static int federated_commit(handlerton *hton, THD *thd, bool all)
3375 {
3376   int return_val= 0;
3377   ha_federated *trx= (ha_federated *) thd_get_ha_data(thd, hton);
3378   DBUG_ENTER("federated_commit");
3379 
3380   if (all)
3381   {
3382     int error= 0;
3383     ha_federated *ptr, *old= NULL;
3384     for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
3385     {
3386       if (old)
3387         old->trx_next= NULL;
3388       error= ptr->connection_commit();
3389       if (error && !return_val)
3390         return_val= error;
3391     }
3392     thd_set_ha_data(thd, hton, NULL);
3393   }
3394 
3395   DBUG_PRINT("info", ("error val: %d", return_val));
3396   DBUG_RETURN(return_val);
3397 }
3398 
3399 
federated_rollback(handlerton * hton,THD * thd,bool all)3400 static int federated_rollback(handlerton *hton, THD *thd, bool all)
3401 {
3402   int return_val= 0;
3403   ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, hton);
3404   DBUG_ENTER("federated_rollback");
3405 
3406   if (all)
3407   {
3408     int error= 0;
3409     ha_federated *ptr, *old= NULL;
3410     for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
3411     {
3412       if (old)
3413         old->trx_next= NULL;
3414       error= ptr->connection_rollback();
3415       if (error && !return_val)
3416         return_val= error;
3417     }
3418     thd_set_ha_data(thd, hton, NULL);
3419   }
3420 
3421   DBUG_PRINT("info", ("error val: %d", return_val));
3422   DBUG_RETURN(return_val);
3423 }
3424 
connection_commit()3425 int ha_federated::connection_commit()
3426 {
3427   DBUG_ENTER("ha_federated::connection_commit");
3428   DBUG_RETURN(execute_simple_query("COMMIT", 6));
3429 }
3430 
3431 
connection_rollback()3432 int ha_federated::connection_rollback()
3433 {
3434   DBUG_ENTER("ha_federated::connection_rollback");
3435   DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
3436 }
3437 
3438 
connection_autocommit(bool state)3439 int ha_federated::connection_autocommit(bool state)
3440 {
3441   const char *text;
3442   DBUG_ENTER("ha_federated::connection_autocommit");
3443   text= (state == TRUE) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
3444   DBUG_RETURN(execute_simple_query(text, 16));
3445 }
3446 
3447 
execute_simple_query(const char * query,int len)3448 int ha_federated::execute_simple_query(const char *query, int len)
3449 {
3450   DBUG_ENTER("ha_federated::execute_simple_query");
3451 
3452   if (mysql_real_query(mysql, query, (ulong)len))
3453   {
3454     DBUG_RETURN(stash_remote_error());
3455   }
3456   DBUG_RETURN(0);
3457 }
3458 
3459 struct st_mysql_storage_engine federated_storage_engine=
3460 { MYSQL_HANDLERTON_INTERFACE_VERSION };
3461 
mysql_declare_plugin(federated)3462 mysql_declare_plugin(federated)
3463 {
3464   MYSQL_STORAGE_ENGINE_PLUGIN,
3465   &federated_storage_engine,
3466   "FEDERATED",
3467   "Patrick Galbraith and Brian Aker, MySQL AB",
3468   "Federated MySQL storage engine",
3469   PLUGIN_LICENSE_GPL,
3470   federated_db_init, /* Plugin Init */
3471   federated_done, /* Plugin Deinit */
3472   0x0100 /* 1.0 */,
3473   NULL,                       /* status variables                */
3474   NULL,                       /* system variables                */
3475   NULL,                       /* config options                  */
3476   0,                          /* flags                           */
3477 }
3478 mysql_declare_plugin_end;
3479