1 /*
2 Copyright (c) 2008, Patrick Galbraith
3 All rights reserved.
4 
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are
7 met:
8 
9     * Redistributions of source code must retain the above copyright
10 notice, this list of conditions and the following disclaimer.
11 
12     * Redistributions in binary form must reproduce the above
13 copyright notice, this list of conditions and the following disclaimer
14 in the documentation and/or other materials provided with the
15 distribution.
16 
17     * Neither the name of Patrick Galbraith nor the names of its
18 contributors may be used to endorse or promote products derived from
19 this software without specific prior written permission.
20 
21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33 
34 
35 #ifdef USE_PRAGMA_INTERFACE
36 #pragma interface			/* gcc class implementation */
37 #endif
38 
39 //#include <mysql.h>
40 #include <my_global.h>
41 #include <thr_lock.h>
42 #include "handler.h"
43 
44 class federatedx_io;
45 
46 /*
47   FEDERATEDX_SERVER will eventually be a structure that will be shared among
48   all FEDERATEDX_SHARE instances so that the federated server can minimise
49   the number of open connections. This will eventually lead to the support
50   of reliable XA federated tables.
51 */
52 typedef struct st_fedrated_server {
53   MEM_ROOT mem_root;
54   uint use_count, io_count;
55 
56   uchar *key;
57   uint key_length;
58 
59   const char *scheme;
60   const char *hostname;
61   const char *username;
62   const char *password;
63   const char *database;
64   const char *socket;
65   ushort port;
66 
67   const char *csname;
68 
69   mysql_mutex_t mutex;
70   federatedx_io *idle_list;
71 } FEDERATEDX_SERVER;
72 
73 /*
74   Please read ha_exmple.cc before reading this file.
75   Please keep in mind that the federatedx storage engine implements all methods
76   that are required to be implemented. handler.h has a full list of methods
77   that you can implement.
78 */
79 
80 /*
81   handler::print_error has a case statement for error numbers.
82   This value is (10000) is far out of range and will envoke the
83   default: case.
84   (Current error range is 120-159 from include/my_base.h)
85 */
86 #define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000
87 
88 #define FEDERATEDX_QUERY_BUFFER_SIZE STRING_BUFFER_USUAL_SIZE * 5
89 #define FEDERATEDX_RECORDS_IN_RANGE 2
90 #define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb
91 
92 /*
93   FEDERATEDX_SHARE is a structure that will be shared amoung all open handlers
94   The example implements the minimum of what you will probably need.
95 */
96 typedef struct st_federatedx_share {
97   MEM_ROOT mem_root;
98 
99   bool parsed;
100   /* this key is unique db/tablename */
101   const char *share_key;
102   /*
103     the primary select query to be used in rnd_init
104   */
105   char *select_query;
106   /*
107     remote host info, parse_url supplies
108   */
109   char *server_name;
110   char *connection_string;
111   char *scheme;
112   char *hostname;
113   char *username;
114   char *password;
115   char *database;
116   char *table_name;
117   char *table;
118   char *socket;
119   char *sport;
120   int share_key_length;
121   ushort port;
122 
123   size_t table_name_length, server_name_length, connect_string_length;
124   uint use_count;
125   THR_LOCK lock;
126   FEDERATEDX_SERVER *s;
127 } FEDERATEDX_SHARE;
128 
129 
130 typedef struct st_federatedx_result FEDERATEDX_IO_RESULT;
131 typedef struct st_federatedx_row FEDERATEDX_IO_ROW;
132 typedef struct st_federatedx_rows FEDERATEDX_IO_ROWS;
133 typedef ptrdiff_t FEDERATEDX_IO_OFFSET;
134 
135 class federatedx_io
136 {
137   friend class federatedx_txn;
138   FEDERATEDX_SERVER * const server;
139   federatedx_io **owner_ptr;
140   federatedx_io *txn_next;
141   federatedx_io *idle_next;
142   bool active;  /* currently participating in a transaction */
143   bool busy;    /* in use by a ha_federated instance */
144   bool readonly;/* indicates that no updates have occurred */
145 
146 protected:
set_active(bool new_active)147   void set_active(bool new_active)
148   { active= new_active; }
149 public:
150   federatedx_io(FEDERATEDX_SERVER *);
151   virtual ~federatedx_io();
152 
is_readonly()153   bool is_readonly() const { return readonly; }
is_active()154   bool is_active() const { return active; }
155 
get_charsetname()156   const char * get_charsetname() const
157   { return server->csname ? server->csname : "latin1"; }
158 
get_hostname()159   const char * get_hostname() const { return server->hostname; }
get_username()160   const char * get_username() const { return server->username; }
get_password()161   const char * get_password() const { return server->password; }
get_database()162   const char * get_database() const { return server->database; }
get_port()163   ushort       get_port() const     { return server->port; }
get_socket()164   const char * get_socket() const   { return server->socket; }
165 
166   static bool handles_scheme(const char *scheme);
167   static federatedx_io *construct(MEM_ROOT *server_root,
168                                   FEDERATEDX_SERVER *server);
169 
new(size_t size,MEM_ROOT * mem_root)170   static void *operator new(size_t size, MEM_ROOT *mem_root) throw ()
171   { return alloc_root(mem_root, size); }
delete(void * ptr,size_t size)172   static void operator delete(void *ptr, size_t size)
173   { TRASH_FREE(ptr, size); }
delete(void *,MEM_ROOT *)174   static void operator delete(void *, MEM_ROOT *)
175   { }
176 
177   virtual int query(const char *buffer, size_t length)=0;
178   virtual FEDERATEDX_IO_RESULT *store_result()=0;
179 
180   virtual size_t max_query_size() const=0;
181 
182   virtual my_ulonglong affected_rows() const=0;
183   virtual my_ulonglong last_insert_id() const=0;
184 
185   virtual int error_code()=0;
186   virtual const char *error_str()=0;
187 
188   virtual void reset()=0;
189   virtual int commit()=0;
190   virtual int rollback()=0;
191 
192   virtual int savepoint_set(ulong sp)=0;
193   virtual ulong savepoint_release(ulong sp)=0;
194   virtual ulong savepoint_rollback(ulong sp)=0;
195   virtual void savepoint_restrict(ulong sp)=0;
196 
197   virtual ulong last_savepoint() const=0;
198   virtual ulong actual_savepoint() const=0;
199   virtual bool is_autocommit() const=0;
200 
201   virtual bool table_metadata(ha_statistics *stats, const char *table_name,
202                               uint table_name_length, uint flag) = 0;
203 
204   /* resultset operations */
205 
206   virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
207   virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
208   virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
209   virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result,
210                                        FEDERATEDX_IO_ROWS **current= NULL)=0;
211   virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0;
212   virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
213                                       unsigned int column)=0;
214   virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
215                               unsigned int column) const=0;
216 
217   virtual size_t get_ref_length() const=0;
218   virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
219                              void *ref, FEDERATEDX_IO_ROWS *current)=0;
220   virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
221                             const void *ref)=0;
set_thd(void * thd)222   virtual void set_thd(void *thd) { }
223 
224 };
225 
226 
227 class federatedx_txn
228 {
229   federatedx_io *txn_list;
230   ulong savepoint_level;
231   ulong savepoint_stmt;
232   ulong savepoint_next;
233 
234   void release_scan();
235 public:
236   federatedx_txn();
237   ~federatedx_txn();
238 
has_connections()239   bool has_connections() const { return txn_list != NULL; }
in_transaction()240   bool in_transaction() const { return savepoint_next != 0; }
241   int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io);
242   void release(federatedx_io **io);
243   void close(FEDERATEDX_SERVER *);
244 
245   bool txn_begin();
246   int txn_commit();
247   int txn_rollback();
248 
249   bool sp_acquire(ulong *save);
250   int sp_rollback(ulong *save);
251   int sp_release(ulong *save);
252 
253   bool stmt_begin();
254   int stmt_commit();
255   int stmt_rollback();
256   void stmt_autocommit();
257 };
258 
259 
260 /*
261   Class definition for the storage engine
262 */
263 class ha_federatedx: public handler
264 {
265   friend int federatedx_db_init(void *p);
266 
267   THR_LOCK_DATA lock;      /* MySQL lock */
268   FEDERATEDX_SHARE *share;    /* Shared lock info */
269   federatedx_txn *txn;
270   federatedx_io *io;
271   FEDERATEDX_IO_RESULT *stored_result;
272   FEDERATEDX_IO_ROWS *current;
273   /**
274       Array of all stored results we get during a query execution.
275   */
276   DYNAMIC_ARRAY results;
277   bool position_called;
278   int remote_error_number;
279   char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE];
280   bool ignore_duplicates, replace_duplicates;
281   bool insert_dup_update, table_will_be_deleted;
282   DYNAMIC_STRING bulk_insert;
283 
284 private:
285   /*
286       return 0 on success
287       return errorcode otherwise
288   */
289   uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row,
290                                       FEDERATEDX_IO_RESULT *result);
291   bool create_where_from_key(String *to, KEY *key_info,
292                              const key_range *start_key,
293                              const key_range *end_key,
294                              bool records_in_range, bool eq_range);
295   int stash_remote_error();
296 
297   federatedx_txn *get_txn(THD *thd, bool no_create= FALSE);
298 
299   static int disconnect(handlerton *hton, MYSQL_THD thd);
300   static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv);
301   static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv);
302   static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv);
303   static int commit(handlerton *hton, MYSQL_THD thd, bool all);
304   static int rollback(handlerton *hton, MYSQL_THD thd, bool all);
305   static int discover_assisted(handlerton *, THD*, TABLE_SHARE *,
306                                HA_CREATE_INFO *);
307 
308   bool append_stmt_insert(String *query);
309 
310   int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result);
311   int index_read_idx_with_result_set(uchar *buf, uint index,
312                                      const uchar *key,
313                                      uint key_len,
314                                      ha_rkey_function find_flag,
315                                      FEDERATEDX_IO_RESULT **result);
316   int real_query(const char *query, uint length);
317   int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag);
318 public:
319   ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg);
~ha_federatedx()320   ~ha_federatedx() {}
321   /*
322     The name of the index type that will be used for display
323     don't implement this method unless you really have indexes
324    */
325   // perhaps get index type
index_type(uint inx)326   const char *index_type(uint inx) { return "REMOTE"; }
327   /*
328     This is a list of flags that says what the storage engine
329     implements. The current table flags are documented in
330     handler.h
331   */
table_flags()332   ulonglong table_flags() const
333   {
334     /* fix server to be able to get remote server table flags */
335     return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED
336             | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS |
337             HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR |
338             HA_PRIMARY_KEY_REQUIRED_FOR_DELETE |
339             HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY);
340   }
341   /*
342     This is a bitmap of flags that says how the storage engine
343     implements indexes. The current index flags are documented in
344     handler.h. If you do not implement indexes, just return zero
345     here.
346 
347     part is the key part to check. First key part is 0
348     If all_parts it's set, MySQL want to know the flags for the combined
349     index up to and including 'part'.
350   */
351     /* fix server to be able to get remote server index flags */
index_flags(uint inx,uint part,bool all_parts)352   ulong index_flags(uint inx, uint part, bool all_parts) const
353   {
354     return (HA_READ_NEXT | HA_READ_RANGE);
355   }
max_supported_record_length()356   uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
max_supported_keys()357   uint max_supported_keys()          const { return MAX_KEY; }
max_supported_key_parts()358   uint max_supported_key_parts()     const { return MAX_REF_PARTS; }
max_supported_key_length()359   uint max_supported_key_length()    const { return FEDERATEDX_MAX_KEY_LENGTH; }
max_supported_key_part_length()360   uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; }
361   /*
362     Called in test_quick_select to determine if indexes should be used.
363     Normally, we need to know number of blocks . For federatedx we need to
364     know number of blocks on remote side, and number of packets and blocks
365     on the network side (?)
366     Talk to Kostja about this - how to get the
367     number of rows * ...
368     disk scan time on other side (block size, size of the row) + network time ...
369     The reason for "records * 1000" is that such a large number forces
370     this to use indexes "
371   */
scan_time()372   double scan_time()
373   {
374     DBUG_PRINT("info", ("records %lu", (ulong) stats.records));
375     return (double)(stats.records*1000);
376   }
377   /*
378     The next method will never be called if you do not implement indexes.
379   */
read_time(uint index,uint ranges,ha_rows rows)380   double read_time(uint index, uint ranges, ha_rows rows)
381   {
382     /*
383       Per Brian, this number is bugus, but this method must be implemented,
384       and at a later date, he intends to document this issue for handler code
385     */
386     return (double) rows /  20.0+1;
387   }
388 
keys_to_use_for_scanning()389   const key_map *keys_to_use_for_scanning() { return &key_map_full; }
390   /*
391     Everything below are methods that we implment in ha_federatedx.cc.
392 
393     Most of these methods are not obligatory, skip them and
394     MySQL will treat them as not implemented
395   */
396   int open(const char *name, int mode, uint test_if_locked);    // required
397   int close(void);                                              // required
398 
399   void start_bulk_insert(ha_rows rows, uint flags);
400   int end_bulk_insert();
401   int write_row(uchar *buf);
402   int update_row(const uchar *old_data, const uchar *new_data);
403   int delete_row(const uchar *buf);
404   int index_init(uint keynr, bool sorted);
405   ha_rows estimate_rows_upper_bound();
406   int index_read(uchar *buf, const uchar *key,
407                  uint key_len, enum ha_rkey_function find_flag);
408   int index_read_idx(uchar *buf, uint idx, const uchar *key,
409                      uint key_len, enum ha_rkey_function find_flag);
410   int index_next(uchar *buf);
411   int index_end();
412   int read_range_first(const key_range *start_key,
413                                const key_range *end_key,
414                                bool eq_range, bool sorted);
415   int read_range_next();
416   /*
417     unlike index_init(), rnd_init() can be called two times
418     without rnd_end() in between (it only makes sense if scan=1).
419     then the second call should prepare for the new table scan
420     (e.g if rnd_init allocates the cursor, second call should
421     position it to the start of the table, no need to deallocate
422     and allocate it again
423   */
424   int rnd_init(bool scan);                                      //required
425   int rnd_end();
426   int rnd_next(uchar *buf);                                      //required
427   int rnd_pos(uchar *buf, uchar *pos);                            //required
428   void position(const uchar *record);                            //required
429   int info(uint);                                              //required
430   int extra(ha_extra_function operation);
431 
432   void update_auto_increment(void);
433   int repair(THD* thd, HA_CHECK_OPT* check_opt);
434   int optimize(THD* thd, HA_CHECK_OPT* check_opt);
435 
436   int delete_all_rows(void);
437   int create(const char *name, TABLE *form,
438              HA_CREATE_INFO *create_info);                      //required
439   ha_rows records_in_range(uint inx, key_range *start_key,
440                                    key_range *end_key);
table_cache_type()441   uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; }
442 
443   THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
444                              enum thr_lock_type lock_type);     //required
445   bool get_error_message(int error, String *buf);
446   int start_stmt(THD *thd, thr_lock_type lock_type);
447   int external_lock(THD *thd, int lock_type);
448   int reset(void);
449   int free_result(void);
450 };
451 
452 extern const char ident_quote_char;              // Character for quoting
453                                                  // identifiers
454 extern const char value_quote_char;              // Character for quoting
455                                                  // literals
456 
457 extern bool append_ident(String *string, const char *name, size_t length,
458                          const char quote_char);
459 
460 
461 extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
462                                            FEDERATEDX_SERVER *server);
463 extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root,
464                                           FEDERATEDX_SERVER *server);
465