1 /* 2 Copyright (c) 2008, Patrick Galbraith 3 All rights reserved. 4 5 Redistribution and use in source and binary forms, with or without 6 modification, are permitted provided that the following conditions are 7 met: 8 9 * Redistributions of source code must retain the above copyright 10 notice, this list of conditions and the following disclaimer. 11 12 * Redistributions in binary form must reproduce the above 13 copyright notice, this list of conditions and the following disclaimer 14 in the documentation and/or other materials provided with the 15 distribution. 16 17 * Neither the name of Patrick Galbraith nor the names of its 18 contributors may be used to endorse or promote products derived from 19 this software without specific prior written permission. 20 21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 35 #ifdef USE_PRAGMA_INTERFACE 36 #pragma interface /* gcc class implementation */ 37 #endif 38 39 //#include <mysql.h> 40 #include <my_global.h> 41 #include <thr_lock.h> 42 #include "handler.h" 43 44 class federatedx_io; 45 46 /* 47 FEDERATEDX_SERVER will eventually be a structure that will be shared among 48 all FEDERATEDX_SHARE instances so that the federated server can minimise 49 the number of open connections. This will eventually lead to the support 50 of reliable XA federated tables. 51 */ 52 typedef struct st_fedrated_server { 53 MEM_ROOT mem_root; 54 uint use_count, io_count; 55 56 uchar *key; 57 uint key_length; 58 59 const char *scheme; 60 const char *hostname; 61 const char *username; 62 const char *password; 63 const char *database; 64 const char *socket; 65 ushort port; 66 67 const char *csname; 68 69 mysql_mutex_t mutex; 70 federatedx_io *idle_list; 71 } FEDERATEDX_SERVER; 72 73 /* 74 Please read ha_exmple.cc before reading this file. 75 Please keep in mind that the federatedx storage engine implements all methods 76 that are required to be implemented. handler.h has a full list of methods 77 that you can implement. 78 */ 79 80 /* 81 handler::print_error has a case statement for error numbers. 82 This value is (10000) is far out of range and will envoke the 83 default: case. 84 (Current error range is 120-159 from include/my_base.h) 85 */ 86 #define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000 87 88 #define FEDERATEDX_QUERY_BUFFER_SIZE STRING_BUFFER_USUAL_SIZE * 5 89 #define FEDERATEDX_RECORDS_IN_RANGE 2 90 #define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb 91 92 /* 93 FEDERATEDX_SHARE is a structure that will be shared amoung all open handlers 94 The example implements the minimum of what you will probably need. 95 */ 96 typedef struct st_federatedx_share { 97 MEM_ROOT mem_root; 98 99 bool parsed; 100 /* this key is unique db/tablename */ 101 const char *share_key; 102 /* 103 the primary select query to be used in rnd_init 104 */ 105 char *select_query; 106 /* 107 remote host info, parse_url supplies 108 */ 109 char *server_name; 110 char *connection_string; 111 char *scheme; 112 char *hostname; 113 char *username; 114 char *password; 115 char *database; 116 char *table_name; 117 char *table; 118 char *socket; 119 char *sport; 120 int share_key_length; 121 ushort port; 122 123 size_t table_name_length, server_name_length, connect_string_length; 124 uint use_count; 125 THR_LOCK lock; 126 FEDERATEDX_SERVER *s; 127 } FEDERATEDX_SHARE; 128 129 130 typedef struct st_federatedx_result FEDERATEDX_IO_RESULT; 131 typedef struct st_federatedx_row FEDERATEDX_IO_ROW; 132 typedef struct st_federatedx_rows FEDERATEDX_IO_ROWS; 133 typedef ptrdiff_t FEDERATEDX_IO_OFFSET; 134 135 class federatedx_io 136 { 137 friend class federatedx_txn; 138 FEDERATEDX_SERVER * const server; 139 federatedx_io **owner_ptr; 140 federatedx_io *txn_next; 141 federatedx_io *idle_next; 142 bool active; /* currently participating in a transaction */ 143 bool busy; /* in use by a ha_federated instance */ 144 bool readonly;/* indicates that no updates have occurred */ 145 146 protected: set_active(bool new_active)147 void set_active(bool new_active) 148 { active= new_active; } 149 public: 150 federatedx_io(FEDERATEDX_SERVER *); 151 virtual ~federatedx_io(); 152 is_readonly()153 bool is_readonly() const { return readonly; } is_active()154 bool is_active() const { return active; } 155 get_charsetname()156 const char * get_charsetname() const 157 { return server->csname ? server->csname : "latin1"; } 158 get_hostname()159 const char * get_hostname() const { return server->hostname; } get_username()160 const char * get_username() const { return server->username; } get_password()161 const char * get_password() const { return server->password; } get_database()162 const char * get_database() const { return server->database; } get_port()163 ushort get_port() const { return server->port; } get_socket()164 const char * get_socket() const { return server->socket; } 165 166 static bool handles_scheme(const char *scheme); 167 static federatedx_io *construct(MEM_ROOT *server_root, 168 FEDERATEDX_SERVER *server); 169 new(size_t size,MEM_ROOT * mem_root)170 static void *operator new(size_t size, MEM_ROOT *mem_root) throw () 171 { return alloc_root(mem_root, size); } delete(void * ptr,size_t size)172 static void operator delete(void *ptr, size_t size) 173 { TRASH_FREE(ptr, size); } delete(void *,MEM_ROOT *)174 static void operator delete(void *, MEM_ROOT *) 175 { } 176 177 virtual int query(const char *buffer, size_t length)=0; 178 virtual FEDERATEDX_IO_RESULT *store_result()=0; 179 180 virtual size_t max_query_size() const=0; 181 182 virtual my_ulonglong affected_rows() const=0; 183 virtual my_ulonglong last_insert_id() const=0; 184 185 virtual int error_code()=0; 186 virtual const char *error_str()=0; 187 188 virtual void reset()=0; 189 virtual int commit()=0; 190 virtual int rollback()=0; 191 192 virtual int savepoint_set(ulong sp)=0; 193 virtual ulong savepoint_release(ulong sp)=0; 194 virtual ulong savepoint_rollback(ulong sp)=0; 195 virtual void savepoint_restrict(ulong sp)=0; 196 197 virtual ulong last_savepoint() const=0; 198 virtual ulong actual_savepoint() const=0; 199 virtual bool is_autocommit() const=0; 200 201 virtual bool table_metadata(ha_statistics *stats, const char *table_name, 202 uint table_name_length, uint flag) = 0; 203 204 /* resultset operations */ 205 206 virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0; 207 virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0; 208 virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0; 209 virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result, 210 FEDERATEDX_IO_ROWS **current= NULL)=0; 211 virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0; 212 virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, 213 unsigned int column)=0; 214 virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, 215 unsigned int column) const=0; 216 217 virtual size_t get_ref_length() const=0; 218 virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, 219 void *ref, FEDERATEDX_IO_ROWS *current)=0; 220 virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, 221 const void *ref)=0; set_thd(void * thd)222 virtual void set_thd(void *thd) { } 223 224 }; 225 226 227 class federatedx_txn 228 { 229 federatedx_io *txn_list; 230 ulong savepoint_level; 231 ulong savepoint_stmt; 232 ulong savepoint_next; 233 234 void release_scan(); 235 public: 236 federatedx_txn(); 237 ~federatedx_txn(); 238 has_connections()239 bool has_connections() const { return txn_list != NULL; } in_transaction()240 bool in_transaction() const { return savepoint_next != 0; } 241 int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io); 242 void release(federatedx_io **io); 243 void close(FEDERATEDX_SERVER *); 244 245 bool txn_begin(); 246 int txn_commit(); 247 int txn_rollback(); 248 249 bool sp_acquire(ulong *save); 250 int sp_rollback(ulong *save); 251 int sp_release(ulong *save); 252 253 bool stmt_begin(); 254 int stmt_commit(); 255 int stmt_rollback(); 256 void stmt_autocommit(); 257 }; 258 259 260 /* 261 Class definition for the storage engine 262 */ 263 class ha_federatedx: public handler 264 { 265 friend int federatedx_db_init(void *p); 266 267 THR_LOCK_DATA lock; /* MySQL lock */ 268 FEDERATEDX_SHARE *share; /* Shared lock info */ 269 federatedx_txn *txn; 270 federatedx_io *io; 271 FEDERATEDX_IO_RESULT *stored_result; 272 FEDERATEDX_IO_ROWS *current; 273 /** 274 Array of all stored results we get during a query execution. 275 */ 276 DYNAMIC_ARRAY results; 277 bool position_called; 278 int remote_error_number; 279 char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE]; 280 bool ignore_duplicates, replace_duplicates; 281 bool insert_dup_update, table_will_be_deleted; 282 DYNAMIC_STRING bulk_insert; 283 284 private: 285 /* 286 return 0 on success 287 return errorcode otherwise 288 */ 289 uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row, 290 FEDERATEDX_IO_RESULT *result); 291 bool create_where_from_key(String *to, KEY *key_info, 292 const key_range *start_key, 293 const key_range *end_key, 294 bool records_in_range, bool eq_range); 295 int stash_remote_error(); 296 297 federatedx_txn *get_txn(THD *thd, bool no_create= FALSE); 298 299 static int disconnect(handlerton *hton, MYSQL_THD thd); 300 static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv); 301 static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv); 302 static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv); 303 static int commit(handlerton *hton, MYSQL_THD thd, bool all); 304 static int rollback(handlerton *hton, MYSQL_THD thd, bool all); 305 static int discover_assisted(handlerton *, THD*, TABLE_SHARE *, 306 HA_CREATE_INFO *); 307 308 bool append_stmt_insert(String *query); 309 310 int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result); 311 int index_read_idx_with_result_set(uchar *buf, uint index, 312 const uchar *key, 313 uint key_len, 314 ha_rkey_function find_flag, 315 FEDERATEDX_IO_RESULT **result); 316 int real_query(const char *query, uint length); 317 int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag); 318 public: 319 ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg); ~ha_federatedx()320 ~ha_federatedx() {} 321 /* 322 The name of the index type that will be used for display 323 don't implement this method unless you really have indexes 324 */ 325 // perhaps get index type index_type(uint inx)326 const char *index_type(uint inx) { return "REMOTE"; } 327 /* 328 This is a list of flags that says what the storage engine 329 implements. The current table flags are documented in 330 handler.h 331 */ table_flags()332 ulonglong table_flags() const 333 { 334 /* fix server to be able to get remote server table flags */ 335 return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED 336 | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS | 337 HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR | 338 HA_PRIMARY_KEY_REQUIRED_FOR_DELETE | 339 HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY); 340 } 341 /* 342 This is a bitmap of flags that says how the storage engine 343 implements indexes. The current index flags are documented in 344 handler.h. If you do not implement indexes, just return zero 345 here. 346 347 part is the key part to check. First key part is 0 348 If all_parts it's set, MySQL want to know the flags for the combined 349 index up to and including 'part'. 350 */ 351 /* fix server to be able to get remote server index flags */ index_flags(uint inx,uint part,bool all_parts)352 ulong index_flags(uint inx, uint part, bool all_parts) const 353 { 354 return (HA_READ_NEXT | HA_READ_RANGE); 355 } max_supported_record_length()356 uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; } max_supported_keys()357 uint max_supported_keys() const { return MAX_KEY; } max_supported_key_parts()358 uint max_supported_key_parts() const { return MAX_REF_PARTS; } max_supported_key_length()359 uint max_supported_key_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } max_supported_key_part_length()360 uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } 361 /* 362 Called in test_quick_select to determine if indexes should be used. 363 Normally, we need to know number of blocks . For federatedx we need to 364 know number of blocks on remote side, and number of packets and blocks 365 on the network side (?) 366 Talk to Kostja about this - how to get the 367 number of rows * ... 368 disk scan time on other side (block size, size of the row) + network time ... 369 The reason for "records * 1000" is that such a large number forces 370 this to use indexes " 371 */ scan_time()372 double scan_time() 373 { 374 DBUG_PRINT("info", ("records %lu", (ulong) stats.records)); 375 return (double)(stats.records*1000); 376 } 377 /* 378 The next method will never be called if you do not implement indexes. 379 */ read_time(uint index,uint ranges,ha_rows rows)380 double read_time(uint index, uint ranges, ha_rows rows) 381 { 382 /* 383 Per Brian, this number is bugus, but this method must be implemented, 384 and at a later date, he intends to document this issue for handler code 385 */ 386 return (double) rows / 20.0+1; 387 } 388 keys_to_use_for_scanning()389 const key_map *keys_to_use_for_scanning() { return &key_map_full; } 390 /* 391 Everything below are methods that we implment in ha_federatedx.cc. 392 393 Most of these methods are not obligatory, skip them and 394 MySQL will treat them as not implemented 395 */ 396 int open(const char *name, int mode, uint test_if_locked); // required 397 int close(void); // required 398 399 void start_bulk_insert(ha_rows rows, uint flags); 400 int end_bulk_insert(); 401 int write_row(uchar *buf); 402 int update_row(const uchar *old_data, const uchar *new_data); 403 int delete_row(const uchar *buf); 404 int index_init(uint keynr, bool sorted); 405 ha_rows estimate_rows_upper_bound(); 406 int index_read(uchar *buf, const uchar *key, 407 uint key_len, enum ha_rkey_function find_flag); 408 int index_read_idx(uchar *buf, uint idx, const uchar *key, 409 uint key_len, enum ha_rkey_function find_flag); 410 int index_next(uchar *buf); 411 int index_end(); 412 int read_range_first(const key_range *start_key, 413 const key_range *end_key, 414 bool eq_range, bool sorted); 415 int read_range_next(); 416 /* 417 unlike index_init(), rnd_init() can be called two times 418 without rnd_end() in between (it only makes sense if scan=1). 419 then the second call should prepare for the new table scan 420 (e.g if rnd_init allocates the cursor, second call should 421 position it to the start of the table, no need to deallocate 422 and allocate it again 423 */ 424 int rnd_init(bool scan); //required 425 int rnd_end(); 426 int rnd_next(uchar *buf); //required 427 int rnd_pos(uchar *buf, uchar *pos); //required 428 void position(const uchar *record); //required 429 int info(uint); //required 430 int extra(ha_extra_function operation); 431 432 void update_auto_increment(void); 433 int repair(THD* thd, HA_CHECK_OPT* check_opt); 434 int optimize(THD* thd, HA_CHECK_OPT* check_opt); 435 436 int delete_all_rows(void); 437 int create(const char *name, TABLE *form, 438 HA_CREATE_INFO *create_info); //required 439 ha_rows records_in_range(uint inx, key_range *start_key, 440 key_range *end_key); table_cache_type()441 uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; } 442 443 THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, 444 enum thr_lock_type lock_type); //required 445 bool get_error_message(int error, String *buf); 446 int start_stmt(THD *thd, thr_lock_type lock_type); 447 int external_lock(THD *thd, int lock_type); 448 int reset(void); 449 int free_result(void); 450 }; 451 452 extern const char ident_quote_char; // Character for quoting 453 // identifiers 454 extern const char value_quote_char; // Character for quoting 455 // literals 456 457 extern bool append_ident(String *string, const char *name, size_t length, 458 const char quote_char); 459 460 461 extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root, 462 FEDERATEDX_SERVER *server); 463 extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root, 464 FEDERATEDX_SERVER *server); 465