1 #ifndef HA_FEDERATEDX_INCLUDED
2 #define HA_FEDERATEDX_INCLUDED
3 /*
4 Copyright (c) 2008, Patrick Galbraith
5 All rights reserved.
6 
7 Redistribution and use in source and binary forms, with or without
8 modification, are permitted provided that the following conditions are
9 met:
10 
11     * Redistributions of source code must retain the above copyright
12 notice, this list of conditions and the following disclaimer.
13 
14     * Redistributions in binary form must reproduce the above
15 copyright notice, this list of conditions and the following disclaimer
16 in the documentation and/or other materials provided with the
17 distribution.
18 
19     * Neither the name of Patrick Galbraith nor the names of its
20 contributors may be used to endorse or promote products derived from
21 this software without specific prior written permission.
22 
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
29 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
30 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
33 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 */
35 
36 
37 #ifdef USE_PRAGMA_INTERFACE
38 #pragma interface			/* gcc class implementation */
39 #endif
40 
41 //#include <mysql.h>
42 #include <my_global.h>
43 #include <thr_lock.h>
44 #include "handler.h"
45 
46 class federatedx_io;
47 
48 /*
49   FEDERATEDX_SERVER will eventually be a structure that will be shared among
50   all FEDERATEDX_SHARE instances so that the federated server can minimise
51   the number of open connections. This will eventually lead to the support
52   of reliable XA federated tables.
53 */
54 typedef struct st_fedrated_server {
55   MEM_ROOT mem_root;
56   uint use_count, io_count;
57 
58   uchar *key;
59   uint key_length;
60 
61   const char *scheme;
62   const char *hostname;
63   const char *username;
64   const char *password;
65   const char *database;
66   const char *socket;
67   ushort port;
68 
69   const char *csname;
70 
71   mysql_mutex_t mutex;
72   federatedx_io *idle_list;
73 } FEDERATEDX_SERVER;
74 
75 /*
76   Please read ha_exmple.cc before reading this file.
77   Please keep in mind that the federatedx storage engine implements all methods
78   that are required to be implemented. handler.h has a full list of methods
79   that you can implement.
80 */
81 
82 /*
83   handler::print_error has a case statement for error numbers.
84   This value is (10000) is far out of range and will envoke the
85   default: case.
86   (Current error range is 120-159 from include/my_base.h)
87 */
88 #define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000
89 
90 #define FEDERATEDX_QUERY_BUFFER_SIZE STRING_BUFFER_USUAL_SIZE * 5
91 #define FEDERATEDX_RECORDS_IN_RANGE 2
92 #define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb
93 
94 /*
95   FEDERATEDX_SHARE is a structure that will be shared amoung all open handlers
96   The example implements the minimum of what you will probably need.
97 */
98 typedef struct st_federatedx_share {
99   MEM_ROOT mem_root;
100 
101   bool parsed;
102   /* this key is unique db/tablename */
103   const char *share_key;
104   /*
105     the primary select query to be used in rnd_init
106   */
107   char *select_query;
108   /*
109     remote host info, parse_url supplies
110   */
111   char *server_name;
112   char *connection_string;
113   char *scheme;
114   char *hostname;
115   char *username;
116   char *password;
117   char *database;
118   char *table_name;
119   char *table;
120   char *socket;
121   char *sport;
122   int share_key_length;
123   ushort port;
124 
125   size_t table_name_length, server_name_length, connect_string_length;
126   uint use_count;
127   THR_LOCK lock;
128   FEDERATEDX_SERVER *s;
129 } FEDERATEDX_SHARE;
130 
131 
132 typedef struct st_federatedx_result FEDERATEDX_IO_RESULT;
133 typedef struct st_federatedx_row FEDERATEDX_IO_ROW;
134 typedef struct st_federatedx_rows FEDERATEDX_IO_ROWS;
135 typedef ptrdiff_t FEDERATEDX_IO_OFFSET;
136 
137 class federatedx_io
138 {
139   friend class federatedx_txn;
140   FEDERATEDX_SERVER * const server;
141   federatedx_io **owner_ptr;
142   federatedx_io *txn_next;
143   federatedx_io *idle_next;
144   bool active;  /* currently participating in a transaction */
145   bool busy;    /* in use by a ha_federated instance */
146   bool readonly;/* indicates that no updates have occurred */
147 
148 protected:
set_active(bool new_active)149   void set_active(bool new_active)
150   { active= new_active; }
151 public:
152   federatedx_io(FEDERATEDX_SERVER *);
153   virtual ~federatedx_io();
154 
is_readonly()155   bool is_readonly() const { return readonly; }
is_active()156   bool is_active() const { return active; }
157 
get_charsetname()158   const char * get_charsetname() const
159   { return server->csname ? server->csname : "latin1"; }
160 
get_hostname()161   const char * get_hostname() const { return server->hostname; }
get_username()162   const char * get_username() const { return server->username; }
get_password()163   const char * get_password() const { return server->password; }
get_database()164   const char * get_database() const { return server->database; }
get_port()165   ushort       get_port() const     { return server->port; }
get_socket()166   const char * get_socket() const   { return server->socket; }
167 
168   static bool handles_scheme(const char *scheme);
169   static federatedx_io *construct(MEM_ROOT *server_root,
170                                   FEDERATEDX_SERVER *server);
171 
new(size_t size,MEM_ROOT * mem_root)172   static void *operator new(size_t size, MEM_ROOT *mem_root) throw ()
173   { return alloc_root(mem_root, size); }
delete(void * ptr,size_t size)174   static void operator delete(void *ptr, size_t size)
175   { TRASH_FREE(ptr, size); }
delete(void *,MEM_ROOT *)176   static void operator delete(void *, MEM_ROOT *)
177   { }
178 
179   virtual int query(const char *buffer, size_t length)=0;
180   virtual FEDERATEDX_IO_RESULT *store_result()=0;
181 
182   virtual size_t max_query_size() const=0;
183 
184   virtual my_ulonglong affected_rows() const=0;
185   virtual my_ulonglong last_insert_id() const=0;
186 
187   virtual int error_code()=0;
188   virtual const char *error_str()=0;
189 
190   virtual void reset()=0;
191   virtual int commit()=0;
192   virtual int rollback()=0;
193 
194   virtual int savepoint_set(ulong sp)=0;
195   virtual ulong savepoint_release(ulong sp)=0;
196   virtual ulong savepoint_rollback(ulong sp)=0;
197   virtual void savepoint_restrict(ulong sp)=0;
198 
199   virtual ulong last_savepoint() const=0;
200   virtual ulong actual_savepoint() const=0;
201   virtual bool is_autocommit() const=0;
202 
203   virtual bool table_metadata(ha_statistics *stats, const char *table_name,
204                               uint table_name_length, uint flag) = 0;
205 
206   /* resultset operations */
207 
208   virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
209   virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
210   virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
211   virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result,
212                                        FEDERATEDX_IO_ROWS **current= NULL)=0;
213   virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0;
214   virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
215                                       unsigned int column)=0;
216   virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
217                               unsigned int column) const=0;
218 
219   virtual size_t get_ref_length() const=0;
220   virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
221                              void *ref, FEDERATEDX_IO_ROWS *current)=0;
222   virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
223                             const void *ref)=0;
set_thd(void * thd)224   virtual void set_thd(void *thd) { }
225 
226 };
227 
228 
229 class federatedx_txn
230 {
231   federatedx_io *txn_list;
232   ulong savepoint_level;
233   ulong savepoint_stmt;
234   ulong savepoint_next;
235 
236   void release_scan();
237 public:
238   federatedx_txn();
239   ~federatedx_txn();
240 
has_connections()241   bool has_connections() const { return txn_list != NULL; }
in_transaction()242   bool in_transaction() const { return savepoint_next != 0; }
243   int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io);
244   void release(federatedx_io **io);
245   void close(FEDERATEDX_SERVER *);
246 
247   bool txn_begin();
248   int txn_commit();
249   int txn_rollback();
250 
251   bool sp_acquire(ulong *save);
252   int sp_rollback(ulong *save);
253   int sp_release(ulong *save);
254 
255   bool stmt_begin();
256   int stmt_commit();
257   int stmt_rollback();
258   void stmt_autocommit();
259 };
260 
261 
262 /*
263   Class definition for the storage engine
264 */
265 class ha_federatedx: public handler
266 {
267   friend int federatedx_db_init(void *p);
268 
269   THR_LOCK_DATA lock;      /* MySQL lock */
270   FEDERATEDX_SHARE *share;    /* Shared lock info */
271   federatedx_txn *txn;
272   federatedx_io *io;
273   FEDERATEDX_IO_RESULT *stored_result;
274   FEDERATEDX_IO_ROWS *current;
275   /**
276       Array of all stored results we get during a query execution.
277   */
278   DYNAMIC_ARRAY results;
279   bool position_called;
280   int remote_error_number;
281   char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE];
282   bool ignore_duplicates, replace_duplicates;
283   bool insert_dup_update, table_will_be_deleted;
284   DYNAMIC_STRING bulk_insert;
285 
286 private:
287   /*
288       return 0 on success
289       return errorcode otherwise
290   */
291   uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row,
292                                       FEDERATEDX_IO_RESULT *result);
293   bool create_where_from_key(String *to, KEY *key_info,
294                              const key_range *start_key,
295                              const key_range *end_key,
296                              bool records_in_range, bool eq_range);
297   int stash_remote_error();
298 
299   federatedx_txn *get_txn(THD *thd, bool no_create= FALSE);
300 
301   static int disconnect(handlerton *hton, MYSQL_THD thd);
302   static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv);
303   static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv);
304   static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv);
305   static int commit(handlerton *hton, MYSQL_THD thd, bool all);
306   static int rollback(handlerton *hton, MYSQL_THD thd, bool all);
307   static int discover_assisted(handlerton *, THD*, TABLE_SHARE *,
308                                HA_CREATE_INFO *);
309 
310   bool append_stmt_insert(String *query);
311 
312   int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result);
313   int index_read_idx_with_result_set(uchar *buf, uint index,
314                                      const uchar *key,
315                                      uint key_len,
316                                      ha_rkey_function find_flag,
317                                      FEDERATEDX_IO_RESULT **result);
318   int real_query(const char *query, uint length);
319   int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag);
320 public:
321   ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg);
~ha_federatedx()322   ~ha_federatedx() {}
323   /*
324     The name of the index type that will be used for display
325     don't implement this method unless you really have indexes
326    */
327   // perhaps get index type
index_type(uint inx)328   const char *index_type(uint inx) { return "REMOTE"; }
329   /*
330     This is a list of flags that says what the storage engine
331     implements. The current table flags are documented in
332     handler.h
333   */
table_flags()334   ulonglong table_flags() const
335   {
336     /* fix server to be able to get remote server table flags */
337     return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED
338             | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS |
339             HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR |
340             HA_PRIMARY_KEY_REQUIRED_FOR_DELETE | HA_CAN_ONLINE_BACKUPS |
341             HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY | HA_NON_COMPARABLE_ROWID);
342   }
343   /*
344     This is a bitmap of flags that says how the storage engine
345     implements indexes. The current index flags are documented in
346     handler.h. If you do not implement indexes, just return zero
347     here.
348 
349     part is the key part to check. First key part is 0
350     If all_parts it's set, MySQL want to know the flags for the combined
351     index up to and including 'part'.
352   */
353     /* fix server to be able to get remote server index flags */
index_flags(uint inx,uint part,bool all_parts)354   ulong index_flags(uint inx, uint part, bool all_parts) const
355   {
356     return (HA_READ_NEXT | HA_READ_RANGE);
357   }
max_supported_record_length()358   uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
max_supported_keys()359   uint max_supported_keys()          const { return MAX_KEY; }
max_supported_key_parts()360   uint max_supported_key_parts()     const { return MAX_REF_PARTS; }
max_supported_key_length()361   uint max_supported_key_length()    const { return FEDERATEDX_MAX_KEY_LENGTH; }
max_supported_key_part_length()362   uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; }
363   /*
364     Called in test_quick_select to determine if indexes should be used.
365     Normally, we need to know number of blocks . For federatedx we need to
366     know number of blocks on remote side, and number of packets and blocks
367     on the network side (?)
368     Talk to Kostja about this - how to get the
369     number of rows * ...
370     disk scan time on other side (block size, size of the row) + network time ...
371     The reason for "records * 1000" is that such a large number forces
372     this to use indexes "
373   */
scan_time()374   double scan_time()
375   {
376     DBUG_PRINT("info", ("records %lu", (ulong) stats.records));
377     return (double)(stats.records*1000);
378   }
379   /*
380     The next method will never be called if you do not implement indexes.
381   */
read_time(uint index,uint ranges,ha_rows rows)382   double read_time(uint index, uint ranges, ha_rows rows)
383   {
384     /*
385       Per Brian, this number is bugus, but this method must be implemented,
386       and at a later date, he intends to document this issue for handler code
387     */
388     return (double) rows /  20.0+1;
389   }
390 
keys_to_use_for_scanning()391   const key_map *keys_to_use_for_scanning() { return &key_map_full; }
392   /*
393     Everything below are methods that we implment in ha_federatedx.cc.
394 
395     Most of these methods are not obligatory, skip them and
396     MySQL will treat them as not implemented
397   */
398   int open(const char *name, int mode, uint test_if_locked);    // required
399   int close(void);                                              // required
400 
401   void start_bulk_insert(ha_rows rows, uint flags);
402   int end_bulk_insert();
403   int write_row(const uchar *buf);
404   int update_row(const uchar *old_data, const uchar *new_data);
405   int delete_row(const uchar *buf);
406   int index_init(uint keynr, bool sorted);
407   ha_rows estimate_rows_upper_bound();
408   int index_read(uchar *buf, const uchar *key,
409                  uint key_len, enum ha_rkey_function find_flag);
410   int index_read_idx(uchar *buf, uint idx, const uchar *key,
411                      uint key_len, enum ha_rkey_function find_flag);
412   int index_next(uchar *buf);
413   int index_end();
414   int read_range_first(const key_range *start_key,
415                                const key_range *end_key,
416                                bool eq_range, bool sorted);
417   int read_range_next();
418   /*
419     unlike index_init(), rnd_init() can be called two times
420     without rnd_end() in between (it only makes sense if scan=1).
421     then the second call should prepare for the new table scan
422     (e.g if rnd_init allocates the cursor, second call should
423     position it to the start of the table, no need to deallocate
424     and allocate it again
425   */
426   int rnd_init(bool scan);                                      //required
427   int rnd_end();
428   int rnd_next(uchar *buf);                                      //required
429   int rnd_pos(uchar *buf, uchar *pos);                            //required
430   void position(const uchar *record);                            //required
431   /*
432     A ref is a pointer inside a local buffer. It is not comparable to
433     other ref's. This is never called as HA_NON_COMPARABLE_ROWID is set.
434   */
cmp_ref(const uchar * ref1,const uchar * ref2)435   int cmp_ref(const uchar *ref1, const uchar *ref2)
436   {
437 #ifdef NOT_YET
438     DBUG_ASSERT(0);
439     return 0;
440 #else
441     return handler::cmp_ref(ref1,ref2);         /* Works if table scan is used */
442 #endif
443   }
444   int info(uint);                                              //required
445   int extra(ha_extra_function operation);
446 
447   void update_auto_increment(void);
448   int repair(THD* thd, HA_CHECK_OPT* check_opt);
449   int optimize(THD* thd, HA_CHECK_OPT* check_opt);
450 
451   int delete_all_rows(void);
452   int create(const char *name, TABLE *form,
453              HA_CREATE_INFO *create_info);                      //required
454   ha_rows records_in_range(uint inx, key_range *start_key,
455                                    key_range *end_key);
table_cache_type()456   uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; }
457 
458   THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
459                              enum thr_lock_type lock_type);     //required
460   bool get_error_message(int error, String *buf);
461   int start_stmt(THD *thd, thr_lock_type lock_type);
462   int external_lock(THD *thd, int lock_type);
463   int reset(void);
464   int free_result(void);
465 
466   friend class ha_federatedx_derived_handler;
467   friend class ha_federatedx_select_handler;
468 };
469 
470 extern const char ident_quote_char;              // Character for quoting
471                                                  // identifiers
472 extern const char value_quote_char;              // Character for quoting
473                                                  // literals
474 
475 extern bool append_ident(String *string, const char *name, size_t length,
476                          const char quote_char);
477 
478 
479 extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
480                                            FEDERATEDX_SERVER *server);
481 extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root,
482                                           FEDERATEDX_SERVER *server);
483 
484 #include "federatedx_pushdown.h"
485 
486 #endif /* HA_FEDERATEDX_INCLUDED */
487