1 /* Copyright (c) 2004, 2013, Oracle and/or its affiliates.
2 
3   This program is free software; you can redistribute it and/or modify
4   it under the terms of the GNU General Public License as published by
5   the Free Software Foundation; version 2 of the License.
6 
7   This program is distributed in the hope that it will be useful,
8   but WITHOUT ANY WARRANTY; without even the implied warranty of
9   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10   GNU General Public License for more details.
11 
12   You should have received a copy of the GNU General Public License
13   along with this program; if not, write to the Free Software
14   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */
15 
16 /*
17   Please read ha_exmple.cc before reading this file.
18   Please keep in mind that the federated storage engine implements all methods
19   that are required to be implemented. handler.h has a full list of methods
20   that you can implement.
21 */
22 
23 #ifdef USE_PRAGMA_INTERFACE
24 #pragma interface			/* gcc class implementation */
25 #endif
26 
27 #include <mysql.h>
28 
29 /*
30   handler::print_error has a case statement for error numbers.
31   This value is (10000) is far out of range and will envoke the
32   default: case.
33   (Current error range is 120-159 from include/my_base.h)
34 */
35 #define HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM 10000
36 
37 #define FEDERATED_QUERY_BUFFER_SIZE (STRING_BUFFER_USUAL_SIZE * 5)
38 #define FEDERATED_RECORDS_IN_RANGE 2
39 #define FEDERATED_MAX_KEY_LENGTH 3500 // Same as innodb
40 
41 /*
42   FEDERATED_SHARE is a structure that will be shared amoung all open handlers
43   The example implements the minimum of what you will probably need.
44 */
45 typedef struct st_federated_share {
46   MEM_ROOT mem_root;
47 
48   bool parsed;
49   /* this key is unique db/tablename */
50   const char *share_key;
51   /*
52     the primary select query to be used in rnd_init
53   */
54   char *select_query;
55   /*
56     remote host info, parse_url supplies
57   */
58   char *server_name;
59   char *connection_string;
60   char *scheme;
61   char *connect_string;
62   char *hostname;
63   char *username;
64   char *password;
65   char *database;
66   char *table_name;
67   char *table;
68   char *socket;
69   char *sport;
70   int share_key_length;
71   ushort port;
72 
73   size_t table_name_length, server_name_length, connect_string_length, use_count;
74   mysql_mutex_t mutex;
75   THR_LOCK lock;
76 } FEDERATED_SHARE;
77 
78 /*
79   Class definition for the storage engine
80 */
81 class ha_federated: public handler
82 {
83   THR_LOCK_DATA lock;      /* MySQL lock */
84   FEDERATED_SHARE *share;    /* Shared lock info */
85   MYSQL *mysql; /* MySQL connection */
86   MYSQL_RES *stored_result;
87   /**
88     Array of all stored results we get during a query execution.
89   */
90   DYNAMIC_ARRAY results;
91   bool position_called, table_will_be_deleted;
92   MYSQL_ROW_OFFSET current_position;  // Current position used by ::position()
93   int remote_error_number;
94   char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE];
95   bool ignore_duplicates, replace_duplicates;
96   bool insert_dup_update;
97   DYNAMIC_STRING bulk_insert;
98 
99 private:
100   /*
101       return 0 on success
102       return errorcode otherwise
103   */
104   uint convert_row_to_internal_format(uchar *buf, MYSQL_ROW row,
105                                       MYSQL_RES *result);
106   bool create_where_from_key(String *to, KEY *key_info,
107                              const key_range *start_key,
108                              const key_range *end_key,
109                              bool records_in_range, bool eq_range);
110   int stash_remote_error();
111 
112   bool append_stmt_insert(String *query);
113 
114   int read_next(uchar *buf, MYSQL_RES *result);
115   int index_read_idx_with_result_set(uchar *buf, uint index,
116                                      const uchar *key,
117                                      uint key_len,
118                                      ha_rkey_function find_flag,
119                                      MYSQL_RES **result);
120   int real_query(const char *query, size_t length);
121   int real_connect();
122 public:
123   ha_federated(handlerton *hton, TABLE_SHARE *table_arg);
~ha_federated()124   ~ha_federated() {}
125   /*
126     Next pointer used in transaction
127   */
128   ha_federated *trx_next;
129   /*
130     The name of the index type that will be used for display
131     don't implement this method unless you really have indexes
132    */
133   // perhaps get index type
index_type(uint inx)134   const char *index_type(uint inx) { return "REMOTE"; }
135   /*
136     This is a list of flags that says what the storage engine
137     implements. The current table flags are documented in
138     handler.h
139   */
table_flags()140   ulonglong table_flags() const
141   {
142     /* fix server to be able to get remote server table flags */
143     return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED
144             | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS |
145             HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
146             HA_NO_PREFIX_CHAR_KEYS | HA_PRIMARY_KEY_REQUIRED_FOR_DELETE |
147             HA_NO_TRANSACTIONS /* until fixed by WL#2952 */ |
148             HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY |
149             HA_CAN_REPAIR);
150   }
151   /*
152     This is a bitmap of flags that says how the storage engine
153     implements indexes. The current index flags are documented in
154     handler.h. If you do not implement indexes, just return zero
155     here.
156 
157     part is the key part to check. First key part is 0
158     If all_parts it's set, MySQL want to know the flags for the combined
159     index up to and including 'part'.
160   */
161     /* fix server to be able to get remote server index flags */
index_flags(uint inx,uint part,bool all_parts)162   ulong index_flags(uint inx, uint part, bool all_parts) const
163   {
164     return (HA_READ_NEXT | HA_READ_RANGE | HA_READ_AFTER_KEY);
165   }
max_supported_record_length()166   uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
max_supported_keys()167   uint max_supported_keys()          const { return MAX_KEY; }
max_supported_key_parts()168   uint max_supported_key_parts()     const { return MAX_REF_PARTS; }
max_supported_key_length()169   uint max_supported_key_length()    const { return FEDERATED_MAX_KEY_LENGTH; }
max_supported_key_part_length()170   uint max_supported_key_part_length() const { return FEDERATED_MAX_KEY_LENGTH; }
171   /*
172     Called in test_quick_select to determine if indexes should be used.
173     Normally, we need to know number of blocks . For federated we need to
174     know number of blocks on remote side, and number of packets and blocks
175     on the network side (?)
176     Talk to Kostja about this - how to get the
177     number of rows * ...
178     disk scan time on other side (block size, size of the row) + network time ...
179     The reason for "records * 1000" is that such a large number forces
180     this to use indexes "
181   */
scan_time()182   double scan_time()
183   {
184     DBUG_PRINT("info", ("records %lu", (ulong) stats.records));
185     return (double)(stats.records*1000);
186   }
187   /*
188     The next method will never be called if you do not implement indexes.
189   */
read_time(uint index,uint ranges,ha_rows rows)190   double read_time(uint index, uint ranges, ha_rows rows)
191   {
192     /*
193       Per Brian, this number is bugus, but this method must be implemented,
194       and at a later date, he intends to document this issue for handler code
195     */
196     return (double) rows /  20.0+1;
197   }
198 
keys_to_use_for_scanning()199   const key_map *keys_to_use_for_scanning() { return &key_map_full; }
200   /*
201     Everything below are methods that we implment in ha_federated.cc.
202 
203     Most of these methods are not obligatory, skip them and
204     MySQL will treat them as not implemented
205   */
206   int open(const char *name, int mode, uint test_if_locked);    // required
207   int close(void);                                              // required
208 
209   void start_bulk_insert(ha_rows rows, uint flags);
210   int end_bulk_insert();
211   int write_row(uchar *buf);
212   int update_row(const uchar *old_data, const uchar *new_data);
213   int delete_row(const uchar *buf);
214   int index_init(uint keynr, bool sorted);
215   ha_rows estimate_rows_upper_bound();
216   int index_read(uchar *buf, const uchar *key,
217                  uint key_len, enum ha_rkey_function find_flag);
218   int index_read_idx(uchar *buf, uint idx, const uchar *key,
219                      uint key_len, enum ha_rkey_function find_flag);
220   int index_next(uchar *buf);
221   int index_end();
222   int read_range_first(const key_range *start_key,
223                                const key_range *end_key,
224                                bool eq_range, bool sorted);
225   int read_range_next();
226   /*
227     unlike index_init(), rnd_init() can be called two times
228     without rnd_end() in between (it only makes sense if scan=1).
229     then the second call should prepare for the new table scan
230     (e.g if rnd_init allocates the cursor, second call should
231     position it to the start of the table, no need to deallocate
232     and allocate it again
233   */
234   int rnd_init(bool scan);                                      //required
235   int rnd_end();
236   int rnd_next(uchar *buf);                                      //required
237   int rnd_next_int(uchar *buf);
238   int rnd_pos(uchar *buf, uchar *pos);                            //required
239   void position(const uchar *record);                            //required
240   int info(uint);                                              //required
241   int extra(ha_extra_function operation);
242 
243   void update_auto_increment(void);
244   int repair(THD* thd, HA_CHECK_OPT* check_opt);
245   int optimize(THD* thd, HA_CHECK_OPT* check_opt);
246 
247   int delete_all_rows(void);
248   int truncate();
249   int create(const char *name, TABLE *form,
250              HA_CREATE_INFO *create_info);                      //required
251   ha_rows records_in_range(uint inx, key_range *start_key,
252                                    key_range *end_key);
table_cache_type()253   uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; }
254 
255   THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
256                              enum thr_lock_type lock_type);     //required
257   bool get_error_message(int error, String *buf);
258 
259   MYSQL_RES *store_result(MYSQL *mysql);
260   void free_result();
261 
262   int external_lock(THD *thd, int lock_type);
263   int connection_commit();
264   int connection_rollback();
265   int connection_autocommit(bool state);
266   int execute_simple_query(const char *query, int len);
267   int reset(void);
268 };
269 
270