1 #ifndef HA_FEDERATEDX_INCLUDED 2 #define HA_FEDERATEDX_INCLUDED 3 /* 4 Copyright (c) 2008, Patrick Galbraith 5 All rights reserved. 6 7 Redistribution and use in source and binary forms, with or without 8 modification, are permitted provided that the following conditions are 9 met: 10 11 * Redistributions of source code must retain the above copyright 12 notice, this list of conditions and the following disclaimer. 13 14 * Redistributions in binary form must reproduce the above 15 copyright notice, this list of conditions and the following disclaimer 16 in the documentation and/or other materials provided with the 17 distribution. 18 19 * Neither the name of Patrick Galbraith nor the names of its 20 contributors may be used to endorse or promote products derived from 21 this software without specific prior written permission. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 29 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 30 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 31 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 32 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 33 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 37 #ifdef USE_PRAGMA_INTERFACE 38 #pragma interface /* gcc class implementation */ 39 #endif 40 41 //#include <mysql.h> 42 #include <my_global.h> 43 #include <thr_lock.h> 44 #include "handler.h" 45 46 class federatedx_io; 47 48 /* 49 FEDERATEDX_SERVER will eventually be a structure that will be shared among 50 all FEDERATEDX_SHARE instances so that the federated server can minimise 51 the number of open connections. This will eventually lead to the support 52 of reliable XA federated tables. 53 */ 54 typedef struct st_fedrated_server { 55 MEM_ROOT mem_root; 56 uint use_count, io_count; 57 58 uchar *key; 59 uint key_length; 60 61 const char *scheme; 62 const char *hostname; 63 const char *username; 64 const char *password; 65 const char *database; 66 const char *socket; 67 ushort port; 68 69 const char *csname; 70 71 mysql_mutex_t mutex; 72 federatedx_io *idle_list; 73 } FEDERATEDX_SERVER; 74 75 /* 76 Please read ha_exmple.cc before reading this file. 77 Please keep in mind that the federatedx storage engine implements all methods 78 that are required to be implemented. handler.h has a full list of methods 79 that you can implement. 80 */ 81 82 /* 83 handler::print_error has a case statement for error numbers. 84 This value is (10000) is far out of range and will envoke the 85 default: case. 86 (Current error range is 120-159 from include/my_base.h) 87 */ 88 #define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000 89 90 #define FEDERATEDX_QUERY_BUFFER_SIZE STRING_BUFFER_USUAL_SIZE * 5 91 #define FEDERATEDX_RECORDS_IN_RANGE 2 92 #define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb 93 94 /* 95 FEDERATEDX_SHARE is a structure that will be shared amoung all open handlers 96 The example implements the minimum of what you will probably need. 97 */ 98 typedef struct st_federatedx_share { 99 MEM_ROOT mem_root; 100 101 bool parsed; 102 /* this key is unique db/tablename */ 103 const char *share_key; 104 /* 105 the primary select query to be used in rnd_init 106 */ 107 char *select_query; 108 /* 109 remote host info, parse_url supplies 110 */ 111 char *server_name; 112 char *connection_string; 113 char *scheme; 114 char *hostname; 115 char *username; 116 char *password; 117 char *database; 118 char *table_name; 119 char *table; 120 char *socket; 121 char *sport; 122 int share_key_length; 123 ushort port; 124 125 size_t table_name_length, server_name_length, connect_string_length; 126 uint use_count; 127 THR_LOCK lock; 128 FEDERATEDX_SERVER *s; 129 } FEDERATEDX_SHARE; 130 131 132 typedef struct st_federatedx_result FEDERATEDX_IO_RESULT; 133 typedef struct st_federatedx_row FEDERATEDX_IO_ROW; 134 typedef struct st_federatedx_rows FEDERATEDX_IO_ROWS; 135 typedef ptrdiff_t FEDERATEDX_IO_OFFSET; 136 137 class federatedx_io 138 { 139 friend class federatedx_txn; 140 FEDERATEDX_SERVER * const server; 141 federatedx_io **owner_ptr; 142 federatedx_io *txn_next; 143 federatedx_io *idle_next; 144 bool active; /* currently participating in a transaction */ 145 bool busy; /* in use by a ha_federated instance */ 146 bool readonly;/* indicates that no updates have occurred */ 147 148 protected: set_active(bool new_active)149 void set_active(bool new_active) 150 { active= new_active; } 151 public: 152 federatedx_io(FEDERATEDX_SERVER *); 153 virtual ~federatedx_io(); 154 is_readonly()155 bool is_readonly() const { return readonly; } is_active()156 bool is_active() const { return active; } 157 get_charsetname()158 const char * get_charsetname() const 159 { return server->csname ? server->csname : "latin1"; } 160 get_hostname()161 const char * get_hostname() const { return server->hostname; } get_username()162 const char * get_username() const { return server->username; } get_password()163 const char * get_password() const { return server->password; } get_database()164 const char * get_database() const { return server->database; } get_port()165 ushort get_port() const { return server->port; } get_socket()166 const char * get_socket() const { return server->socket; } 167 168 static bool handles_scheme(const char *scheme); 169 static federatedx_io *construct(MEM_ROOT *server_root, 170 FEDERATEDX_SERVER *server); 171 new(size_t size,MEM_ROOT * mem_root)172 static void *operator new(size_t size, MEM_ROOT *mem_root) throw () 173 { return alloc_root(mem_root, size); } delete(void * ptr,size_t size)174 static void operator delete(void *ptr, size_t size) 175 { TRASH_FREE(ptr, size); } delete(void *,MEM_ROOT *)176 static void operator delete(void *, MEM_ROOT *) 177 { } 178 179 virtual int query(const char *buffer, size_t length)=0; 180 virtual FEDERATEDX_IO_RESULT *store_result()=0; 181 182 virtual size_t max_query_size() const=0; 183 184 virtual my_ulonglong affected_rows() const=0; 185 virtual my_ulonglong last_insert_id() const=0; 186 187 virtual int error_code()=0; 188 virtual const char *error_str()=0; 189 190 virtual void reset()=0; 191 virtual int commit()=0; 192 virtual int rollback()=0; 193 194 virtual int savepoint_set(ulong sp)=0; 195 virtual ulong savepoint_release(ulong sp)=0; 196 virtual ulong savepoint_rollback(ulong sp)=0; 197 virtual void savepoint_restrict(ulong sp)=0; 198 199 virtual ulong last_savepoint() const=0; 200 virtual ulong actual_savepoint() const=0; 201 virtual bool is_autocommit() const=0; 202 203 virtual bool table_metadata(ha_statistics *stats, const char *table_name, 204 uint table_name_length, uint flag) = 0; 205 206 /* resultset operations */ 207 208 virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0; 209 virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0; 210 virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0; 211 virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result, 212 FEDERATEDX_IO_ROWS **current= NULL)=0; 213 virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0; 214 virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, 215 unsigned int column)=0; 216 virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, 217 unsigned int column) const=0; 218 219 virtual size_t get_ref_length() const=0; 220 virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, 221 void *ref, FEDERATEDX_IO_ROWS *current)=0; 222 virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, 223 const void *ref)=0; set_thd(void * thd)224 virtual void set_thd(void *thd) { } 225 226 }; 227 228 229 class federatedx_txn 230 { 231 federatedx_io *txn_list; 232 ulong savepoint_level; 233 ulong savepoint_stmt; 234 ulong savepoint_next; 235 236 void release_scan(); 237 public: 238 federatedx_txn(); 239 ~federatedx_txn(); 240 has_connections()241 bool has_connections() const { return txn_list != NULL; } in_transaction()242 bool in_transaction() const { return savepoint_next != 0; } 243 int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io); 244 void release(federatedx_io **io); 245 void close(FEDERATEDX_SERVER *); 246 247 bool txn_begin(); 248 int txn_commit(); 249 int txn_rollback(); 250 251 bool sp_acquire(ulong *save); 252 int sp_rollback(ulong *save); 253 int sp_release(ulong *save); 254 255 bool stmt_begin(); 256 int stmt_commit(); 257 int stmt_rollback(); 258 void stmt_autocommit(); 259 }; 260 261 262 /* 263 Class definition for the storage engine 264 */ 265 class ha_federatedx final : public handler 266 { 267 friend int federatedx_db_init(void *p); 268 269 THR_LOCK_DATA lock; /* MySQL lock */ 270 FEDERATEDX_SHARE *share; /* Shared lock info */ 271 federatedx_txn *txn; 272 federatedx_io *io; 273 FEDERATEDX_IO_RESULT *stored_result; 274 FEDERATEDX_IO_ROWS *current; 275 /** 276 Array of all stored results we get during a query execution. 277 */ 278 DYNAMIC_ARRAY results; 279 bool position_called; 280 int remote_error_number; 281 char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE]; 282 bool ignore_duplicates, replace_duplicates; 283 bool insert_dup_update, table_will_be_deleted; 284 DYNAMIC_STRING bulk_insert; 285 286 private: 287 /* 288 return 0 on success 289 return errorcode otherwise 290 */ 291 uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row, 292 FEDERATEDX_IO_RESULT *result); 293 bool create_where_from_key(String *to, KEY *key_info, 294 const key_range *start_key, 295 const key_range *end_key, 296 bool records_in_range, bool eq_range); 297 int stash_remote_error(); 298 299 federatedx_txn *get_txn(THD *thd, bool no_create= FALSE); 300 301 static int disconnect(handlerton *hton, MYSQL_THD thd); 302 static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv); 303 static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv); 304 static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv); 305 static int commit(handlerton *hton, MYSQL_THD thd, bool all); 306 static int rollback(handlerton *hton, MYSQL_THD thd, bool all); 307 static int discover_assisted(handlerton *, THD*, TABLE_SHARE *, 308 HA_CREATE_INFO *); 309 310 bool append_stmt_insert(String *query); 311 312 int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result); 313 int index_read_idx_with_result_set(uchar *buf, uint index, 314 const uchar *key, 315 uint key_len, 316 ha_rkey_function find_flag, 317 FEDERATEDX_IO_RESULT **result); 318 int real_query(const char *query, uint length); 319 int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag); 320 public: 321 ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg); ~ha_federatedx()322 ~ha_federatedx() {} 323 /* 324 The name of the index type that will be used for display 325 don't implement this method unless you really have indexes 326 */ 327 // perhaps get index type index_type(uint inx)328 const char *index_type(uint inx) { return "REMOTE"; } 329 /* 330 This is a list of flags that says what the storage engine 331 implements. The current table flags are documented in 332 handler.h 333 */ table_flags()334 ulonglong table_flags() const 335 { 336 /* fix server to be able to get remote server table flags */ 337 return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED 338 | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS | 339 HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR | 340 HA_PRIMARY_KEY_REQUIRED_FOR_DELETE | HA_CAN_ONLINE_BACKUPS | 341 HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY | HA_NON_COMPARABLE_ROWID); 342 } 343 /* 344 This is a bitmap of flags that says how the storage engine 345 implements indexes. The current index flags are documented in 346 handler.h. If you do not implement indexes, just return zero 347 here. 348 349 part is the key part to check. First key part is 0 350 If all_parts it's set, MySQL want to know the flags for the combined 351 index up to and including 'part'. 352 */ 353 /* fix server to be able to get remote server index flags */ index_flags(uint inx,uint part,bool all_parts)354 ulong index_flags(uint inx, uint part, bool all_parts) const 355 { 356 return (HA_READ_NEXT | HA_READ_RANGE); 357 } max_supported_record_length()358 uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; } max_supported_keys()359 uint max_supported_keys() const { return MAX_KEY; } max_supported_key_parts()360 uint max_supported_key_parts() const { return MAX_REF_PARTS; } max_supported_key_length()361 uint max_supported_key_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } max_supported_key_part_length()362 uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } 363 /* 364 Called in test_quick_select to determine if indexes should be used. 365 Normally, we need to know number of blocks . For federatedx we need to 366 know number of blocks on remote side, and number of packets and blocks 367 on the network side (?) 368 Talk to Kostja about this - how to get the 369 number of rows * ... 370 disk scan time on other side (block size, size of the row) + network time ... 371 The reason for "records * 1000" is that such a large number forces 372 this to use indexes " 373 */ scan_time()374 double scan_time() 375 { 376 DBUG_PRINT("info", ("records %lu", (ulong) stats.records)); 377 return (double)(stats.records*1000); 378 } 379 /* 380 The next method will never be called if you do not implement indexes. 381 */ read_time(uint index,uint ranges,ha_rows rows)382 double read_time(uint index, uint ranges, ha_rows rows) 383 { 384 /* 385 Per Brian, this number is bugus, but this method must be implemented, 386 and at a later date, he intends to document this issue for handler code 387 */ 388 return (double) rows / 20.0+1; 389 } 390 keys_to_use_for_scanning()391 const key_map *keys_to_use_for_scanning() { return &key_map_full; } 392 /* 393 Everything below are methods that we implment in ha_federatedx.cc. 394 395 Most of these methods are not obligatory, skip them and 396 MySQL will treat them as not implemented 397 */ 398 int open(const char *name, int mode, uint test_if_locked); // required 399 int close(void); // required 400 401 void start_bulk_insert(ha_rows rows, uint flags); 402 int end_bulk_insert(); 403 int write_row(const uchar *buf); 404 int update_row(const uchar *old_data, const uchar *new_data); 405 int delete_row(const uchar *buf); 406 int index_init(uint keynr, bool sorted); 407 ha_rows estimate_rows_upper_bound(); 408 int index_read(uchar *buf, const uchar *key, 409 uint key_len, enum ha_rkey_function find_flag); 410 int index_read_idx(uchar *buf, uint idx, const uchar *key, 411 uint key_len, enum ha_rkey_function find_flag); 412 int index_next(uchar *buf); 413 int index_end(); 414 int read_range_first(const key_range *start_key, 415 const key_range *end_key, 416 bool eq_range, bool sorted); 417 int read_range_next(); 418 /* 419 unlike index_init(), rnd_init() can be called two times 420 without rnd_end() in between (it only makes sense if scan=1). 421 then the second call should prepare for the new table scan 422 (e.g if rnd_init allocates the cursor, second call should 423 position it to the start of the table, no need to deallocate 424 and allocate it again 425 */ 426 int rnd_init(bool scan); //required 427 int rnd_end(); 428 int rnd_next(uchar *buf); //required 429 int rnd_pos(uchar *buf, uchar *pos); //required 430 void position(const uchar *record); //required 431 /* 432 A ref is a pointer inside a local buffer. It is not comparable to 433 other ref's. This is never called as HA_NON_COMPARABLE_ROWID is set. 434 */ cmp_ref(const uchar * ref1,const uchar * ref2)435 int cmp_ref(const uchar *ref1, const uchar *ref2) 436 { 437 #ifdef NOT_YET 438 DBUG_ASSERT(0); 439 return 0; 440 #else 441 return handler::cmp_ref(ref1,ref2); /* Works if table scan is used */ 442 #endif 443 } 444 int info(uint); //required 445 int extra(ha_extra_function operation); 446 447 void update_auto_increment(void); 448 int repair(THD* thd, HA_CHECK_OPT* check_opt); 449 int optimize(THD* thd, HA_CHECK_OPT* check_opt); delete_table(const char * name)450 int delete_table(const char *name) 451 { 452 return 0; 453 } 454 int delete_all_rows(void); 455 int create(const char *name, TABLE *form, 456 HA_CREATE_INFO *create_info); //required 457 ha_rows records_in_range(uint inx, const key_range *start_key, 458 const key_range *end_key, page_range *pages); table_cache_type()459 uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; } 460 461 THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, 462 enum thr_lock_type lock_type); //required 463 bool get_error_message(int error, String *buf); 464 int start_stmt(THD *thd, thr_lock_type lock_type); 465 int external_lock(THD *thd, int lock_type); 466 int reset(void); 467 int free_result(void); 468 469 friend class ha_federatedx_derived_handler; 470 friend class ha_federatedx_select_handler; 471 }; 472 473 extern const char ident_quote_char; // Character for quoting 474 // identifiers 475 extern const char value_quote_char; // Character for quoting 476 // literals 477 478 extern bool append_ident(String *string, const char *name, size_t length, 479 const char quote_char); 480 481 482 extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root, 483 FEDERATEDX_SERVER *server); 484 extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root, 485 FEDERATEDX_SERVER *server); 486 487 #include "federatedx_pushdown.h" 488 489 #endif /* HA_FEDERATEDX_INCLUDED */ 490