1 /* Copyright (C) MariaDB Corporation Ab
2 
3   This program is free software; you can redistribute it and/or modify
4   it under the terms of the GNU General Public License as published by
5   the Free Software Foundation; version 2 of the License.
6 
7   This program is distributed in the hope that it will be useful,
8   but WITHOUT ANY WARRANTY; without even the implied warranty of
9   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10   GNU General Public License for more details.
11 
12   You should have received a copy of the GNU General Public License
13   along with this program; if not, write to the Free Software
14 	Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 /**
17   @file ha_connect.cc
18 
19   @brief
20   The ha_connect engine is a stubbed storage engine that enables to create tables
21   based on external data. Principally they are based on plain files of many
22   different types, but also on collections of such files, collection of tables,
23   local or remote MySQL/MariaDB tables retrieved via MySQL API,
24   ODBC/JDBC tables retrieving data from other DBMS having an ODBC/JDBC server,
25 	and even virtual tables.
26 
27   @details
28   ha_connect will let you create/open/delete tables, the created table can be
29   done specifying an already existing file, the drop table command will just
30   suppress the table definition but not the eventual data file.
31   Indexes are not supported for all table types but data can be inserted,
32   updated or deleted.
33 
34   You can enable the CONNECT storage engine in your build by doing the
35   following during your build process:<br> ./configure
36   --with-connect-storage-engine
37 
38   You can install the CONNECT handler as all other storage handlers.
39 
40   Once this is done, MySQL will let you create tables with:<br>
41   CREATE TABLE <table name> (...) ENGINE=CONNECT;
42 
43   The example storage engine does not use table locks. It
44   implements an example "SHARE" that is inserted into a hash by table
45   name. This is not used yet.
46 
47   Please read the object definition in ha_connect.h before reading the rest
48   of this file.
49 
50   @note
51   This MariaDB CONNECT handler is currently an adaptation of the XDB handler
52   that was written for MySQL version 4.1.2-alpha. Its overall design should
53   be enhanced in the future to meet MariaDB requirements.
54 
55   @note
56   It was written also from the Brian's ha_example handler and contains parts
57   of it that are there, such as table and system  variables.
58 
59   @note
60   When you create an CONNECT table, the MySQL Server creates a table .frm
61   (format) file in the database directory, using the table name as the file
62   name as is customary with MySQL.
63   For file based tables, if a file name is not specified, this is an inward
64   table. An empty file is made in the current data directory that you can
65   populate later like for other engine tables. This file modified on ALTER
66   and is deleted when dropping the table.
67   If a file name is specified, this in an outward table. The specified file
68   will be used as representing the table data and will not be modified or
69   deleted on command such as ALTER or DROP.
70   To get an idea of what occurs, here is an example select that would do
71   a scan of an entire table:
72 
73   @code
74   ha-connect::open
75   ha_connect::store_lock
76   ha_connect::external_lock
77   ha_connect::info
78   ha_connect::rnd_init
79   ha_connect::extra
80   ENUM HA_EXTRA_CACHE        Cache record in HA_rrnd()
81   ha_connect::rnd_next
82   ha_connect::rnd_next
83   ha_connect::rnd_next
84   ha_connect::rnd_next
85   ha_connect::rnd_next
86   ha_connect::rnd_next
87   ha_connect::rnd_next
88   ha_connect::rnd_next
89   ha_connect::rnd_next
90   ha_connect::extra
91   ENUM HA_EXTRA_NO_CACHE     End caching of records (def)
92   ha_connect::external_lock
93   ha_connect::extra
94   ENUM HA_EXTRA_RESET        Reset database to after open
95   @endcode
96 
97   Here you see that the connect storage engine has 9 rows called before
98   rnd_next signals that it has reached the end of its data. Calls to
99   ha_connect::extra() are hints as to what will be occuring to the request.
100 
101 	Author  Olivier Bertrand
102 	*/
103 
104 #ifdef USE_PRAGMA_IMPLEMENTATION
105 #pragma implementation        // gcc: Class implementation
106 #endif
107 
108 #define MYSQL_SERVER 1
109 #define DONT_DEFINE_VOID
110 #include <my_global.h>
111 #include "sql_parse.h"
112 #include "sql_base.h"
113 #include "sql_partition.h"
114 #undef  OFFSET
115 
116 #define NOPARSE
117 #define NJDBC
118 #if defined(UNIX)
119 #include "osutil.h"
120 #endif   // UNIX
121 #include "global.h"
122 #include "plgdbsem.h"
123 #include "xtable.h"
124 #include "tabext.h"
125 #if defined(ODBC_SUPPORT)
126 #include "odbccat.h"
127 #endif   // ODBC_SUPPORT
128 #if defined(JAVA_SUPPORT)
129 #include "tabjdbc.h"
130 #include "jdbconn.h"
131 #endif   // JAVA_SUPPORT
132 #if defined(CMGO_SUPPORT)
133 #include "cmgoconn.h"
134 #endif   // CMGO_SUPPORT
135 #include "tabmysql.h"
136 #include "filamdbf.h"
137 #include "tabxcl.h"
138 #include "tabfmt.h"
139 //#include "reldef.h"
140 #include "tabcol.h"
141 #include "xindex.h"
142 #if defined(_WIN32)
143 #include <io.h>
144 #include "tabwmi.h"
145 #endif   // _WIN32
146 #include "connect.h"
147 #include "user_connect.h"
148 #include "ha_connect.h"
149 #include "myutil.h"
150 #include "preparse.h"
151 #include "inihandl.h"
152 #if defined(LIBXML2_SUPPORT)
153 #include "libdoc.h"
154 #endif   // LIBXML2_SUPPORT
155 #include "taboccur.h"
156 #include "tabpivot.h"
157 #include "tabfix.h"
158 
159 #define my_strupr(p)    my_caseup_str(default_charset_info, (p));
160 #define my_strlwr(p)    my_casedn_str(default_charset_info, (p));
161 #define my_stricmp(a,b) my_strcasecmp(default_charset_info, (a), (b))
162 
163 
164 /***********************************************************************/
165 /*  Initialize the ha_connect static members.                          */
166 /***********************************************************************/
167 #define SZCONV     1024							// Default converted text size
168 #define SZWORK 67108864             // Default work area size 64M
169 #define SZWMIN  4194304             // Minimum work area size  4M
170 #define JSONMAX      50             // JSON Default max grp size
171 
172 extern "C" {
173        char version[]= "Version 1.07.0003 June 06, 2021";
174 #if defined(_WIN32)
175        char compver[]= "Version 1.07.0003 " __DATE__ " "  __TIME__;
176        char slash= '\\';
177 #else   // !_WIN32
178        char slash= '/';
179 #endif  // !_WIN32
180 } // extern "C"
181 
182 #if MYSQL_VERSION_ID > 100200
183 #define stored_in_db stored_in_db()
184 #endif   // MYSQL_VERSION_ID
185 
186 #if defined(XMAP)
187        my_bool xmap= false;
188 #endif   // XMAP
189 
190 ulong  ha_connect::num= 0;
191 
192 #if defined(XMSG)
193 extern "C" {
194        char *msg_path;
195 } // extern "C"
196 #endif   // XMSG
197 
198 #if defined(JAVA_SUPPORT)
199 	     char *JvmPath;
200 			 char *ClassPath;
201 #endif   // JAVA_SUPPORT
202 
203 pthread_mutex_t parmut;
204 pthread_mutex_t usrmut;
205 pthread_mutex_t tblmut;
206 
207 #if defined(DEVELOPMENT)
208 char *GetUserVariable(PGLOBAL g, const uchar *varname);
209 
GetUserVariable(PGLOBAL g,const uchar * varname)210 char *GetUserVariable(PGLOBAL g, const uchar *varname)
211 {
212 	char buf[1024];
213 	bool b;
214 	THD *thd= current_thd;
215 	CHARSET_INFO *cs= system_charset_info;
216 	String *str= NULL, tmp(buf, sizeof(buf), cs);
217 	HASH uvars= thd->user_vars;
218 	user_var_entry *uvar= (user_var_entry*)my_hash_search(&uvars, varname, 0);
219 
220 	if (uvar)
221 		str= uvar->val_str(&b, &tmp, NOT_FIXED_DEC);
222 
223 	return str ? PlugDup(g, str->ptr()) : NULL;
224 }; // end of GetUserVariable
225 #endif   // DEVELOPMENT
226 
227 /***********************************************************************/
228 /*  Utility functions.                                                 */
229 /***********************************************************************/
230 PQRYRES OEMColumns(PGLOBAL g, PTOS topt, char *tab, char *db, bool info);
231 PQRYRES VirColumns(PGLOBAL g, bool info);
232 PQRYRES JSONColumns(PGLOBAL g, PCSZ db, PCSZ dsn, PTOS topt, bool info);
233 #ifdef    BSON_SUPPORT
234 PQRYRES BSONColumns(PGLOBAL g, PCSZ db, PCSZ dsn, PTOS topt, bool info);
235 #endif // BSON_SUPPORT
236 PQRYRES XMLColumns(PGLOBAL g, char *db, char *tab, PTOS topt, bool info);
237 #if defined(REST_SUPPORT)
238 PQRYRES RESTColumns(PGLOBAL g, PTOS topt, char *tab, char *db, bool info);
239 #endif // REST_SUPPORT
240 #if defined(JAVA_SUPPORT)
241 PQRYRES MGOColumns(PGLOBAL g, PCSZ db, PCSZ url, PTOS topt, bool info);
242 #endif   // JAVA_SUPPORT
243 int     TranslateJDBCType(int stp, char *tn, int prec, int& len, char& v);
244 void    PushWarning(PGLOBAL g, THD *thd, int level);
245 bool    CheckSelf(PGLOBAL g, TABLE_SHARE *s, PCSZ host, PCSZ db,
246 	                                           PCSZ tab, PCSZ src, int port);
247 #if defined(ZIP_SUPPORT)
248 bool    ZipLoadFile(PGLOBAL, PCSZ, PCSZ, PCSZ, bool, bool);
249 #endif   // ZIP_SUPPORT
250 bool    ExactInfo(void);
251 #if defined(CMGO_SUPPORT)
252 //void    mongo_init(bool);
253 #endif   // CMGO_SUPPORT
254 USETEMP UseTemp(void);
255 int     GetConvSize(void);
256 TYPCONV GetTypeConv(void);
257 int     GetDefaultDepth(void);
258 int     GetDefaultPrec(void);
259 bool    JsonAllPath(void);
260 char   *GetJsonNull(void);
261 uint    GetJsonGrpSize(void);
262 char   *GetJavaWrapper(void);
263 #if defined(BSON_SUPPORT)
264 bool    Force_Bson(void);
265 #endif   // BSON_SUPPORT
266 size_t  GetWorkSize(void);
267 void    SetWorkSize(size_t);
268 extern "C" const char *msglang(void);
269 
270 static void PopUser(PCONNECT xp);
271 static PCONNECT GetUser(THD *thd, PCONNECT xp);
272 static PGLOBAL  GetPlug(THD *thd, PCONNECT& lxp);
273 
274 static handler *connect_create_handler(handlerton *hton,
275                                        TABLE_SHARE *table,
276                                        MEM_ROOT *mem_root);
277 
278 static bool checkPrivileges(THD* thd, TABTYPE type, PTOS options,
279                             const char* db, TABLE* table = NULL,
280                             bool quick = false);
281 
282 static int connect_assisted_discovery(handlerton *hton, THD* thd,
283                                       TABLE_SHARE *table_s,
284                                       HA_CREATE_INFO *info);
285 
286 /****************************************************************************/
287 /*  Return str as a zero terminated string.                                 */
288 /****************************************************************************/
strz(PGLOBAL g,LEX_CSTRING & ls)289 static char *strz(PGLOBAL g, LEX_CSTRING &ls)
290 {
291   char* str= NULL;
292 
293   if (ls.str) {
294     str= (char*)PlugSubAlloc(g, NULL, ls.length + 1);
295     memcpy(str, ls.str, ls.length);
296     str[ls.length] = 0;
297   } // endif str
298 
299   return str;
300 } // end of strz
301 
302 /***********************************************************************/
303 /*  CONNECT session variables definitions.                             */
304 /***********************************************************************/
305 // Tracing: 0 no, 1 yes, 2 more, 4 index... 511 all
306 const char *xtrace_names[] =
307 {
308 	"YES", "MORE", "INDEX", "MEMORY", "SUBALLOC",
309 	"QUERY", "STMT", "HANDLER", "BLOCK", "MONGO", NullS
310 };
311 
312 TYPELIB xtrace_typelib =
313 {
314 	array_elements(xtrace_names) - 1, "xtrace_typelib",
315 	xtrace_names, NULL
316 };
317 
318 static MYSQL_THDVAR_SET(
319 	xtrace,                    // name
320 	PLUGIN_VAR_RQCMDARG,       // opt
321 	"Trace values.",           // comment
322 	NULL,                      // check
323 	NULL,                      // update function
324 	0,                         // def (NO)
325 	&xtrace_typelib);          // typelib
326 
327 // Getting exact info values
328 static MYSQL_THDVAR_BOOL(exact_info, PLUGIN_VAR_RQCMDARG,
329        "Getting exact info values",
330        NULL, NULL, 0);
331 
332 // Enabling cond_push
333 static MYSQL_THDVAR_BOOL(cond_push, PLUGIN_VAR_RQCMDARG,
334 	"Enabling cond_push",
335 	NULL, NULL, 1);							// YES by default
336 
337 /**
338   Temporary file usage:
339     no:    Not using temporary file
340     auto:  Using temporary file when needed
341     yes:   Allways using temporary file
342     force: Force using temporary file (no MAP)
343     test:  Reserved
344 */
345 const char *usetemp_names[]=
346 {
347   "NO", "AUTO", "YES", "FORCE", "TEST", NullS
348 };
349 
350 TYPELIB usetemp_typelib=
351 {
352   array_elements(usetemp_names) - 1, "usetemp_typelib",
353   usetemp_names, NULL
354 };
355 
356 static MYSQL_THDVAR_ENUM(
357   use_tempfile,                    // name
358   PLUGIN_VAR_RQCMDARG,             // opt
359   "Temporary file use.",           // comment
360   NULL,                            // check
361   NULL,                            // update function
362   1,                               // def (AUTO)
363   &usetemp_typelib);               // typelib
364 
365 #ifdef _WIN64
366 // Size used for g->Sarea_Size
367 static MYSQL_THDVAR_ULONGLONG(work_size,
368 	PLUGIN_VAR_RQCMDARG,
369 	"Size of the CONNECT work area.",
370 	NULL, NULL, SZWORK, SZWMIN, ULONGLONG_MAX, 1);
371 #else
372 // Size used for g->Sarea_Size
373 static MYSQL_THDVAR_ULONG(work_size,
374   PLUGIN_VAR_RQCMDARG,
375   "Size of the CONNECT work area.",
376   NULL, NULL, SZWORK, SZWMIN, ULONG_MAX, 1);
377 #endif
378 
379 // Size used when converting TEXT columns to VARCHAR
380 static MYSQL_THDVAR_INT(conv_size,
381        PLUGIN_VAR_RQCMDARG,             // opt
382        "Size used when converting TEXT columns.",
383        NULL, NULL, SZCONV, 0, 65500, 1);
384 
385 /**
386   Type conversion:
387     no:   Unsupported types -> TYPE_ERROR
388     yes:  TEXT -> VARCHAR
389 		force: Do it also for ODBC BINARY and BLOBs
390     skip: skip unsupported type columns in Discovery
391 */
392 const char *xconv_names[]=
393 {
394   "NO", "YES", "FORCE", "SKIP", NullS
395 };
396 
397 TYPELIB xconv_typelib=
398 {
399   array_elements(xconv_names) - 1, "xconv_typelib",
400   xconv_names, NULL
401 };
402 
403 static MYSQL_THDVAR_ENUM(
404   type_conv,                       // name
405   PLUGIN_VAR_RQCMDARG,             // opt
406   "Unsupported types conversion.", // comment
407   NULL,                            // check
408   NULL,                            // update function
409   1,                               // def (yes)
410   &xconv_typelib);                 // typelib
411 
412 // Adding JPATH to all Json table columns
413 static MYSQL_THDVAR_BOOL(json_all_path, PLUGIN_VAR_RQCMDARG,
414 	"Adding JPATH to all Json table columns",
415 	NULL, NULL, 1);							     // YES by default
416 
417 // Null representation for JSON values
418 static MYSQL_THDVAR_STR(json_null,
419 	PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
420 	"Representation of Json null values",
421 	//     check_json_null, update_json_null,
422 	NULL, NULL, "<null>");
423 
424 // Default Json, XML or Mongo depth
425 static MYSQL_THDVAR_INT(default_depth,
426 	PLUGIN_VAR_RQCMDARG,
427 	"Default depth used by Json, XML and Mongo discovery",
428 	NULL, NULL, 5, -1, 16, 1);			 // Defaults to 5
429 
430 // Default precision for doubles
431 static MYSQL_THDVAR_INT(default_prec,
432   PLUGIN_VAR_RQCMDARG,
433   "Default precision used for doubles",
434   NULL, NULL, 6, 0, 16, 1);			 // Defaults to 6
435 
436 // Estimate max number of rows for JSON aggregate functions
437 static MYSQL_THDVAR_UINT(json_grp_size,
438        PLUGIN_VAR_RQCMDARG,      // opt
439        "max number of rows for JSON aggregate functions.",
440        NULL, NULL, JSONMAX, 1, INT_MAX, 1);
441 
442 #if defined(JAVA_SUPPORT)
443 // Default java wrapper to use with JDBC tables
444 static MYSQL_THDVAR_STR(java_wrapper,
445 	PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
446 	"Java wrapper class name",
447 	//     check_java_wrapper, update_java_wrapper,
448 	NULL, NULL, "wrappers/JdbcInterface");
449 #endif   // JAVA_SUPPORT
450 
451 // This is apparently not acceptable for a plugin so it is undocumented
452 #if defined(JAVA_SUPPORT) || defined(CMGO_SUPPORT)
453 // Enabling MONGO table type
454 #if defined(MONGO_SUPPORT) || (MYSQL_VERSION_ID > 100200)
455 static MYSQL_THDVAR_BOOL(enable_mongo, PLUGIN_VAR_RQCMDARG,
456 	"Enabling the MongoDB access", NULL, NULL, 1);
457 #else   // !version 2,3
458 static MYSQL_THDVAR_BOOL(enable_mongo, PLUGIN_VAR_RQCMDARG,
459 	"Enabling the MongoDB access", NULL, NULL, 0);
460 #endif  // !version 2,3
461 #endif   // JAVA_SUPPORT || CMGO_SUPPORT
462 
463 #if defined(BSON_SUPPORT)
464 // Force using BSON for JSON tables
465 static MYSQL_THDVAR_BOOL(force_bson, PLUGIN_VAR_RQCMDARG,
466   "Force using BSON for JSON tables",
467   NULL, NULL, 0);							// NO by default
468 #endif   // BSON_SUPPORT
469 
470 #if defined(XMSG) || defined(NEWMSG)
471 const char *language_names[]=
472 {
473   "default", "english", "french", NullS
474 };
475 
476 TYPELIB language_typelib=
477 {
478   array_elements(language_names) - 1, "language_typelib",
479   language_names, NULL
480 };
481 
482 static MYSQL_THDVAR_ENUM(
483   msg_lang,                        // name
484   PLUGIN_VAR_RQCMDARG,             // opt
485   "Message language",              // comment
486   NULL,                            // check
487   NULL,                            // update
488   1,                               // def (ENGLISH)
489   &language_typelib);              // typelib
490 #endif   // XMSG || NEWMSG
491 
492 /***********************************************************************/
493 /*  The CONNECT handlerton object.                                     */
494 /***********************************************************************/
495 handlerton *connect_hton= NULL;
496 
497 /***********************************************************************/
498 /*  Function to export session variable values to other source files.  */
499 /***********************************************************************/
GetTraceValue(void)500 uint GetTraceValue(void)
501 	{return (uint)(connect_hton ? THDVAR(current_thd, xtrace) : 0);}
ExactInfo(void)502 bool ExactInfo(void) {return THDVAR(current_thd, exact_info);}
CondPushEnabled(void)503 static bool CondPushEnabled(void) {return THDVAR(current_thd, cond_push);}
JsonAllPath(void)504 bool JsonAllPath(void) {return THDVAR(current_thd, json_all_path);}
UseTemp(void)505 USETEMP UseTemp(void) {return (USETEMP)THDVAR(current_thd, use_tempfile);}
GetConvSize(void)506 int GetConvSize(void) {return THDVAR(current_thd, conv_size);}
GetTypeConv(void)507 TYPCONV GetTypeConv(void) {return (TYPCONV)THDVAR(current_thd, type_conv);}
GetJsonNull(void)508 char *GetJsonNull(void)
509 	{return connect_hton ? THDVAR(current_thd, json_null) : NULL;}
GetDefaultDepth(void)510 int GetDefaultDepth(void) {return THDVAR(current_thd, default_depth);}
GetDefaultPrec(void)511 int GetDefaultPrec(void) {return THDVAR(current_thd, default_prec);}
GetJsonGrpSize(void)512 uint GetJsonGrpSize(void)
513   {return connect_hton ? THDVAR(current_thd, json_grp_size) : 50;}
GetWorkSize(void)514 size_t GetWorkSize(void) {return (size_t)THDVAR(current_thd, work_size);}
SetWorkSize(size_t)515 void SetWorkSize(size_t)
516 {
517   // Changing the session variable value seems to be impossible here
518   // and should be done in a check function
519   push_warning(current_thd, Sql_condition::WARN_LEVEL_WARN, 0,
520     "Work size too big, try setting a smaller value");
521 } // end of SetWorkSize
522 
523 #if defined(JAVA_SUPPORT)
GetJavaWrapper(void)524 char *GetJavaWrapper(void)
525 {return connect_hton ? THDVAR(current_thd, java_wrapper)
526 	                   : (char*)"wrappers/JdbcInterface";}
527 #endif   // JAVA_SUPPORT
528 
529 #if defined(JAVA_SUPPORT) || defined(CMGO_SUPPORT)
MongoEnabled(void)530 bool MongoEnabled(void) {return THDVAR(current_thd, enable_mongo);}
531 #endif   // JAVA_SUPPORT || CMGO_SUPPORT
532 
533 #if defined(BSON_SUPPORT)
Force_Bson(void)534 bool Force_Bson(void) {return THDVAR(current_thd, force_bson);}
535 #endif   // BSON_SUPPORT)
536 
537 #if defined(XMSG) || defined(NEWMSG)
msglang(void)538 extern "C" const char *msglang(void)
539 	{return language_names[THDVAR(current_thd, msg_lang)];}
540 #else   // !XMSG && !NEWMSG
msglang(void)541 extern "C" const char *msglang(void)
542 {
543 #if defined(FRENCH)
544   return "french";
545 #else  // DEFAULT
546   return "english";
547 #endif // DEFAULT
548 } // end of msglang
549 #endif  // !XMSG && !NEWMSG
550 
551 #if 0
552 /***********************************************************************/
553 /*  Global variables update functions.                                 */
554 /***********************************************************************/
555 static void update_connect_zconv(MYSQL_THD thd,
556                                   struct st_mysql_sys_var *var,
557                                   void *var_ptr, const void *save)
558 {
559   zconv= *(int *)var_ptr= *(int *)save;
560 } // end of update_connect_zconv
561 
562 static void update_connect_xconv(MYSQL_THD thd,
563                                  struct st_mysql_sys_var *var,
564                                  void *var_ptr, const void *save)
565 {
566   xconv= (int)(*(ulong *)var_ptr= *(ulong *)save);
567 } // end of update_connect_xconv
568 
569 #if defined(XMAP)
570 static void update_connect_xmap(MYSQL_THD thd,
571                                 struct st_mysql_sys_var *var,
572                                 void *var_ptr, const void *save)
573 {
574   xmap= (my_bool)(*(my_bool *)var_ptr= *(my_bool *)save);
575 } // end of update_connect_xmap
576 #endif   // XMAP
577 #endif // 0
578 
579 #if 0 // (was XMSG) Unuseful because not called for default value
580 static void update_msg_path(MYSQL_THD thd,
581                             struct st_mysql_sys_var *var,
582                             void *var_ptr, const void *save)
583 {
584   char *value= *(char**)save;
585   char *old= *(char**)var_ptr;
586 
587   if (value)
588     *(char**)var_ptr= my_strdup(value, MYF(0));
589   else
590     *(char**)var_ptr= 0;
591 
592   my_free(old);
593 } // end of update_msg_path
594 
595 static int check_msg_path (MYSQL_THD thd, struct st_mysql_sys_var *var,
596 	                         void *save, struct st_mysql_value *value)
597 {
598 	const char *path;
599 	char	buff[512];
600 	int		len= sizeof(buff);
601 
602 	path= value->val_str(value, buff, &len);
603 
604 	if (path && *path != '*') {
605 		/* Save a pointer to the name in the
606 		'file_format_name_map' constant array. */
607 		*(char**)save= my_strdup(path, MYF(0));
608 		return(0);
609 	} else {
610 		push_warning_printf(thd,
611 		  Sql_condition::WARN_LEVEL_WARN,
612 		  ER_WRONG_ARGUMENTS,
613 		  "CONNECT: invalid message path");
614 	} // endif path
615 
616 	*(char**)save= NULL;
617 	return(1);
618 } // end of check_msg_path
619 #endif   // 0
620 
621 /**
622   CREATE TABLE option list (table options)
623 
624   These can be specified in the CREATE TABLE:
625   CREATE TABLE ( ... ) {...here...}
626 */
627 ha_create_table_option connect_table_option_list[]=
628 {
629   HA_TOPTION_STRING("TABLE_TYPE", type),
630   HA_TOPTION_STRING("FILE_NAME", filename),
631   HA_TOPTION_STRING("XFILE_NAME", optname),
632 //HA_TOPTION_STRING("CONNECT_STRING", connect),
633   HA_TOPTION_STRING("TABNAME", tabname),
634   HA_TOPTION_STRING("TABLE_LIST", tablist),
635   HA_TOPTION_STRING("DBNAME", dbname),
636   HA_TOPTION_STRING("SEP_CHAR", separator),
637   HA_TOPTION_STRING("QCHAR", qchar),
638   HA_TOPTION_STRING("MODULE", module),
639   HA_TOPTION_STRING("SUBTYPE", subtype),
640   HA_TOPTION_STRING("CATFUNC", catfunc),
641   HA_TOPTION_STRING("SRCDEF", srcdef),
642   HA_TOPTION_STRING("COLIST", colist),
643 	HA_TOPTION_STRING("FILTER", filter),
644 	HA_TOPTION_STRING("OPTION_LIST", oplist),
645   HA_TOPTION_STRING("DATA_CHARSET", data_charset),
646 	HA_TOPTION_STRING("HTTP", http),
647 	HA_TOPTION_STRING("URI", uri),
648 	HA_TOPTION_NUMBER("LRECL", lrecl, 0, 0, INT_MAX32, 1),
649   HA_TOPTION_NUMBER("BLOCK_SIZE", elements, 0, 0, INT_MAX32, 1),
650 //HA_TOPTION_NUMBER("ESTIMATE", estimate, 0, 0, INT_MAX32, 1),
651   HA_TOPTION_NUMBER("MULTIPLE", multiple, 0, 0, 3, 1),
652   HA_TOPTION_NUMBER("HEADER", header, 0, 0, 3, 1),
653   HA_TOPTION_NUMBER("QUOTED", quoted, (ulonglong) -1, 0, 3, 1),
654   HA_TOPTION_NUMBER("ENDING", ending, (ulonglong) -1, 0, INT_MAX32, 1),
655   HA_TOPTION_NUMBER("COMPRESS", compressed, 0, 0, 2, 1),
656   HA_TOPTION_BOOL("MAPPED", mapped, 0),
657   HA_TOPTION_BOOL("HUGE", huge, 0),
658   HA_TOPTION_BOOL("SPLIT", split, 0),
659   HA_TOPTION_BOOL("READONLY", readonly, 0),
660   HA_TOPTION_BOOL("SEPINDEX", sepindex, 0),
661 	HA_TOPTION_BOOL("ZIPPED", zipped, 0),
662 	HA_TOPTION_END
663 };
664 
665 
666 /**
667   CREATE TABLE option list (field options)
668 
669   These can be specified in the CREATE TABLE per field:
670   CREATE TABLE ( field ... {...here...}, ... )
671 */
672 ha_create_table_option connect_field_option_list[]=
673 {
674   HA_FOPTION_NUMBER("FLAG", offset, (ulonglong) -1, 0, INT_MAX32, 1),
675   HA_FOPTION_NUMBER("MAX_DIST", freq, 0, 0, INT_MAX32, 1), // BLK_INDX
676   HA_FOPTION_NUMBER("FIELD_LENGTH", fldlen, 0, 0, INT_MAX32, 1),
677   HA_FOPTION_STRING("DATE_FORMAT", dateformat),
678   HA_FOPTION_STRING("FIELD_FORMAT", fieldformat),
679   HA_FOPTION_STRING("JPATH", jsonpath),
680 	HA_FOPTION_STRING("XPATH", xmlpath),
681 	HA_FOPTION_STRING("SPECIAL", special),
682 	HA_FOPTION_ENUM("DISTRIB", opt, "scattered,clustered,sorted", 0),
683   HA_FOPTION_END
684 };
685 
686 /*
687   CREATE TABLE option list (index options)
688 
689   These can be specified in the CREATE TABLE per index:
690   CREATE TABLE ( field ..., .., INDEX .... *here*, ... )
691 */
692 ha_create_table_option connect_index_option_list[]=
693 {
694   HA_IOPTION_BOOL("DYNAM", dynamic, 0),
695   HA_IOPTION_BOOL("MAPPED", mapped, 0),
696   HA_IOPTION_END
697 };
698 
699 /***********************************************************************/
700 /*  Push G->Message as a MySQL warning.                                */
701 /***********************************************************************/
PushWarning(PGLOBAL g,PTDB tdbp,int level)702 bool PushWarning(PGLOBAL g, PTDB tdbp, int level)
703 {
704   PHC    phc;
705   THD   *thd;
706   MYCAT *cat= (MYCAT*)tdbp->GetDef()->GetCat();
707 
708   if (!cat || !(phc= cat->GetHandler()) || !phc->GetTable() ||
709       !(thd= (phc->GetTable())->in_use))
710     return true;
711 
712   PushWarning(g, thd, level);
713   return false;
714 } // end of PushWarning
715 
PushWarning(PGLOBAL g,THD * thd,int level)716 void PushWarning(PGLOBAL g, THD *thd, int level)
717   {
718   if (thd) {
719     Sql_condition::enum_warning_level wlvl;
720 
721     wlvl= (Sql_condition::enum_warning_level)level;
722     push_warning(thd, wlvl, 0, g->Message);
723   } else
724     htrc("%s\n", g->Message);
725 
726   } // end of PushWarning
727 
728 #ifdef HAVE_PSI_INTERFACE
729 static PSI_mutex_key con_key_mutex_CONNECT_SHARE_mutex;
730 
731 static PSI_mutex_info all_connect_mutexes[]=
732 {
733   { &con_key_mutex_CONNECT_SHARE_mutex, "CONNECT_SHARE::mutex", 0}
734 };
735 
init_connect_psi_keys()736 static void init_connect_psi_keys()
737 {
738   const char* category= "connect";
739   int count;
740 
741   if (PSI_server == NULL)
742     return;
743 
744   count= array_elements(all_connect_mutexes);
745   PSI_server->register_mutex(category, all_connect_mutexes, count);
746 }
747 #else
init_connect_psi_keys()748 static void init_connect_psi_keys() {}
749 #endif
750 
751 
PlugSetPath(LPSTR to,LPCSTR name,LPCSTR dir)752 DllExport LPCSTR PlugSetPath(LPSTR to, LPCSTR name, LPCSTR dir)
753 {
754   const char *res= PlugSetPath(to, mysql_data_home, name, dir);
755   return res;
756 }
757 
758 
759 /**
760   @brief
761   If frm_error() is called then we will use this to determine
762   the file extensions that exist for the storage engine. This is also
763   used by the default rename_table and delete_table method in
764   handler.cc.
765 
766   For engines that have two file name extensions (separate meta/index file
767   and data file), the order of elements is relevant. First element of engine
768   file name extensions array should be meta/index file extension. Second
769   element - data file extension. This order is assumed by
770   prepare_for_repair() when REPAIR TABLE ... USE_FRM is issued.
771 
772   @see
773   rename_table method in handler.cc and
774   delete_table method in handler.cc
775 */
776 static const char *ha_connect_exts[]= {
777   ".dos", ".fix", ".csv", ".bin", ".fmt", ".dbf", ".xml", ".json", ".ini",
778   ".vec", ".dnx", ".fnx", ".bnx", ".vnx", ".dbx", ".dop", ".fop", ".bop",
779   ".vop", NULL};
780 
781 /**
782   @brief
783   Plugin initialization
784 */
connect_init_func(void * p)785 static int connect_init_func(void *p)
786 {
787   DBUG_ENTER("connect_init_func");
788 
789 // added from Sergei mail
790 #if 0 // (defined(LINUX))
791   Dl_info dl_info;
792   if (dladdr(&connect_hton, &dl_info))
793   {
794     if (dlopen(dl_info.dli_fname, RTLD_NOLOAD | RTLD_NOW | RTLD_GLOBAL) == 0)
795     {
796       sql_print_information("CONNECT: dlopen() failed, OEM table type is not supported");
797       sql_print_information("CONNECT: %s", dlerror());
798     }
799   }
800   else
801   {
802     sql_print_information("CONNECT: dladdr() failed, OEM table type is not supported");
803     sql_print_information("CONNECT: %s", dlerror());
804   }
805 #endif   // 0 (LINUX)
806 
807 #if defined(_WIN32)
808   sql_print_information("CONNECT: %s", compver);
809 #else   // !_WIN32
810   sql_print_information("CONNECT: %s", version);
811 #endif  // !_WIN32
812 	pthread_mutex_init(&parmut, NULL);
813 	pthread_mutex_init(&usrmut, NULL);
814 	pthread_mutex_init(&tblmut, NULL);
815 
816 #if defined(LIBXML2_SUPPORT)
817   XmlInitParserLib();
818 #endif   // LIBXML2_SUPPORT
819 
820 #if 0  //defined(CMGO_SUPPORT)
821 	mongo_init(true);
822 #endif   // CMGO_SUPPORT
823 
824   init_connect_psi_keys();
825 
826   connect_hton= (handlerton *)p;
827   connect_hton->state= SHOW_OPTION_YES;
828   connect_hton->create= connect_create_handler;
829   connect_hton->flags= HTON_TEMPORARY_NOT_SUPPORTED;
830   connect_hton->table_options= connect_table_option_list;
831   connect_hton->field_options= connect_field_option_list;
832   connect_hton->index_options= connect_index_option_list;
833   connect_hton->tablefile_extensions= ha_connect_exts;
834   connect_hton->discover_table_structure= connect_assisted_discovery;
835 
836   if (trace(128))
837     sql_print_information("connect_init: hton=%p", p);
838 
839   DTVAL::SetTimeShift();      // Initialize time zone shift once for all
840   BINCOL::SetEndian();        // Initialize host endian setting
841 #if defined(JAVA_SUPPORT)
842 	JAVAConn::SetJVM();
843 #endif   // JAVA_SUPPORT
844   DBUG_RETURN(0);
845 } // end of connect_init_func
846 
847 
848 /**
849   @brief
850   Plugin clean up
851 */
connect_done_func(void *)852 int connect_done_func(void *)
853 {
854   int error= 0;
855   PCONNECT pc, pn;
856   DBUG_ENTER("connect_done_func");
857 
858 #ifdef LIBXML2_SUPPORT
859   XmlCleanupParserLib();
860 #endif // LIBXML2_SUPPORT
861 
862 #if defined(CMGO_SUPPORT)
863 	CMgoConn::mongo_init(false);
864 #endif   // CMGO_SUPPORT
865 
866 #ifdef JAVA_SUPPORT
867 	JAVAConn::ResetJVM();
868 #endif // JAVA_SUPPORT
869 
870 #if	!defined(_WIN32)
871 	PROFILE_End();
872 #endif  // !_WIN32
873 
874 	pthread_mutex_lock(&usrmut);
875 	for (pc= user_connect::to_users; pc; pc= pn) {
876     if (pc->g)
877       PlugCleanup(pc->g, true);
878 
879     pn= pc->next;
880     delete pc;
881     } // endfor pc
882 
883 	pthread_mutex_unlock(&usrmut);
884 
885 	pthread_mutex_destroy(&usrmut);
886 	pthread_mutex_destroy(&parmut);
887 	pthread_mutex_destroy(&tblmut);
888 	connect_hton= NULL;
889   DBUG_RETURN(error);
890 } // end of connect_done_func
891 
892 
893 /**
894   @brief
895   Example of simple lock controls. The "share" it creates is a
896   structure we will pass to each CONNECT handler. Do you have to have
897   one of these? Well, you have pieces that are used for locking, and
898   they are needed to function.
899 */
900 
get_share()901 CONNECT_SHARE *ha_connect::get_share()
902 {
903   CONNECT_SHARE *tmp_share;
904 
905   lock_shared_ha_data();
906 
907   if (!(tmp_share= static_cast<CONNECT_SHARE*>(get_ha_share_ptr()))) {
908     tmp_share= new CONNECT_SHARE;
909     if (!tmp_share)
910       goto err;
911     mysql_mutex_init(con_key_mutex_CONNECT_SHARE_mutex,
912                      &tmp_share->mutex, MY_MUTEX_INIT_FAST);
913     set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
914     } // endif tmp_share
915 
916  err:
917   unlock_shared_ha_data();
918   return tmp_share;
919 } // end of get_share
920 
921 
connect_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)922 static handler* connect_create_handler(handlerton *hton,
923                                    TABLE_SHARE *table,
924                                    MEM_ROOT *mem_root)
925 {
926   handler *h= new (mem_root) ha_connect(hton, table);
927 
928   if (trace(128))
929     htrc("New CONNECT %p, table: %.*s\n", h,
930           table ? table->table_name.length : 6,
931           table ? table->table_name.str : "<null>");
932 
933   return h;
934 } // end of connect_create_handler
935 
936 /****************************************************************************/
937 /*  ha_connect constructor.                                                 */
938 /****************************************************************************/
ha_connect(handlerton * hton,TABLE_SHARE * table_arg)939 ha_connect::ha_connect(handlerton *hton, TABLE_SHARE *table_arg)
940        :handler(hton, table_arg)
941 {
942   hnum= ++num;
943   xp= (table) ? GetUser(ha_thd(), NULL) : NULL;
944   if (xp)
945     xp->SetHandler(this);
946 #if defined(_WIN32)
947   datapath= ".\\";
948 #else   // !_WIN32
949   datapath= "./";
950 #endif  // !_WIN32
951   tdbp= NULL;
952   sdvalin1= sdvalin2= sdvalin3= sdvalin4= NULL;
953   sdvalout= NULL;
954   xmod= MODE_ANY;
955   istable= false;
956   memset(partname, 0, sizeof(partname));
957   bzero((char*) &xinfo, sizeof(XINFO));
958   valid_info= false;
959   valid_query_id= 0;
960   creat_query_id= (table && table->in_use) ? table->in_use->query_id : 0;
961   stop= false;
962   alter= false;
963   mrr= false;
964   nox= true;
965   abort= false;
966   indexing= -1;
967   locked= 0;
968   part_id= NULL;
969   data_file_name= NULL;
970   index_file_name= NULL;
971   enable_activate_all_index= 0;
972   int_table_flags= (HA_NO_TRANSACTIONS | HA_NO_PREFIX_CHAR_KEYS);
973   ref_length= sizeof(int);
974   share= NULL;
975   tshp= NULL;
976 } // end of ha_connect constructor
977 
978 
979 /****************************************************************************/
980 /*  ha_connect destructor.                                                  */
981 /****************************************************************************/
~ha_connect(void)982 ha_connect::~ha_connect(void)
983 {
984   if (trace(128))
985     htrc("Delete CONNECT %p, table: %.*s, xp=%p count=%d\n", this,
986                          table ? table->s->table_name.length : 6,
987                          table ? table->s->table_name.str : "<null>",
988                          xp, xp ? xp->count : 0);
989 
990 	PopUser(xp);
991 } // end of ha_connect destructor
992 
993 
994 /****************************************************************************/
995 /*  Check whether this user can be removed.                                 */
996 /****************************************************************************/
PopUser(PCONNECT xp)997 static void PopUser(PCONNECT xp)
998 {
999 	if (xp) {
1000 		pthread_mutex_lock(&usrmut);
1001 		xp->count--;
1002 
1003 		if (!xp->count) {
1004 			PCONNECT p;
1005 
1006 			for (p= user_connect::to_users; p; p= p->next)
1007 			  if (p == xp)
1008 				  break;
1009 
1010 		  if (p) {
1011 			  if (p->next)
1012 				  p->next->previous= p->previous;
1013 
1014 			  if (p->previous)
1015 				  p->previous->next= p->next;
1016 			  else
1017 				  user_connect::to_users= p->next;
1018 
1019 		  } // endif p
1020 
1021 			PlugCleanup(xp->g, true);
1022 			delete xp;
1023 		} // endif count
1024 
1025 		pthread_mutex_unlock(&usrmut);
1026 	} // endif xp
1027 
1028 } // end of PopUser
1029 
1030 
1031 /****************************************************************************/
1032 /*  Get a pointer to the user of this handler.                              */
1033 /****************************************************************************/
GetUser(THD * thd,PCONNECT xp)1034 static PCONNECT GetUser(THD *thd, PCONNECT xp)
1035 {
1036 	if (!thd)
1037     return NULL;
1038 
1039 	if (xp) {
1040 		if (thd == xp->thdp)
1041 			return xp;
1042 
1043 		PopUser(xp);		// Avoid memory leak
1044 	} // endif xp
1045 
1046 	pthread_mutex_lock(&usrmut);
1047 
1048 	for (xp= user_connect::to_users; xp; xp= xp->next)
1049     if (thd == xp->thdp)
1050       break;
1051 
1052 	if (xp)
1053 		xp->count++;
1054 
1055 	pthread_mutex_unlock(&usrmut);
1056 
1057 	if (!xp) {
1058 		xp= new user_connect(thd);
1059 
1060 		if (xp->user_init()) {
1061 			delete xp;
1062 			xp= NULL;
1063 		} // endif user_init
1064 
1065 	}	// endif xp
1066 
1067   //} else
1068   //  xp->count++;
1069 
1070   return xp;
1071 } // end of GetUser
1072 
1073 /****************************************************************************/
1074 /*  Get the global pointer of the user of this handler.                     */
1075 /****************************************************************************/
GetPlug(THD * thd,PCONNECT & lxp)1076 static PGLOBAL GetPlug(THD *thd, PCONNECT& lxp)
1077 {
1078   lxp= GetUser(thd, lxp);
1079   return (lxp) ? lxp->g : NULL;
1080 } // end of GetPlug
1081 
1082 /****************************************************************************/
1083 /*  Get the implied table type.                                             */
1084 /****************************************************************************/
GetRealType(PTOS pos)1085 TABTYPE ha_connect::GetRealType(PTOS pos)
1086 {
1087   TABTYPE type= TAB_UNDEF;
1088 
1089   if (pos || (pos= GetTableOptionStruct())) {
1090     type= GetTypeID(pos->type);
1091 
1092     if (type == TAB_UNDEF && !pos->http)
1093       type= pos->srcdef ? TAB_MYSQL : pos->tabname ? TAB_PRX : TAB_DOS;
1094 #if defined(REST_SUPPORT)
1095 		else if (pos->http)
1096 			switch (type) {
1097 				case TAB_JSON:
1098 				case TAB_XML:
1099 				case TAB_CSV:
1100         case TAB_UNDEF:
1101           type = TAB_REST;
1102 					break;
1103 				case TAB_REST:
1104 					type = TAB_NIY;
1105 					break;
1106 				default:
1107 					break;
1108 			}	// endswitch type
1109 #endif   // REST_SUPPORT
1110 
1111   } // endif pos
1112 
1113   return type;
1114 } // end of GetRealType
1115 
1116 /** @brief
1117   The name of the index type that will be used for display.
1118   Don't implement this method unless you really have indexes.
1119  */
index_type(uint inx)1120 const char *ha_connect::index_type(uint inx)
1121 {
1122   switch (GetIndexType(GetRealType())) {
1123     case 1:
1124       if (table_share)
1125         return (GetIndexOption(&table_share->key_info[inx], "Dynamic"))
1126              ? "KINDEX" : "XINDEX";
1127       else
1128         return "XINDEX";
1129 
1130     case 2: return "REMOTE";
1131     case 3: return "VIRTUAL";
1132     } // endswitch
1133 
1134   return "Unknown";
1135 } // end of index_type
1136 
1137 /** @brief
1138   This is a bitmap of flags that indicates how the storage engine
1139   implements indexes. The current index flags are documented in
1140   handler.h. If you do not implement indexes, just return zero here.
1141 
1142     @details
1143   part is the key part to check. First key part is 0.
1144   If all_parts is set, MySQL wants to know the flags for the combined
1145   index, up to and including 'part'.
1146 */
1147 //ong ha_connect::index_flags(uint inx, uint part, bool all_parts) const
index_flags(uint,uint,bool) const1148 ulong ha_connect::index_flags(uint, uint, bool) const
1149 {
1150   ulong       flags= HA_READ_NEXT | HA_READ_RANGE |
1151                      HA_KEYREAD_ONLY | HA_KEY_SCAN_NOT_ROR;
1152   ha_connect *hp= (ha_connect*)this;
1153   PTOS        pos= hp->GetTableOptionStruct();
1154 
1155   if (pos) {
1156     TABTYPE type= hp->GetRealType(pos);
1157 
1158     switch (GetIndexType(type)) {
1159       case 1: flags|= (HA_READ_ORDER | HA_READ_PREV); break;
1160       case 2: flags|= HA_READ_AFTER_KEY;              break;
1161       } // endswitch
1162 
1163     } // endif pos
1164 
1165   return flags;
1166 } // end of index_flags
1167 
1168 /** @brief
1169   This is a list of flags that indicate what functionality the storage
1170   engine implements. The current table flags are documented in handler.h
1171 */
table_flags() const1172 ulonglong ha_connect::table_flags() const
1173 {
1174   ulonglong   flags= HA_CAN_VIRTUAL_COLUMNS | HA_REC_NOT_IN_SEQ |
1175                      HA_NO_AUTO_INCREMENT | HA_NO_PREFIX_CHAR_KEYS |
1176                      HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
1177                      HA_PARTIAL_COLUMN_READ | HA_FILE_BASED |
1178 //                   HA_NULL_IN_KEY |    not implemented yet
1179 //                   HA_FAST_KEY_READ |  causes error when sorting (???)
1180                      HA_NO_TRANSACTIONS | HA_DUPLICATE_KEY_NOT_IN_ORDER |
1181                      HA_NO_BLOBS | HA_MUST_USE_TABLE_CONDITION_PUSHDOWN;
1182   ha_connect *hp= (ha_connect*)this;
1183   PTOS        pos= hp->GetTableOptionStruct();
1184 
1185   if (pos) {
1186     TABTYPE type= hp->GetRealType(pos);
1187 
1188     if (IsFileType(type))
1189       flags|= HA_FILE_BASED;
1190 
1191     if (IsExactType(type))
1192       flags|= (HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT);
1193 
1194     // No data change on ALTER for outward tables
1195     if (!IsFileType(type) || hp->FileExists(pos->filename, true))
1196       flags|= HA_NO_COPY_ON_ALTER;
1197 
1198     } // endif pos
1199 
1200   return flags;
1201 } // end of table_flags
1202 
1203 /****************************************************************************/
1204 /*  Return the value of an option specified in an option list.              */
1205 /****************************************************************************/
GetListOption(PGLOBAL g,PCSZ opname,PCSZ oplist,PCSZ def)1206 PCSZ GetListOption(PGLOBAL g, PCSZ opname, PCSZ oplist, PCSZ def)
1207 {
1208   if (!oplist)
1209     return (char*)def;
1210 
1211 	char  key[16], val[256];
1212 	char *pv, *pn, *pk= (char*)oplist;
1213 	PCSZ  opval= def;
1214 	int   n;
1215 
1216 	while (*pk == ' ')
1217 		pk++;
1218 
1219 	for (; pk; pk= pn) {
1220 		pn= strchr(pk, ',');
1221 		pv= strchr(pk, '=');
1222 
1223 		if (pv && (!pn || pv < pn)) {
1224 			n= MY_MIN(static_cast<size_t>(pv - pk), sizeof(key) - 1);
1225 			memcpy(key, pk, n);
1226 
1227 			while (n && key[n - 1] == ' ')
1228 				n--;
1229 
1230 			key[n]= 0;
1231 
1232 			while (*(++pv) == ' ');
1233 
1234 			n= MY_MIN((pn ? pn - pv : strlen(pv)), sizeof(val) - 1);
1235 			memcpy(val, pv, n);
1236 
1237 			while (n && val[n - 1] == ' ')
1238 				n--;
1239 
1240 			val[n]= 0;
1241 		} else {
1242 			n= MY_MIN((pn ? pn - pk : strlen(pk)), sizeof(key) - 1);
1243 			memcpy(key, pk, n);
1244 
1245 			while (n && key[n - 1] == ' ')
1246 				n--;
1247 
1248 			key[n]= 0;
1249 			val[0]= 0;
1250 		} // endif pv
1251 
1252 		if (!stricmp(opname, key)) {
1253 			opval= PlugDup(g, val);
1254 			break;
1255 		} else if (!pn)
1256 			break;
1257 
1258 		while (*(++pn) == ' ');
1259 	} // endfor pk
1260 
1261   return opval;
1262 } // end of GetListOption
1263 
1264 /****************************************************************************/
1265 /*  Return the value of a string option or NULL if not specified.           */
1266 /****************************************************************************/
GetStringTableOption(PGLOBAL g,PTOS options,PCSZ opname,PCSZ sdef)1267 PCSZ GetStringTableOption(PGLOBAL g, PTOS options, PCSZ opname, PCSZ sdef)
1268 {
1269 	PCSZ opval= NULL;
1270 
1271   if (!options)
1272     return sdef;
1273   else if (!stricmp(opname, "Type"))
1274     opval= options->type;
1275   else if (!stricmp(opname, "Filename"))
1276     opval= options->filename;
1277   else if (!stricmp(opname, "Optname"))
1278     opval= options->optname;
1279   else if (!stricmp(opname, "Tabname"))
1280     opval= options->tabname;
1281   else if (!stricmp(opname, "Tablist"))
1282     opval= options->tablist;
1283   else if (!stricmp(opname, "Database") ||
1284            !stricmp(opname, "DBname"))
1285     opval= options->dbname;
1286   else if (!stricmp(opname, "Separator"))
1287     opval= options->separator;
1288   else if (!stricmp(opname, "Qchar"))
1289     opval= options->qchar;
1290   else if (!stricmp(opname, "Module"))
1291     opval= options->module;
1292   else if (!stricmp(opname, "Subtype"))
1293     opval= options->subtype;
1294   else if (!stricmp(opname, "Catfunc"))
1295     opval= options->catfunc;
1296   else if (!stricmp(opname, "Srcdef"))
1297     opval= options->srcdef;
1298   else if (!stricmp(opname, "Colist"))
1299     opval= options->colist;
1300 	else if (!stricmp(opname, "Filter"))
1301 		opval= options->filter;
1302 	else if (!stricmp(opname, "Data_charset"))
1303     opval= options->data_charset;
1304 	else if (!stricmp(opname, "Http") || !stricmp(opname, "URL"))
1305 		opval= options->http;
1306 	else if (!stricmp(opname, "Uri"))
1307 		opval= options->uri;
1308 
1309   if (!opval && options->oplist)
1310     opval= GetListOption(g, opname, options->oplist);
1311 
1312   return opval ? (char*)opval : sdef;
1313 } // end of GetStringTableOption
1314 
1315 /****************************************************************************/
1316 /*  Return the value of a Boolean option or bdef if not specified.          */
1317 /****************************************************************************/
GetBooleanTableOption(PGLOBAL g,PTOS options,PCSZ opname,bool bdef)1318 bool GetBooleanTableOption(PGLOBAL g, PTOS options, PCSZ opname, bool bdef)
1319 {
1320   bool opval= bdef;
1321 	PCSZ pv;
1322 
1323   if (!options)
1324     return bdef;
1325   else if (!stricmp(opname, "Mapped"))
1326     opval= options->mapped;
1327   else if (!stricmp(opname, "Huge"))
1328     opval= options->huge;
1329   else if (!stricmp(opname, "Split"))
1330     opval= options->split;
1331   else if (!stricmp(opname, "Readonly"))
1332     opval= options->readonly;
1333   else if (!stricmp(opname, "SepIndex"))
1334     opval= options->sepindex;
1335   else if (!stricmp(opname, "Header"))
1336     opval= (options->header != 0);   // Is Boolean for some table types
1337 	else if (!stricmp(opname, "Zipped"))
1338 		opval= options->zipped;
1339 	else if (options->oplist)
1340     if ((pv= GetListOption(g, opname, options->oplist)))
1341       opval= (!*pv || *pv == 'y' || *pv == 'Y' || atoi(pv) != 0);
1342 
1343   return opval;
1344 } // end of GetBooleanTableOption
1345 
1346 /****************************************************************************/
1347 /*  Return the value of an integer option or NO_IVAL if not specified.      */
1348 /****************************************************************************/
GetIntegerTableOption(PGLOBAL g,PTOS options,PCSZ opname,int idef)1349 int GetIntegerTableOption(PGLOBAL g, PTOS options, PCSZ opname, int idef)
1350 {
1351   ulonglong opval= (ulonglong) NO_IVAL;
1352 
1353   if (!options)
1354     return idef;
1355   else if (!stricmp(opname, "Lrecl"))
1356     opval= options->lrecl;
1357   else if (!stricmp(opname, "Elements"))
1358     opval= options->elements;
1359   else if (!stricmp(opname, "Multiple"))
1360     opval= options->multiple;
1361   else if (!stricmp(opname, "Header"))
1362     opval= options->header;
1363   else if (!stricmp(opname, "Quoted"))
1364     opval= options->quoted;
1365   else if (!stricmp(opname, "Ending"))
1366     opval= options->ending;
1367   else if (!stricmp(opname, "Compressed"))
1368     opval= (options->compressed);
1369 
1370   if ((ulonglong) opval == (ulonglong)NO_IVAL) {
1371 		PCSZ pv;
1372 
1373 		if ((pv = GetListOption(g, opname, options->oplist))) {
1374 			// opval = CharToNumber((char*)pv, strlen(pv), ULONGLONG_MAX, false);
1375 			return atoi(pv);
1376 		} else
1377       return idef;
1378 
1379     } // endif opval
1380 
1381   return (int)opval;
1382 } // end of GetIntegerTableOption
1383 
1384 /****************************************************************************/
1385 /*  Return the table option structure.                                      */
1386 /****************************************************************************/
GetTableOptionStruct(TABLE_SHARE * s)1387 PTOS ha_connect::GetTableOptionStruct(TABLE_SHARE *s)
1388 {
1389   TABLE_SHARE *tsp= (tshp) ? tshp : (s) ? s : table_share;
1390 
1391 	return (tsp && (!tsp->db_plugin ||
1392 		              !stricmp(plugin_name(tsp->db_plugin)->str, "connect") ||
1393 									!stricmp(plugin_name(tsp->db_plugin)->str, "partition")))
1394 									? tsp->option_struct : NULL;
1395 } // end of GetTableOptionStruct
1396 
1397 /****************************************************************************/
1398 /*  Return the string eventually formatted with partition name.             */
1399 /****************************************************************************/
GetRealString(PCSZ s)1400 char *ha_connect::GetRealString(PCSZ s)
1401 {
1402   char *sv;
1403 
1404   if (IsPartitioned() && s && *partname) {
1405     sv= (char*)PlugSubAlloc(xp->g, NULL, 0);
1406     sprintf(sv, s, partname);
1407     PlugSubAlloc(xp->g, NULL, strlen(sv) + 1);
1408   } else
1409     sv= (char*)s;
1410 
1411   return sv;
1412 } // end of GetRealString
1413 
1414 /****************************************************************************/
1415 /*  Return the value of a string option or sdef if not specified.           */
1416 /****************************************************************************/
GetStringOption(PCSZ opname,PCSZ sdef)1417 PCSZ ha_connect::GetStringOption(PCSZ opname, PCSZ sdef)
1418 {
1419 	PCSZ opval= NULL;
1420   PTOS options= GetTableOptionStruct();
1421 
1422   if (!stricmp(opname, "Connect")) {
1423     LEX_CSTRING cnc= (tshp) ? tshp->connect_string
1424                            : table->s->connect_string;
1425 
1426     if (cnc.length)
1427       opval= strz(xp->g, cnc);
1428 		else
1429 			opval= GetListOption(xp->g, opname, options->oplist);
1430 
1431 	} else if (!stricmp(opname, "Query_String")) {
1432 //  This escapes everything and returns a wrong query
1433 //	opval= thd_query_string(table->in_use)->str;
1434 		opval= (PCSZ)PlugSubAlloc(xp->g, NULL,
1435 			thd_query_string(table->in_use)->length + 1);
1436 		strcpy((char*)opval, thd_query_string(table->in_use)->str);
1437 //	sprintf((char*)opval, "%s", thd_query_string(table->in_use)->str);
1438 	} else if (!stricmp(opname, "Partname"))
1439     opval= partname;
1440   else if (!stricmp(opname, "Table_charset")) {
1441     const CHARSET_INFO *chif= (tshp) ? tshp->table_charset
1442                                      : table->s->table_charset;
1443 
1444     if (chif)
1445       opval= (char*)chif->csname;
1446 
1447   } else
1448     opval= GetStringTableOption(xp->g, options, opname, NULL);
1449 
1450   if (opval && (!stricmp(opname, "connect")
1451              || !stricmp(opname, "tabname")
1452              || !stricmp(opname, "filename")
1453 						 || !stricmp(opname, "optname")
1454 						 || !stricmp(opname, "entry")))
1455 						 opval= GetRealString(opval);
1456 
1457   if (!opval) {
1458     if (sdef && !strcmp(sdef, "*")) {
1459       // Return the handler default value
1460       if (!stricmp(opname, "Dbname") || !stricmp(opname, "Database"))
1461         opval= (char*)GetDBName(NULL);    // Current database
1462       else if (!stricmp(opname, "Type"))  // Default type
1463         opval= (!options) ? NULL :
1464                (options->srcdef)  ? (char*)"MYSQL" :
1465                (options->tabname) ? (char*)"PROXY" : (char*)"DOS";
1466       else if (!stricmp(opname, "User"))  // Connected user
1467         opval= (char *) "root";
1468       else if (!stricmp(opname, "Host"))  // Connected user host
1469         opval= (char *) "localhost";
1470       else
1471         opval= sdef;                      // Caller default
1472 
1473     } else
1474       opval= sdef;                        // Caller default
1475 
1476     } // endif !opval
1477 
1478   return opval;
1479 } // end of GetStringOption
1480 
1481 /****************************************************************************/
1482 /*  Return the value of a Boolean option or bdef if not specified.          */
1483 /****************************************************************************/
GetBooleanOption(PCSZ opname,bool bdef)1484 bool ha_connect::GetBooleanOption(PCSZ opname, bool bdef)
1485 {
1486   bool  opval;
1487   PTOS  options= GetTableOptionStruct();
1488 
1489   if (!stricmp(opname, "View"))
1490     opval= (tshp) ? tshp->is_view : table_share->is_view;
1491   else
1492     opval= GetBooleanTableOption(xp->g, options, opname, bdef);
1493 
1494   return opval;
1495 } // end of GetBooleanOption
1496 
1497 /****************************************************************************/
1498 /*  Set the value of the opname option (does not work for oplist options)   */
1499 /*  Currently used only to set the Sepindex value.                          */
1500 /****************************************************************************/
SetBooleanOption(PCSZ opname,bool b)1501 bool ha_connect::SetBooleanOption(PCSZ opname, bool b)
1502 {
1503   PTOS options= GetTableOptionStruct();
1504 
1505   if (!options)
1506     return true;
1507 
1508   if (!stricmp(opname, "SepIndex"))
1509     options->sepindex= b;
1510   else
1511     return true;
1512 
1513   return false;
1514 } // end of SetBooleanOption
1515 
1516 /****************************************************************************/
1517 /*  Return the value of an integer option or NO_IVAL if not specified.      */
1518 /****************************************************************************/
GetIntegerOption(PCSZ opname)1519 int ha_connect::GetIntegerOption(PCSZ opname)
1520 {
1521   int          opval;
1522   PTOS         options= GetTableOptionStruct();
1523   TABLE_SHARE *tsp= (tshp) ? tshp : table_share;
1524 
1525   if (!stricmp(opname, "Avglen"))
1526     opval= (int)tsp->avg_row_length;
1527   else if (!stricmp(opname, "Estimate"))
1528     opval= (int)tsp->max_rows;
1529   else
1530     opval= GetIntegerTableOption(xp->g, options, opname, NO_IVAL);
1531 
1532   return opval;
1533 } // end of GetIntegerOption
1534 
1535 /****************************************************************************/
1536 /*  Set the value of the opname option (does not work for oplist options)   */
1537 /*  Currently used only to set the Lrecl value.                             */
1538 /****************************************************************************/
SetIntegerOption(PCSZ opname,int n)1539 bool ha_connect::SetIntegerOption(PCSZ opname, int n)
1540 {
1541   PTOS options= GetTableOptionStruct();
1542 
1543   if (!options)
1544     return true;
1545 
1546   if (!stricmp(opname, "Lrecl"))
1547     options->lrecl= n;
1548   else if (!stricmp(opname, "Elements"))
1549     options->elements= n;
1550 //else if (!stricmp(opname, "Estimate"))
1551 //  options->estimate= n;
1552   else if (!stricmp(opname, "Multiple"))
1553     options->multiple= n;
1554   else if (!stricmp(opname, "Header"))
1555     options->header= n;
1556   else if (!stricmp(opname, "Quoted"))
1557     options->quoted= n;
1558   else if (!stricmp(opname, "Ending"))
1559     options->ending= n;
1560   else if (!stricmp(opname, "Compressed"))
1561     options->compressed= n;
1562   else
1563     return true;
1564 //else if (options->oplist)
1565 //  SetListOption(opname, options->oplist, n);
1566 
1567   return false;
1568 } // end of SetIntegerOption
1569 
1570 /****************************************************************************/
1571 /*  Return a field option structure.                                        */
1572 /****************************************************************************/
GetFieldOptionStruct(Field * fdp)1573 PFOS ha_connect::GetFieldOptionStruct(Field *fdp)
1574 {
1575   return fdp->option_struct;
1576 } // end of GetFildOptionStruct
1577 
1578 /****************************************************************************/
1579 /*  Returns the column description structure used to make the column.       */
1580 /****************************************************************************/
GetColumnOption(PGLOBAL g,void * field,PCOLINFO pcf)1581 void *ha_connect::GetColumnOption(PGLOBAL g, void *field, PCOLINFO pcf)
1582 {
1583   const char *cp;
1584   char   *chset, v= 0;
1585   ha_field_option_struct *fop;
1586   Field*  fp;
1587   Field* *fldp;
1588 
1589   // Double test to be on the safe side
1590   if (!table)
1591     return NULL;
1592 
1593   // Find the column to describe
1594   if (field) {
1595     fldp= (Field**)field;
1596     fldp++;
1597   } else
1598     fldp= (tshp) ? tshp->field : table->field;
1599 
1600   if (!fldp || !(fp= *fldp))
1601     return NULL;
1602 
1603   // Get the CONNECT field options structure
1604   fop= GetFieldOptionStruct(fp);
1605   pcf->Flags= 0;
1606 
1607   // Now get column information
1608   pcf->Name= (char*)fp->field_name.str;
1609 	chset = (char*)fp->charset()->name;
1610 
1611   if (fop && fop->special) {
1612     pcf->Fieldfmt= (char*)fop->special;
1613     pcf->Flags= U_SPECIAL;
1614     return fldp;
1615     } // endif special
1616 
1617   pcf->Scale= 0;
1618   pcf->Opt= (fop) ? (int)fop->opt : 0;
1619 
1620 	if (fp->field_length >= 0) {
1621 		pcf->Length= fp->field_length;
1622 
1623 		// length is bytes for Connect, not characters
1624 		if (!strnicmp(chset, "utf8", 4))
1625 			pcf->Length /= 3;
1626 
1627 	} else
1628 		pcf->Length= 256;            // BLOB?
1629 
1630   pcf->Precision= pcf->Length;
1631 
1632   if (fop) {
1633     pcf->Offset= (int)fop->offset;
1634     pcf->Freq= (int)fop->freq;
1635     pcf->Datefmt= (char*)fop->dateformat;
1636 		pcf->Fieldfmt= fop->fieldformat ? (char*)fop->fieldformat
1637 			: fop->jsonpath ? (char*)fop->jsonpath : (char*)fop->xmlpath;
1638 	} else {
1639     pcf->Offset= -1;
1640     pcf->Freq= 0;
1641     pcf->Datefmt= NULL;
1642     pcf->Fieldfmt= NULL;
1643   } // endif fop
1644 
1645 	if (!strcmp(chset, "binary"))
1646 		v = 'B';		// Binary string
1647 
1648   switch (fp->type()) {
1649     case MYSQL_TYPE_BLOB:
1650     case MYSQL_TYPE_VARCHAR:
1651     case MYSQL_TYPE_VAR_STRING:
1652       pcf->Flags |= U_VAR;
1653 			// fall through
1654     default:
1655       pcf->Type= MYSQLtoPLG(fp->type(), &v);
1656       break;
1657   } // endswitch SQL type
1658 
1659   switch (pcf->Type) {
1660     case TYPE_STRING:
1661 		case TYPE_BIN:
1662 			// Do something for case
1663       cp= chset;
1664 
1665       // Find if collation name ends by _ci
1666       if (!strcmp(cp + strlen(cp) - 3, "_ci")) {
1667         pcf->Scale= 1;     // Case insensitive
1668         pcf->Opt= 0;       // Prevent index opt until it is safe
1669         } // endif ci
1670 
1671       break;
1672     case TYPE_DOUBLE:
1673       pcf->Scale= MY_MAX(MY_MIN(fp->decimals(), ((unsigned)pcf->Length - 2)), 0);
1674       break;
1675     case TYPE_DECIM:
1676       pcf->Precision= ((Field_new_decimal*)fp)->precision;
1677       pcf->Length= pcf->Precision;
1678       pcf->Scale= fp->decimals();
1679       break;
1680     case TYPE_DATE:
1681       // Field_length is only used for DATE columns
1682       if (fop && fop->fldlen)
1683         pcf->Length= (int)fop->fldlen;
1684       else {
1685         int len;
1686 
1687         if (pcf->Datefmt) {
1688           // Find the (max) length produced by the date format
1689           char    buf[256];
1690           PGLOBAL g= GetPlug(table->in_use, xp);
1691           PDTP    pdtp= MakeDateFormat(g, pcf->Datefmt, false, true, 0);
1692           struct tm datm;
1693           bzero(&datm, sizeof(datm));
1694           datm.tm_mday= 12;
1695           datm.tm_mon= 11;
1696           datm.tm_year= 112;
1697           mktime(&datm); // set other fields get proper day name
1698           len= strftime(buf, 256, pdtp->OutFmt, &datm);
1699         } else
1700           len= 0;
1701 
1702         // 11 is for signed numeric representation of the date
1703         pcf->Length= (len) ? len : 11;
1704         } // endelse
1705 
1706       // For Value setting
1707       pcf->Precision= MY_MAX(pcf->Precision, pcf->Length);
1708       break;
1709     default:
1710       break;
1711   } // endswitch type
1712 
1713   if (fp->flags & UNSIGNED_FLAG)
1714     pcf->Flags |= U_UNSIGNED;
1715 
1716   if (fp->flags & ZEROFILL_FLAG)
1717     pcf->Flags |= U_ZEROFILL;
1718 
1719   // This is used to skip null bit
1720   if (fp->real_maybe_null())
1721     pcf->Flags |= U_NULLS;
1722 
1723   // Mark virtual columns as such
1724   if (fp->vcol_info && !fp->stored_in_db)
1725     pcf->Flags |= U_VIRTUAL;
1726 
1727   pcf->Key= 0;   // Not used when called from MySQL
1728 
1729   // Get the comment if any
1730   if (fp->comment.str && fp->comment.length)
1731     pcf->Remark= strz(g, fp->comment);
1732   else
1733     pcf->Remark= NULL;
1734 
1735   return fldp;
1736 } // end of GetColumnOption
1737 
1738 /****************************************************************************/
1739 /*  Return an index option structure.                                       */
1740 /****************************************************************************/
GetIndexOptionStruct(KEY * kp)1741 PXOS ha_connect::GetIndexOptionStruct(KEY *kp)
1742 {
1743   return kp->option_struct;
1744 } // end of GetIndexOptionStruct
1745 
1746 /****************************************************************************/
1747 /*  Return a Boolean index option or false if not specified.                */
1748 /****************************************************************************/
GetIndexOption(KEY * kp,PCSZ opname)1749 bool ha_connect::GetIndexOption(KEY *kp, PCSZ opname)
1750 {
1751   bool opval= false;
1752   PXOS options= GetIndexOptionStruct(kp);
1753 
1754   if (options) {
1755     if (!stricmp(opname, "Dynamic"))
1756       opval= options->dynamic;
1757     else if (!stricmp(opname, "Mapped"))
1758       opval= options->mapped;
1759 
1760   } else if (kp->comment.str && kp->comment.length) {
1761 		PCSZ pv, oplist= strz(xp->g, kp->comment);
1762 
1763     if ((pv= GetListOption(xp->g, opname, oplist)))
1764       opval= (!*pv || *pv == 'y' || *pv == 'Y' || atoi(pv) != 0);
1765 
1766   } // endif comment
1767 
1768   return opval;
1769 } // end of GetIndexOption
1770 
1771 /****************************************************************************/
1772 /*  Returns the index description structure used to make the index.         */
1773 /****************************************************************************/
IsUnique(uint n)1774 bool ha_connect::IsUnique(uint n)
1775 {
1776   return (table->key_info[n].flags & HA_NOSAME) != 0;
1777 } // end of IsUnique
1778 
1779 /****************************************************************************/
1780 /*  Returns the index description structure used to make the index.         */
1781 /****************************************************************************/
GetIndexInfo(TABLE_SHARE * s)1782 PIXDEF ha_connect::GetIndexInfo(TABLE_SHARE *s)
1783 {
1784   char    *name, *pn;
1785   bool     unique;
1786   PIXDEF   xdp, pxd=NULL, toidx= NULL;
1787   PKPDEF   kpp, pkp;
1788   KEY      kp;
1789   PGLOBAL& g= xp->g;
1790 
1791   if (!s)
1792     s= table->s;
1793 
1794   for (int n= 0; (unsigned)n < s->keynames.count; n++) {
1795     if (trace(1))
1796       htrc("Getting created index %d info\n", n + 1);
1797 
1798     // Find the index to describe
1799     kp= s->key_info[n];
1800 
1801     // Now get index information
1802     pn= (char*)s->keynames.type_names[n];
1803     name= PlugDup(g, pn);
1804     unique= (kp.flags & 1) != 0;
1805     pkp= NULL;
1806 
1807     // Allocate the index description block
1808     xdp= new(g) INDEXDEF(name, unique, n);
1809 
1810     // Get the the key parts info
1811     for (int k= 0; (unsigned)k < kp.user_defined_key_parts; k++) {
1812       pn= (char*)kp.key_part[k].field->field_name.str;
1813       name= PlugDup(g, pn);
1814 
1815       // Allocate the key part description block
1816       kpp= new(g) KPARTDEF(name, k + 1);
1817       kpp->SetKlen(kp.key_part[k].length);
1818 
1819 #if 0             // NIY
1820     // Index on auto increment column can be an XXROW index
1821     if (kp.key_part[k].field->flags & AUTO_INCREMENT_FLAG &&
1822         kp.uder_defined_key_parts == 1) {
1823       char   *type= GetStringOption("Type", "DOS");
1824       TABTYPE typ= GetTypeID(type);
1825 
1826       xdp->SetAuto(IsTypeFixed(typ));
1827       } // endif AUTO_INCREMENT
1828 #endif // 0
1829 
1830       if (pkp)
1831         pkp->SetNext(kpp);
1832       else
1833         xdp->SetToKeyParts(kpp);
1834 
1835       pkp= kpp;
1836       } // endfor k
1837 
1838     xdp->SetNParts(kp.user_defined_key_parts);
1839     xdp->Dynamic= GetIndexOption(&kp, "Dynamic");
1840     xdp->Mapped= GetIndexOption(&kp, "Mapped");
1841 
1842     if (pxd)
1843       pxd->SetNext(xdp);
1844     else
1845       toidx= xdp;
1846 
1847     pxd= xdp;
1848     } // endfor n
1849 
1850   return toidx;
1851 } // end of GetIndexInfo
1852 
1853 /****************************************************************************/
1854 /*  Returns the index description structure used to make the index.         */
1855 /****************************************************************************/
CheckVirtualIndex(TABLE_SHARE * s)1856 bool ha_connect::CheckVirtualIndex(TABLE_SHARE *s)
1857 {
1858 
1859   char    *rid;
1860   KEY      kp;
1861   Field   *fp;
1862   PGLOBAL& g= xp->g;
1863 
1864   if (!s)
1865     s= table->s;
1866 
1867   for (int n= 0; (unsigned)n < s->keynames.count; n++) {
1868     kp= s->key_info[n];
1869 
1870     // Now get index information
1871 
1872     // Get the the key parts info
1873     for (int k= 0; (unsigned)k < kp.user_defined_key_parts; k++) {
1874       fp= kp.key_part[k].field;
1875       rid= (fp->option_struct) ? fp->option_struct->special : NULL;
1876 
1877       if (!rid || (stricmp(rid, "ROWID") && stricmp(rid, "ROWNUM"))) {
1878         strcpy(g->Message, "Invalid virtual index");
1879         return true;
1880         } // endif rowid
1881 
1882       } // endfor k
1883 
1884     } // endfor n
1885 
1886   return false;
1887 } // end of CheckVirtualIndex
1888 
IsPartitioned(void)1889 bool ha_connect::IsPartitioned(void)
1890 {
1891 #ifdef WITH_PARTITION_STORAGE_ENGINE
1892 	if (tshp)
1893     return tshp->partition_info_str_len > 0;
1894   else if (table && table->part_info)
1895     return true;
1896   else
1897 #endif
1898 		return false;
1899 
1900 } // end of IsPartitioned
1901 
GetDBName(PCSZ name)1902 PCSZ ha_connect::GetDBName(PCSZ name)
1903 {
1904   return (name) ? name : table->s->db.str;
1905 } // end of GetDBName
1906 
GetTableName(void)1907 const char *ha_connect::GetTableName(void)
1908 {
1909   const char *path= tshp ? tshp->path.str : table_share->path.str;
1910   const char *name= strrchr(path, slash);
1911   return name ? name + 1 : path;
1912 } // end of GetTableName
1913 
GetPartName(void)1914 char *ha_connect::GetPartName(void)
1915 {
1916   return (IsPartitioned()) ? partname : (char*)GetTableName();
1917 } // end of GetTableName
1918 
1919 #if 0
1920 /****************************************************************************/
1921 /*  Returns the column real or special name length of a field.              */
1922 /****************************************************************************/
1923 int ha_connect::GetColNameLen(Field *fp)
1924 {
1925   int n;
1926   PFOS fop= GetFieldOptionStruct(fp);
1927 
1928   // Now get the column name length
1929   if (fop && fop->special)
1930     n= strlen(fop->special) + 1;
1931   else
1932     n= fp->field_name.length;
1933 
1934   return n;
1935 } // end of GetColNameLen
1936 
1937 /****************************************************************************/
1938 /*  Returns the column real or special name of a field.                     */
1939 /****************************************************************************/
1940 char *ha_connect::GetColName(Field *fp)
1941 {
1942   PFOS fop= GetFieldOptionStruct(fp);
1943 
1944   return (fop && fop->special) ? fop->special : (char*)fp->field_name.str;
1945 } // end of GetColName
1946 
1947 /****************************************************************************/
1948 /*  Adds the column real or special name of a field to a string.            */
1949 /****************************************************************************/
1950 void ha_connect::AddColName(char *cp, Field *fp)
1951 {
1952   PFOS fop= GetFieldOptionStruct(fp);
1953 
1954   // Now add the column name
1955   if (fop && fop->special)
1956     // The prefix * mark the column as "special"
1957     strcat(strcpy(cp, "*"), strupr(fop->special));
1958   else
1959     strcpy(cp, fp->field_name.str);
1960 
1961 } // end of AddColName
1962 #endif // 0
1963 
1964 /***********************************************************************/
1965 /*  This function sets the current database path.                      */
1966 /***********************************************************************/
SetDataPath(PGLOBAL g,PCSZ path)1967 bool ha_connect::SetDataPath(PGLOBAL g, PCSZ path)
1968 {
1969   return (!(datapath= SetPath(g, path)));
1970 } // end of SetDataPath
1971 
1972 /****************************************************************************/
1973 /*  Get the table description block of a CONNECT table.                     */
1974 /****************************************************************************/
GetTDB(PGLOBAL g)1975 PTDB ha_connect::GetTDB(PGLOBAL g)
1976 {
1977   const char *table_name;
1978   PTDB        tp;
1979 
1980   // Double test to be on the safe side
1981   if (!g || !table)
1982     return NULL;
1983 
1984   table_name= GetTableName();
1985 
1986   if (!xp->CheckQuery(valid_query_id) && tdbp
1987                       && !stricmp(tdbp->GetName(), table_name)
1988                       && (tdbp->GetMode() == xmod
1989                        || (tdbp->GetMode() == MODE_READ && xmod == MODE_READX)
1990                        || tdbp->GetAmType() == TYPE_AM_XML)) {
1991     tp= tdbp;
1992     tp->SetMode(xmod);
1993   } else if ((tp= CntGetTDB(g, table_name, xmod, this))) {
1994     valid_query_id= xp->last_query_id;
1995 //  tp->SetMode(xmod);
1996   } else
1997     htrc("GetTDB: %s\n", g->Message);
1998 
1999   return tp;
2000 } // end of GetTDB
2001 
2002 /****************************************************************************/
2003 /*  Open a CONNECT table, restricting column list if cols is true.          */
2004 /****************************************************************************/
OpenTable(PGLOBAL g,bool del)2005 int ha_connect::OpenTable(PGLOBAL g, bool del)
2006 {
2007   bool  rc= false;
2008   char *c1= NULL, *c2=NULL;
2009 
2010   // Double test to be on the safe side
2011   if (!g || !table) {
2012     htrc("OpenTable logical error; g=%p table=%p\n", g, table);
2013     return HA_ERR_INITIALIZATION;
2014     } // endif g
2015 
2016   if (!(tdbp= GetTDB(g)))
2017     return RC_FX;
2018   else if (tdbp->IsReadOnly())
2019     switch (xmod) {
2020       case MODE_WRITE:
2021       case MODE_INSERT:
2022       case MODE_UPDATE:
2023       case MODE_DELETE:
2024         strcpy(g->Message, MSG(READ_ONLY));
2025         return HA_ERR_TABLE_READONLY;
2026       default:
2027         break;
2028       } // endswitch xmode
2029 
2030 	// g->More is 1 when executing commands from triggers
2031   if (!g->More && (xmod != MODE_INSERT
2032 		           || tdbp->GetAmType() == TYPE_AM_MYSQL
2033                || tdbp->GetAmType() == TYPE_AM_ODBC
2034 							 || tdbp->GetAmType() == TYPE_AM_JDBC)) {
2035 		// Get the list of used fields (columns)
2036     char        *p;
2037     unsigned int k1, k2, n1, n2;
2038     Field*      *field;
2039     Field*       fp;
2040     MY_BITMAP   *map= (xmod == MODE_INSERT) ? table->write_set : table->read_set;
2041     MY_BITMAP   *ump= (xmod == MODE_UPDATE) ? table->write_set : NULL;
2042 
2043     k1= k2= 0;
2044     n1= n2= 1;         // 1 is space for final null character
2045 
2046     for (field= table->field; (fp= *field); field++) {
2047       if (bitmap_is_set(map, fp->field_index)) {
2048         n1+= (fp->field_name.length + 1);
2049         k1++;
2050         } // endif
2051 
2052       if (ump && bitmap_is_set(ump, fp->field_index)) {
2053         n2+= (fp->field_name.length + 1);
2054         k2++;
2055         } // endif
2056 
2057       } // endfor field
2058 
2059     if (k1) {
2060       p= c1= (char*)PlugSubAlloc(g, NULL, n1);
2061 
2062       for (field= table->field; (fp= *field); field++)
2063         if (bitmap_is_set(map, fp->field_index)) {
2064           strcpy(p, fp->field_name.str);
2065           p+= (fp->field_name.length + 1);
2066           } // endif used field
2067 
2068       *p= '\0';          // mark end of list
2069       } // endif k1
2070 
2071     if (k2) {
2072       p= c2= (char*)PlugSubAlloc(g, NULL, n2);
2073 
2074       for (field= table->field; (fp= *field); field++)
2075         if (bitmap_is_set(ump, fp->field_index)) {
2076           strcpy(p, fp->field_name.str);
2077 
2078           if (part_id && bitmap_is_set(part_id, fp->field_index)) {
2079             // Trying to update a column used for partitioning
2080             // This cannot be currently done because it may require
2081             // a row to be moved in another partition.
2082             sprintf(g->Message,
2083               "Cannot update column %s because it is used for partitioning",
2084               p);
2085             return HA_ERR_INTERNAL_ERROR;
2086             } // endif part_id
2087 
2088           p+= (strlen(p) + 1);
2089           } // endif used field
2090 
2091       *p= '\0';          // mark end of list
2092       } // endif k2
2093 
2094     } // endif xmod
2095 
2096   // Open the table
2097   if (!(rc= CntOpenTable(g, tdbp, xmod, c1, c2, del, this))) {
2098     istable= true;
2099 //  strmake(tname, table_name, sizeof(tname)-1);
2100 
2101 #ifdef NOT_USED_VARIABLE
2102     // We may be in a create index query
2103     if (xmod == MODE_ANY && *tdbp->GetName() != '#') {
2104       // The current indexes
2105       PIXDEF oldpix= GetIndexInfo();
2106       } // endif xmod
2107 #endif
2108 
2109   } else
2110     htrc("OpenTable: %s\n", g->Message);
2111 
2112   if (rc) {
2113     tdbp= NULL;
2114     valid_info= false;
2115     } // endif rc
2116 
2117   return (rc) ? HA_ERR_INITIALIZATION : 0;
2118 } // end of OpenTable
2119 
2120 
2121 /****************************************************************************/
2122 /*  CheckColumnList: check that all bitmap columns do exist.                */
2123 /****************************************************************************/
CheckColumnList(PGLOBAL g)2124 bool ha_connect::CheckColumnList(PGLOBAL g)
2125 {
2126   // Check the list of used fields (columns)
2127   bool       brc= false;
2128   PCOL       colp;
2129   Field*    *field;
2130   Field*     fp;
2131   MY_BITMAP *map= table->read_set;
2132 
2133 	try {
2134           for (field= table->field; (fp= *field); field++)
2135       if (bitmap_is_set(map, fp->field_index)) {
2136         if (!(colp= tdbp->ColDB(g, (PSZ)fp->field_name.str, 0))) {
2137           sprintf(g->Message, "Column %s not found in %s",
2138                   fp->field_name.str, tdbp->GetName());
2139 					throw 1;
2140 				} // endif colp
2141 
2142         if ((brc= colp->InitValue(g)))
2143 					throw 2;
2144 
2145         colp->AddColUse(U_P);           // For PLG tables
2146         } // endif
2147 
2148 	} catch (int n) {
2149 		if (trace(1))
2150 			htrc("Exception %d: %s\n", n, g->Message);
2151 		brc= true;
2152 	} catch (const char *msg) {
2153 		strcpy(g->Message, msg);
2154 		brc= true;
2155 	} // end catch
2156 
2157   return brc;
2158 } // end of CheckColumnList
2159 
2160 
2161 /****************************************************************************/
2162 /*  IsOpened: returns true if the table is already opened.                  */
2163 /****************************************************************************/
IsOpened(void)2164 bool ha_connect::IsOpened(void)
2165 {
2166   return (!xp->CheckQuery(valid_query_id) && tdbp
2167                                           && tdbp->GetUse() == USE_OPEN);
2168 } // end of IsOpened
2169 
2170 
2171 /****************************************************************************/
2172 /*  Close a CONNECT table.                                                  */
2173 /****************************************************************************/
CloseTable(PGLOBAL g)2174 int ha_connect::CloseTable(PGLOBAL g)
2175 {
2176   int rc= CntCloseTable(g, tdbp, nox, abort);
2177   tdbp= NULL;
2178   sdvalin1= sdvalin2= sdvalin3= sdvalin4= NULL;
2179   sdvalout=NULL;
2180   valid_info= false;
2181   indexing= -1;
2182   nox= true;
2183   abort= false;
2184   return rc;
2185 } // end of CloseTable
2186 
2187 
2188 /***********************************************************************/
2189 /*  Make a pseudo record from current row values. Specific to MySQL.   */
2190 /***********************************************************************/
MakeRecord(char * buf)2191 int ha_connect::MakeRecord(char *buf)
2192 {
2193 	PCSZ           fmt;
2194   char          *p, val[32];
2195   int            rc= 0;
2196   Field*        *field;
2197   Field         *fp;
2198   CHARSET_INFO  *charset= tdbp->data_charset();
2199 //MY_BITMAP      readmap;
2200   MY_BITMAP     *map;
2201   PVAL           value;
2202   PCOL           colp= NULL;
2203   DBUG_ENTER("ha_connect::MakeRecord");
2204 
2205   if (trace(2))
2206     htrc("Maps: read=%08X write=%08X defr=%08X defw=%08X\n",
2207             *table->read_set->bitmap, *table->write_set->bitmap,
2208             *table->def_read_set.bitmap, *table->def_write_set.bitmap);
2209 
2210   // Avoid asserts in field::store() for columns that are not updated
2211   MY_BITMAP *org_bitmap= dbug_tmp_use_all_columns(table, &table->write_set);
2212 
2213   // This is for variable_length rows
2214   memset(buf, 0, table->s->null_bytes);
2215 
2216   // When sorting read_set selects all columns, so we use def_read_set
2217   map= (MY_BITMAP *)&table->def_read_set;
2218 
2219   // Make the pseudo record from field values
2220   for (field= table->field; *field && !rc; field++) {
2221     fp= *field;
2222 
2223     if (fp->vcol_info && !fp->stored_in_db)
2224       continue;            // This is a virtual column
2225 
2226     if (bitmap_is_set(map, fp->field_index) || alter) {
2227       // This is a used field, fill the buffer with value
2228       for (colp= tdbp->GetColumns(); colp; colp= colp->GetNext())
2229         if ((!mrr || colp->GetKcol()) &&
2230             !stricmp(colp->GetName(), fp->field_name.str))
2231           break;
2232 
2233       if (!colp) {
2234         if (mrr)
2235           continue;
2236 
2237         htrc("Column %s not found\n", fp->field_name.str);
2238         dbug_tmp_restore_column_map(&table->write_set, org_bitmap);
2239         DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
2240         } // endif colp
2241 
2242       value= colp->GetValue();
2243       p= NULL;
2244 
2245       // All this was better optimized
2246       if (!value->IsNull()) {
2247         switch (value->GetType()) {
2248           case TYPE_DATE:
2249             if (!sdvalout)
2250               sdvalout= AllocateValue(xp->g, TYPE_STRING, 20);
2251 
2252             switch (fp->type()) {
2253               case MYSQL_TYPE_DATE:
2254                 fmt= "%Y-%m-%d";
2255                 break;
2256               case MYSQL_TYPE_TIME:
2257                 fmt= "%H:%M:%S";
2258                 break;
2259               case MYSQL_TYPE_YEAR:
2260                 fmt= "%Y";
2261                 break;
2262               default:
2263                 fmt= "%Y-%m-%d %H:%M:%S";
2264                 break;
2265               } // endswitch type
2266 
2267             // Get date in the format required by MySQL fields
2268             value->FormatValue(sdvalout, fmt);
2269             p= sdvalout->GetCharValue();
2270             rc= fp->store(p, strlen(p), charset, CHECK_FIELD_WARN);
2271             break;
2272           case TYPE_STRING:
2273           case TYPE_DECIM:
2274             p= value->GetCharString(val);
2275             charset= tdbp->data_charset();
2276             rc= fp->store(p, strlen(p), charset, CHECK_FIELD_WARN);
2277             break;
2278 					case TYPE_BIN:
2279 						p= value->GetCharValue();
2280 						charset= &my_charset_bin;
2281 						rc= fp->store(p, value->GetSize(), charset, CHECK_FIELD_WARN);
2282 						break;
2283           case TYPE_DOUBLE:
2284             rc= fp->store(value->GetFloatValue());
2285             break;
2286           default:
2287             rc= fp->store(value->GetBigintValue(), value->IsUnsigned());
2288             break;
2289           } // endswitch Type
2290 
2291         // Store functions returns 1 on overflow and -1 on fatal error
2292         if (rc > 0) {
2293           char buf[256];
2294           THD *thd= ha_thd();
2295 
2296           sprintf(buf, "Out of range value %.140s for column '%s' at row %ld",
2297             value->GetCharString(val),
2298             fp->field_name.str,
2299             thd->get_stmt_da()->current_row_for_warning());
2300 
2301           push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, buf);
2302           DBUG_PRINT("MakeRecord", ("%s", buf));
2303           rc= 0;
2304         } else if (rc < 0)
2305           rc= HA_ERR_WRONG_IN_RECORD;
2306 
2307         fp->set_notnull();
2308       } else
2309         fp->set_null();
2310 
2311       } // endif bitmap
2312 
2313     } // endfor field
2314 
2315   // This is sometimes required for partition tables because the buf
2316   // can be different from the table->record[0] buffer
2317   if (buf != (char*)table->record[0])
2318     memcpy(buf, table->record[0], table->s->stored_rec_length);
2319 
2320   // This is copied from ha_tina and is necessary to avoid asserts
2321   dbug_tmp_restore_column_map(&table->write_set, org_bitmap);
2322   DBUG_RETURN(rc);
2323 } // end of MakeRecord
2324 
2325 
2326 /***********************************************************************/
2327 /*  Set row values from a MySQL pseudo record. Specific to MySQL.      */
2328 /***********************************************************************/
ScanRecord(PGLOBAL g,const uchar *)2329 int ha_connect::ScanRecord(PGLOBAL g, const uchar *)
2330 {
2331   char    attr_buffer[1024];
2332   char    data_buffer[1024];
2333   PCSZ    fmt;
2334   int     rc= 0;
2335   PCOL    colp;
2336   PVAL    value, sdvalin;
2337   Field  *fp;
2338 //PTDBASE tp= (PTDBASE)tdbp;
2339   String  attribute(attr_buffer, sizeof(attr_buffer),
2340                     table->s->table_charset);
2341   MY_BITMAP *bmap= dbug_tmp_use_all_columns(table, &table->read_set);
2342   const CHARSET_INFO *charset= tdbp->data_charset();
2343   String  data_charset_value(data_buffer, sizeof(data_buffer),  charset);
2344 
2345   // Scan the pseudo record for field values and set column values
2346   for (Field **field=table->field ; *field ; field++) {
2347     fp= *field;
2348 
2349     if ((fp->vcol_info && !fp->stored_in_db) ||
2350          fp->option_struct->special)
2351       continue;            // Is a virtual column possible here ???
2352 
2353     if ((xmod == MODE_INSERT && tdbp->GetAmType() != TYPE_AM_MYSQL
2354                              && tdbp->GetAmType() != TYPE_AM_ODBC
2355 														 && tdbp->GetAmType() != TYPE_AM_JDBC) ||
2356 														 bitmap_is_set(table->write_set, fp->field_index)) {
2357       for (colp= tdbp->GetSetCols(); colp; colp= colp->GetNext())
2358         if (!stricmp(colp->GetName(), fp->field_name.str))
2359           break;
2360 
2361       if (!colp) {
2362         htrc("Column %s not found\n", fp->field_name.str);
2363         rc= HA_ERR_WRONG_IN_RECORD;
2364         goto err;
2365       } else
2366         value= colp->GetValue();
2367 
2368       // This is a used field, fill the value from the row buffer
2369       // All this could be better optimized
2370       if (fp->is_null()) {
2371         if (colp->IsNullable())
2372           value->SetNull(true);
2373 
2374         value->Reset();
2375       } else switch (value->GetType()) {
2376         case TYPE_DOUBLE:
2377           value->SetValue(fp->val_real());
2378           break;
2379         case TYPE_DATE:
2380           // Get date in the format produced by MySQL fields
2381           switch (fp->type()) {
2382             case MYSQL_TYPE_DATE:
2383               if (!sdvalin2) {
2384                 sdvalin2= (DTVAL*)AllocateValue(xp->g, TYPE_DATE, 19);
2385                 fmt= "YYYY-MM-DD";
2386                 ((DTVAL*)sdvalin2)->SetFormat(g, fmt, strlen(fmt));
2387                 } // endif sdvalin1
2388 
2389               sdvalin= sdvalin2;
2390               break;
2391             case MYSQL_TYPE_TIME:
2392               if (!sdvalin3) {
2393                 sdvalin3= (DTVAL*)AllocateValue(xp->g, TYPE_DATE, 19);
2394                 fmt= "hh:mm:ss";
2395                 ((DTVAL*)sdvalin3)->SetFormat(g, fmt, strlen(fmt));
2396                 } // endif sdvalin1
2397 
2398               sdvalin= sdvalin3;
2399               break;
2400             case MYSQL_TYPE_YEAR:
2401               if (!sdvalin4) {
2402                 sdvalin4= (DTVAL*)AllocateValue(xp->g, TYPE_DATE, 19);
2403                 fmt= "YYYY";
2404                 ((DTVAL*)sdvalin4)->SetFormat(g, fmt, strlen(fmt));
2405                 } // endif sdvalin1
2406 
2407               sdvalin= sdvalin4;
2408               break;
2409             default:
2410               if (!sdvalin1) {
2411                 sdvalin1= (DTVAL*)AllocateValue(xp->g, TYPE_DATE, 19);
2412                 fmt= "YYYY-MM-DD hh:mm:ss";
2413                 ((DTVAL*)sdvalin1)->SetFormat(g, fmt, strlen(fmt));
2414                 } // endif sdvalin1
2415 
2416               sdvalin= sdvalin1;
2417             } // endswitch type
2418 
2419           sdvalin->SetNullable(colp->IsNullable());
2420           fp->val_str(&attribute);
2421           sdvalin->SetValue_psz(attribute.c_ptr_safe());
2422           value->SetValue_pval(sdvalin);
2423           break;
2424         default:
2425           fp->val_str(&attribute);
2426 
2427           if (charset != &my_charset_bin) {
2428             // Convert from SQL field charset to DATA_CHARSET
2429             uint cnv_errors;
2430 
2431             data_charset_value.copy(attribute.ptr(), attribute.length(),
2432                                     attribute.charset(), charset, &cnv_errors);
2433             value->SetValue_psz(data_charset_value.c_ptr_safe());
2434           } else
2435             value->SetValue_psz(attribute.c_ptr_safe());
2436 
2437           break;
2438         } // endswitch Type
2439 
2440 #ifdef NEWCHANGE
2441     } else if (xmod == MODE_UPDATE) {
2442       PCOL cp;
2443 
2444       for (cp= tdbp->GetColumns(); cp; cp= cp->GetNext())
2445         if (!stricmp(colp->GetName(), cp->GetName()))
2446           break;
2447 
2448       if (!cp) {
2449         rc= HA_ERR_WRONG_IN_RECORD;
2450         goto err;
2451         } // endif cp
2452 
2453       value->SetValue_pval(cp->GetValue());
2454     } else // mode Insert
2455       value->Reset();
2456 #else
2457     } // endif bitmap_is_set
2458 #endif
2459 
2460     } // endfor field
2461 
2462  err:
2463   dbug_tmp_restore_column_map(&table->read_set, bmap);
2464   return rc;
2465 } // end of ScanRecord
2466 
2467 
2468 /***********************************************************************/
2469 /*  Check change in index column. Specific to MySQL.                   */
2470 /*  Should be elaborated to check for real changes.                    */
2471 /***********************************************************************/
CheckRecord(PGLOBAL g,const uchar *,const uchar * newbuf)2472 int ha_connect::CheckRecord(PGLOBAL g, const uchar *, const uchar *newbuf)
2473 {
2474 	return ScanRecord(g, newbuf);
2475 } // end of dummy CheckRecord
2476 
2477 
2478 /***********************************************************************/
2479 /*  Return true if this field is used in current indexing.             */
2480 /***********************************************************************/
IsIndexed(Field * fp)2481 bool ha_connect::IsIndexed(Field *fp)
2482 {
2483 	if (active_index < MAX_KEY) {
2484 		KEY_PART_INFO *kpart;
2485 		KEY           *kfp= &table->key_info[active_index];
2486 		uint           rem= kfp->user_defined_key_parts;
2487 
2488 		for (kpart= kfp->key_part; rem; rem--, kpart++)
2489 			if (kpart->field == fp)
2490 				return true;
2491 
2492 	} // endif active_index
2493 
2494 	return false;
2495 } // end of IsIndexed
2496 
2497 
2498 /***********************************************************************/
2499 /*  Return the where clause for remote indexed read.                   */
2500 /***********************************************************************/
MakeKeyWhere(PGLOBAL g,PSTRG qry,OPVAL vop,char q,const key_range * kr)2501 bool ha_connect::MakeKeyWhere(PGLOBAL g, PSTRG qry, OPVAL vop, char q,
2502 	                            const key_range *kr)
2503 {
2504 	const uchar     *ptr;
2505 //uint             i, rem, len, klen, stlen;
2506 	uint             i, rem, len, stlen;
2507 	bool             nq, both, oom;
2508 	OPVAL            op;
2509 	Field           *fp;
2510 	const key_range *ranges[2];
2511 	MY_BITMAP    *old_map;
2512 	KEY             *kfp;
2513   KEY_PART_INFO   *kpart;
2514 
2515   if (active_index == MAX_KEY)
2516     return false;
2517 
2518 	ranges[0]= kr;
2519 	ranges[1]= (end_range && !eq_range) ? &save_end_range : NULL;
2520 
2521 	if (!ranges[0] && !ranges[1]) {
2522 		strcpy(g->Message, "MakeKeyWhere: No key");
2523 	  return true;
2524 	}	else
2525 		both= ranges[0] && ranges[1];
2526 
2527 	kfp= &table->key_info[active_index];
2528 	old_map= dbug_tmp_use_all_columns(table, &table->write_set);
2529 
2530 	for (i= 0; i <= 1; i++) {
2531 		if (ranges[i] == NULL)
2532 			continue;
2533 
2534 		if (both && i > 0)
2535 			qry->Append(") AND (");
2536 		else
2537 			qry->Append(" WHERE (");
2538 
2539 //	klen= len= ranges[i]->length;
2540 		len= ranges[i]->length;
2541 		rem= kfp->user_defined_key_parts;
2542 		ptr= ranges[i]->key;
2543 
2544 		for (kpart= kfp->key_part; rem; rem--, kpart++) {
2545 			fp= kpart->field;
2546 			stlen= kpart->store_length;
2547 			nq= fp->str_needs_quotes();
2548 
2549 			if (kpart != kfp->key_part)
2550 				qry->Append(" AND ");
2551 
2552 			if (q) {
2553 				qry->Append(q);
2554 				qry->Append((PSZ)fp->field_name.str);
2555 				qry->Append(q);
2556 			}	else
2557 				qry->Append((PSZ)fp->field_name.str);
2558 
2559 			switch (ranges[i]->flag) {
2560 			case HA_READ_KEY_EXACT:
2561 //			op= (stlen >= len || !nq || fp->result_type() != STRING_RESULT)
2562 //				? OP_EQ : OP_LIKE;
2563 				op= OP_EQ;
2564 				break;
2565 			case HA_READ_AFTER_KEY:
2566 				op= (stlen >= len || i > 0) ? (i > 0 ? OP_LE : OP_GT) : OP_GE;
2567 				break;
2568 			case HA_READ_KEY_OR_NEXT:
2569 				op= OP_GE;
2570 				break;
2571 			case HA_READ_BEFORE_KEY:
2572 				op= (stlen >= len) ? OP_LT : OP_LE;
2573 				break;
2574 			case HA_READ_KEY_OR_PREV:
2575 				op= OP_LE;
2576 				break;
2577 			default:
2578 				sprintf(g->Message, "cannot handle flag %d", ranges[i]->flag);
2579 				goto err;
2580 			}	// endswitch flag
2581 
2582 			qry->Append((PSZ)GetValStr(op, false));
2583 
2584 			if (nq)
2585 				qry->Append('\'');
2586 
2587 			if (kpart->key_part_flag & HA_VAR_LENGTH_PART) {
2588 				String varchar;
2589 				uint   var_length= uint2korr(ptr);
2590 
2591 				varchar.set_quick((char*)ptr + HA_KEY_BLOB_LENGTH,
2592 					var_length, &my_charset_bin);
2593 				qry->Append(varchar.ptr(), varchar.length(), nq);
2594 			}	else {
2595 				char   strbuff[MAX_FIELD_WIDTH];
2596 				String str(strbuff, sizeof(strbuff), kpart->field->charset()), *res;
2597 
2598 				res= fp->val_str(&str, ptr);
2599 				qry->Append(res->ptr(), res->length(), nq);
2600 			} // endif flag
2601 
2602 			if (nq)
2603 				qry->Append('\'');
2604 
2605 			if (stlen >= len)
2606 				break;
2607 
2608 			len-= stlen;
2609 
2610 			/* For nullable columns, null-byte is already skipped before, that is
2611 			ptr was incremented by 1. Since store_length still counts null-byte,
2612 			we need to subtract 1 from store_length. */
2613 			ptr+= stlen - MY_TEST(kpart->null_bit);
2614 		} // endfor kpart
2615 
2616 		} // endfor i
2617 
2618 	qry->Append(')');
2619 
2620   if ((oom= qry->IsTruncated()))
2621     strcpy(g->Message, "Out of memory");
2622 
2623 	dbug_tmp_restore_column_map(&table->write_set, old_map);
2624 	return oom;
2625 
2626 err:
2627 	dbug_tmp_restore_column_map(&table->write_set, old_map);
2628 	return true;
2629 } // end of MakeKeyWhere
2630 
2631 
2632 /***********************************************************************/
2633 /*  Return the string representing an operator.                        */
2634 /***********************************************************************/
GetValStr(OPVAL vop,bool neg)2635 const char *ha_connect::GetValStr(OPVAL vop, bool neg)
2636 {
2637   const char *val;
2638 
2639   switch (vop) {
2640     case OP_EQ:
2641       val= "= ";
2642       break;
2643     case OP_NE:
2644       val= " <> ";
2645       break;
2646     case OP_GT:
2647       val= " > ";
2648       break;
2649     case OP_GE:
2650       val= " >= ";
2651       break;
2652     case OP_LT:
2653       val= " < ";
2654       break;
2655     case OP_LE:
2656       val= " <= ";
2657       break;
2658     case OP_IN:
2659       val= (neg) ? " NOT IN (" : " IN (";
2660       break;
2661     case OP_NULL:
2662       val= (neg) ? " IS NOT NULL" : " IS NULL";
2663       break;
2664     case OP_LIKE:
2665       val= (neg) ? " NOT LIKE " : " LIKE ";
2666       break;
2667     case OP_XX:
2668       val= (neg) ? " NOT BETWEEN " : " BETWEEN ";
2669       break;
2670     case OP_EXIST:
2671       val= (neg) ? " NOT EXISTS " : " EXISTS ";
2672       break;
2673     case OP_AND:
2674       val= " AND ";
2675       break;
2676     case OP_OR:
2677       val= " OR ";
2678       break;
2679     case OP_NOT:
2680       val= " NOT ";
2681       break;
2682     case OP_CNC:
2683       val= " || ";
2684       break;
2685     case OP_ADD:
2686       val= " + ";
2687       break;
2688     case OP_SUB:
2689       val= " - ";
2690       break;
2691     case OP_MULT:
2692       val= " * ";
2693       break;
2694     case OP_DIV:
2695       val= " / ";
2696       break;
2697     default:
2698       val= " ? ";
2699       break;
2700     } /* endswitch */
2701 
2702   return val;
2703 } // end of GetValStr
2704 
2705 #if 0
2706 /***********************************************************************/
2707 /*  Check the WHERE condition and return a CONNECT filter.             */
2708 /***********************************************************************/
2709 PFIL ha_connect::CheckFilter(PGLOBAL g)
2710 {
2711   return CondFilter(g, (Item *)pushed_cond);
2712 } // end of CheckFilter
2713 #endif // 0
2714 
2715 /***********************************************************************/
2716 /*  Check the WHERE condition and return a CONNECT filter.             */
2717 /***********************************************************************/
CondFilter(PGLOBAL g,Item * cond)2718 PFIL ha_connect::CondFilter(PGLOBAL g, Item *cond)
2719 {
2720   unsigned int i;
2721   bool  ismul= false;
2722   OPVAL vop= OP_XX;
2723   PFIL  filp= NULL;
2724 
2725   if (!cond)
2726     return NULL;
2727 
2728   if (trace(1))
2729     htrc("Cond type=%d\n", cond->type());
2730 
2731   if (cond->type() == COND::COND_ITEM) {
2732     PFIL       fp;
2733     Item_cond *cond_item= (Item_cond *)cond;
2734 
2735     if (trace(1))
2736       htrc("Cond: Ftype=%d name=%s\n", cond_item->functype(),
2737                                        cond_item->func_name());
2738 
2739     switch (cond_item->functype()) {
2740       case Item_func::COND_AND_FUNC: vop= OP_AND; break;
2741       case Item_func::COND_OR_FUNC:  vop= OP_OR;  break;
2742       default: return NULL;
2743       } // endswitch functype
2744 
2745     List<Item>* arglist= cond_item->argument_list();
2746     List_iterator<Item> li(*arglist);
2747     Item *subitem;
2748 
2749     for (i= 0; i < arglist->elements; i++)
2750       if ((subitem= li++)) {
2751         if (!(fp= CondFilter(g, subitem))) {
2752           if (vop == OP_OR)
2753             return NULL;
2754         } else
2755           filp= (filp) ? MakeFilter(g, filp, vop, fp) : fp;
2756 
2757       } else
2758         return NULL;
2759 
2760   } else if (cond->type() == COND::FUNC_ITEM) {
2761     unsigned int i;
2762     bool       iscol, neg= FALSE;
2763     PCOL       colp[2]= {NULL,NULL};
2764     PPARM      pfirst= NULL, pprec= NULL;
2765     POPER      pop;
2766     Item_func *condf= (Item_func *)cond;
2767     Item*     *args= condf->arguments();
2768 
2769     if (trace(1))
2770       htrc("Func type=%d argnum=%d\n", condf->functype(),
2771                                        condf->argument_count());
2772 
2773     switch (condf->functype()) {
2774       case Item_func::EQUAL_FUNC:
2775       case Item_func::EQ_FUNC: vop= OP_EQ;  break;
2776       case Item_func::NE_FUNC: vop= OP_NE;  break;
2777       case Item_func::LT_FUNC: vop= OP_LT;  break;
2778       case Item_func::LE_FUNC: vop= OP_LE;  break;
2779       case Item_func::GE_FUNC: vop= OP_GE;  break;
2780       case Item_func::GT_FUNC: vop= OP_GT;  break;
2781       case Item_func::IN_FUNC: vop= OP_IN;	/* fall through */
2782       case Item_func::BETWEEN:
2783         ismul= true;
2784         neg= ((Item_func_opt_neg *)condf)->negated;
2785         break;
2786       default: return NULL;
2787       } // endswitch functype
2788 
2789     pop= (POPER)PlugSubAlloc(g, NULL, sizeof(OPER));
2790     pop->Name= NULL;
2791     pop->Val=vop;
2792     pop->Mod= 0;
2793 
2794     if (condf->argument_count() < 2)
2795       return NULL;
2796 
2797     for (i= 0; i < condf->argument_count(); i++) {
2798       if (trace(1))
2799         htrc("Argtype(%d)=%d\n", i, args[i]->type());
2800 
2801       if (i >= 2 && !ismul) {
2802         if (trace(1))
2803           htrc("Unexpected arg for vop=%d\n", vop);
2804 
2805         continue;
2806         } // endif i
2807 
2808       if ((iscol= args[i]->type() == COND::FIELD_ITEM)) {
2809         Item_field *pField= (Item_field *)args[i];
2810 
2811         // IN and BETWEEN clauses should be col VOP list
2812         if (i && ismul)
2813           return NULL;
2814 
2815         if (pField->field->table != table ||
2816             !(colp[i]= tdbp->ColDB(g, (PSZ)pField->field->field_name.str, 0)))
2817           return NULL;  // Column does not belong to this table
2818 
2819 				// These types are not yet implemented (buggy)
2820 				switch (pField->field->type()) {
2821 				case MYSQL_TYPE_TIMESTAMP:
2822 				case MYSQL_TYPE_DATE:
2823 				case MYSQL_TYPE_TIME:
2824 				case MYSQL_TYPE_DATETIME:
2825 				case MYSQL_TYPE_YEAR:
2826 				case MYSQL_TYPE_NEWDATE:
2827 					return NULL;
2828 				default:
2829 					break;
2830 				} // endswitch type
2831 
2832         if (trace(1)) {
2833           htrc("Field index=%d\n", pField->field->field_index);
2834           htrc("Field name=%s\n", pField->field->field_name.str);
2835           } // endif trace
2836 
2837       } else {
2838         char    buff[256];
2839         String *res, tmp(buff, sizeof(buff), &my_charset_bin);
2840         PPARM pp= (PPARM)PlugSubAlloc(g, NULL, sizeof(PARM));
2841 
2842         // IN and BETWEEN clauses should be col VOP list
2843         if (!i && (ismul))
2844           return NULL;
2845 
2846         switch (args[i]->real_type()) {
2847           case COND::CONST_ITEM:
2848           {
2849             Item *pval= (Item *)args[i];
2850           switch (args[i]->cmp_type()) {
2851             case STRING_RESULT:
2852               res= pval->val_str(&tmp);
2853               pp->Value= PlugSubAllocStr(g, NULL, res->ptr(), res->length());
2854               pp->Type= (pp->Value) ? TYPE_STRING : TYPE_ERROR;
2855               break;
2856             case INT_RESULT:
2857               pp->Type= TYPE_INT;
2858               pp->Value= PlugSubAlloc(g, NULL, sizeof(int));
2859               *((int*)pp->Value)= (int)pval->val_int();
2860               break;
2861             case TIME_RESULT:
2862               pp->Type= TYPE_DATE;
2863               pp->Value= PlugSubAlloc(g, NULL, sizeof(int));
2864               *((int*)pp->Value)= (int) Temporal_hybrid(pval).to_longlong();
2865               break;
2866             case REAL_RESULT:
2867             case DECIMAL_RESULT:
2868               pp->Type= TYPE_DOUBLE;
2869               pp->Value= PlugSubAlloc(g, NULL, sizeof(double));
2870               *((double*)pp->Value)= pval->val_real();
2871               break;
2872             case ROW_RESULT:
2873               DBUG_ASSERT(0);
2874               return NULL;
2875           }
2876           }
2877           break;
2878           case COND::CACHE_ITEM:    // Possible ???
2879           case COND::NULL_ITEM:     // TODO: handle this
2880           default:
2881             return NULL;
2882         } // endswitch type
2883 
2884 				if (trace(1))
2885           htrc("Value type=%hd\n", pp->Type);
2886 
2887         // Append the value to the argument list
2888         if (pprec)
2889           pprec->Next= pp;
2890         else
2891           pfirst= pp;
2892 
2893         pp->Domain= i;
2894         pp->Next= NULL;
2895         pprec= pp;
2896       } // endif type
2897 
2898       } // endfor i
2899 
2900     filp= MakeFilter(g, colp, pop, pfirst, neg);
2901   } else {
2902     if (trace(1))
2903       htrc("Unsupported condition\n");
2904 
2905     return NULL;
2906   } // endif's type
2907 
2908   return filp;
2909 } // end of CondFilter
2910 
2911 /***********************************************************************/
2912 /*  Check the WHERE condition and return a MYSQL/ODBC/JDBC/WQL filter. */
2913 /***********************************************************************/
CheckCond(PGLOBAL g,PCFIL filp,const Item * cond)2914 PCFIL ha_connect::CheckCond(PGLOBAL g, PCFIL filp, const Item *cond)
2915 {
2916 	AMT   tty= filp->Type;
2917   char *body= filp->Body;
2918 	char *havg= filp->Having;
2919 	unsigned int i;
2920   bool  ismul= false, x= (tty == TYPE_AM_MYX || tty == TYPE_AM_XDBC);
2921   bool  nonul= ((tty == TYPE_AM_ODBC || tty == TYPE_AM_JDBC) &&
2922 		 (tdbp->GetMode() == MODE_INSERT || tdbp->GetMode() == MODE_DELETE));
2923   OPVAL vop= OP_XX;
2924 
2925   if (!cond)
2926     return NULL;
2927 
2928   if (trace(1))
2929     htrc("Cond type=%d\n", cond->type());
2930 
2931   if (cond->type() == COND::COND_ITEM) {
2932     char      *pb0, *pb1, *pb2, *ph0= 0, *ph1= 0, *ph2= 0;
2933 		bool       bb= false, bh= false;
2934     Item_cond *cond_item= (Item_cond *)cond;
2935 
2936     if (x)
2937       return NULL;
2938 		else
2939 			pb0= pb1= pb2= ph0= ph1= ph2= NULL;
2940 
2941     if (trace(1))
2942       htrc("Cond: Ftype=%d name=%s\n", cond_item->functype(),
2943                                        cond_item->func_name());
2944 
2945     switch (cond_item->functype()) {
2946       case Item_func::COND_AND_FUNC: vop= OP_AND; break;
2947       case Item_func::COND_OR_FUNC:  vop= OP_OR;  break;
2948       default: return NULL;
2949       } // endswitch functype
2950 
2951     List<Item>* arglist= cond_item->argument_list();
2952     List_iterator<Item> li(*arglist);
2953     const Item *subitem;
2954 
2955     pb0= pb1= body + strlen(body);
2956     strcpy(pb0, "(");
2957     pb2= pb1 + 1;
2958 
2959 		if (havg) {
2960 			ph0= ph1= havg + strlen(havg);
2961 			strcpy(ph0, "(");
2962 			ph2= ph1 + 1;
2963 		} // endif havg
2964 
2965     for (i= 0; i < arglist->elements; i++)
2966       if ((subitem= li++)) {
2967         if (!CheckCond(g, filp, subitem)) {
2968           if (vop == OP_OR || nonul)
2969             return NULL;
2970 					else {
2971 						*pb2= 0;
2972 						if (havg) *ph2= 0;
2973 					}	// endelse
2974 
2975         } else {
2976 					if (filp->Bd) {
2977 						pb1= pb2 + strlen(pb2);
2978 						strcpy(pb1, GetValStr(vop, false));
2979 						pb2= pb1 + strlen(pb1);
2980 					} // endif Bd
2981 
2982 					if (filp->Hv) {
2983 						ph1= ph2 + strlen(ph2);
2984 						strcpy(ph1, GetValStr(vop, false));
2985 						ph2= ph1 + strlen(ph1);
2986 					} // endif Hv
2987 
2988         } // endif CheckCond
2989 
2990 				bb |= filp->Bd;
2991 				bh |= filp->Hv;
2992 				filp->Bd= filp->Hv= false;
2993       } else
2994         return NULL;
2995 
2996     if (bb)	{
2997       strcpy(pb1, ")");
2998 			filp->Bd= bb;
2999 		} else
3000 			*pb0= 0;
3001 
3002 		if (havg) {
3003 			if (bb && bh && vop == OP_OR) {
3004 				// Cannot or'ed a where clause with a having clause
3005 				bb= bh= 0;
3006 				*pb0= 0;
3007 				*ph0= 0;
3008 			} else if (bh)	{
3009 				strcpy(ph1, ")");
3010 				filp->Hv= bh;
3011 			} else
3012 				*ph0= 0;
3013 
3014 		} // endif havg
3015 
3016 		if (!bb && !bh)
3017 			return NULL;
3018 
3019   } else if (cond->type() == COND::FUNC_ITEM) {
3020     unsigned int i;
3021     bool       iscol, ishav= false, neg= false;
3022     Item_func *condf= (Item_func *)cond;
3023     Item*     *args= condf->arguments();
3024 
3025 		filp->Bd= filp->Hv= false;
3026 
3027     if (trace(1))
3028       htrc("Func type=%d argnum=%d\n", condf->functype(),
3029                                        condf->argument_count());
3030 
3031     switch (condf->functype()) {
3032       case Item_func::EQUAL_FUNC:
3033 			case Item_func::EQ_FUNC:     vop= OP_EQ;   break;
3034 			case Item_func::NE_FUNC:     vop= OP_NE;   break;
3035 			case Item_func::LT_FUNC:     vop= OP_LT;   break;
3036 			case Item_func::LE_FUNC:     vop= OP_LE;   break;
3037 			case Item_func::GE_FUNC:     vop= OP_GE;   break;
3038 			case Item_func::GT_FUNC:     vop= OP_GT;   break;
3039 #if MYSQL_VERSION_ID > 100200
3040 			case Item_func::LIKE_FUNC:
3041 				vop = OP_LIKE;
3042 			  neg= ((Item_func_like*)condf)->negated;
3043 			  break;
3044 #endif // VERSION_ID > 100200
3045 			case Item_func::ISNOTNULL_FUNC:
3046 				neg= true;
3047 				// fall through
3048 			case Item_func::ISNULL_FUNC: vop= OP_NULL; break;
3049 			case Item_func::IN_FUNC:     vop= OP_IN; /* fall through */
3050       case Item_func::BETWEEN:
3051         ismul= true;
3052         neg= ((Item_func_opt_neg *)condf)->negated;
3053         break;
3054       default: return NULL;
3055       } // endswitch functype
3056 
3057     if (condf->argument_count() < 2)
3058       return NULL;
3059     else if (ismul && tty == TYPE_AM_WMI)
3060       return NULL;        // Not supported by WQL
3061 
3062     if (x && (neg || !(vop == OP_EQ || vop == OP_IN || vop == OP_NULL)))
3063       return NULL;
3064 
3065     for (i= 0; i < condf->argument_count(); i++) {
3066       if (trace(1))
3067         htrc("Argtype(%d)=%d\n", i, args[i]->type());
3068 
3069       if (i >= 2 && !ismul) {
3070         if (trace(1))
3071           htrc("Unexpected arg for vop=%d\n", vop);
3072 
3073         continue;
3074         } // endif i
3075 
3076       if ((iscol= args[i]->type() == COND::FIELD_ITEM)) {
3077         const char *fnm;
3078         ha_field_option_struct *fop;
3079         Item_field *pField= (Item_field *)args[i];
3080 
3081 				// IN and BETWEEN clauses should be col VOP list
3082 				if (i && (x || ismul))
3083           return NULL;	// IN and BETWEEN clauses should be col VOP list
3084 				else if (pField->field->table != table)
3085 					return NULL;  // Field does not belong to this table
3086 				else if (tty != TYPE_AM_WMI && IsIndexed(pField->field))
3087 					return NULL;  // Will be handled by ReadKey
3088         else
3089           fop= GetFieldOptionStruct(pField->field);
3090 
3091         if (fop && fop->special) {
3092           if (tty == TYPE_AM_TBL && !stricmp(fop->special, "TABID"))
3093             fnm= "TABID";
3094           else if (tty == TYPE_AM_PLG)
3095             fnm= fop->special;
3096           else
3097             return NULL;
3098 
3099 				} else if (tty == TYPE_AM_TBL) {
3100 					return NULL;
3101 				} else {
3102 					bool h;
3103 
3104 					fnm= filp->Chk(pField->field->field_name.str, &h);
3105 
3106 					if (h && i && !ishav)
3107 						return NULL;			// Having should be	col VOP arg
3108 					else
3109 						ishav= h;
3110 
3111 				}	// endif's
3112 
3113         if (trace(1)) {
3114           htrc("Field index=%d\n", pField->field->field_index);
3115           htrc("Field name=%s\n", pField->field->field_name.str);
3116           htrc("Field type=%d\n", pField->field->type());
3117           htrc("Field_type=%d\n", args[i]->field_type());
3118           } // endif trace
3119 
3120         strcat((ishav ? havg : body), fnm);
3121       } else if (args[i]->type() == COND::FUNC_ITEM) {
3122         if (tty == TYPE_AM_MYSQL) {
3123           if (!CheckCond(g, filp, args[i]))
3124             return NULL;
3125 
3126         } else
3127           return NULL;
3128 
3129       } else {
3130         char    buff[256];
3131         String *res, tmp(buff, sizeof(buff), &my_charset_bin);
3132         Item *pval= (Item *)args[i];
3133         Item::Type type= args[i]->real_type();
3134 
3135         switch (type) {
3136           case COND::CONST_ITEM:
3137           case COND::NULL_ITEM:
3138           case COND::CACHE_ITEM:
3139             break;
3140           default:
3141             return NULL;
3142           } // endswitch type
3143 
3144         if ((res= pval->val_str(&tmp)) == NULL)
3145           return NULL;                      // To be clarified
3146 
3147         if (trace(1))
3148           htrc("Value=%.*s\n", res->length(), res->ptr());
3149 
3150         // IN and BETWEEN clauses should be col VOP list
3151         if (!i && (x || ismul))
3152           return NULL;
3153 
3154         if (!x) {
3155 					const char *p;
3156 					char *s= (ishav) ? havg : body;
3157 					uint	j, k, n;
3158 
3159           // Append the value to the filter
3160           switch (args[i]->field_type()) {
3161             case MYSQL_TYPE_TIMESTAMP:
3162             case MYSQL_TYPE_DATETIME:
3163               if (tty == TYPE_AM_ODBC) {
3164                 strcat(s, "{ts '");
3165                 strncat(s, res->ptr(), res->length());
3166 
3167                 if (res->length() < 19)
3168                   strcat(s, &"1970-01-01 00:00:00"[res->length()]);
3169 
3170                 strcat(s, "'}");
3171                 break;
3172                 } // endif ODBC
3173 		// fall through
3174             case MYSQL_TYPE_DATE:
3175               if (tty == TYPE_AM_ODBC) {
3176                 strcat(s, "{d '");
3177                 strcat(strncat(s, res->ptr(), res->length()), "'}");
3178                 break;
3179                 } // endif ODBC
3180 		// fall through
3181 
3182             case MYSQL_TYPE_TIME:
3183               if (tty == TYPE_AM_ODBC) {
3184                 strcat(s, "{t '");
3185                 strcat(strncat(s, res->ptr(), res->length()), "'}");
3186                 break;
3187                 } // endif ODBC
3188 		// fall through
3189 
3190             case MYSQL_TYPE_VARCHAR:
3191               if (tty == TYPE_AM_ODBC && i) {
3192                 switch (args[0]->field_type()) {
3193                   case MYSQL_TYPE_TIMESTAMP:
3194                   case MYSQL_TYPE_DATETIME:
3195                     strcat(s, "{ts '");
3196                     strncat(s, res->ptr(), res->length());
3197 
3198                     if (res->length() < 19)
3199                       strcat(s, &"1970-01-01 00:00:00"[res->length()]);
3200 
3201                     strcat(s, "'}");
3202                     break;
3203                   case MYSQL_TYPE_DATE:
3204                     strcat(s, "{d '");
3205                     strncat(s, res->ptr(), res->length());
3206                     strcat(s, "'}");
3207                     break;
3208                   case MYSQL_TYPE_TIME:
3209                     strcat(s, "{t '");
3210                     strncat(s, res->ptr(), res->length());
3211                     strcat(s, "'}");
3212                     break;
3213                   default:
3214 										j= strlen(s);
3215 										s[j++]= '\'';
3216 										p= res->ptr();
3217 										n= res->length();
3218 
3219 										for (k= 0; k < n; k++) {
3220 											if (p[k] == '\'')
3221 												s[j++]= '\'';
3222 
3223 											s[j++]= p[k];
3224 										} // endfor k
3225 
3226 										s[j++]= '\'';
3227 										s[j]= 0;
3228 								} // endswitch field type
3229 
3230               } else {
3231 								j= strlen(s);
3232 								s[j++]= '\'';
3233 								p= res->ptr();
3234 								n= res->length();
3235 
3236 								for (k= 0; k < n; k++) {
3237 									if (p[k] == '\'')
3238 										s[j++]= '\'';
3239 
3240 									s[j++]= p[k];
3241 								} // endfor k
3242 
3243 								s[j++]= '\'';
3244 								s[j]= 0;
3245 							} // endif tty
3246 
3247               break;
3248             default:
3249               strncat(s, res->ptr(), res->length());
3250             } // endswitch field type
3251 
3252         } else {
3253           if (args[i]->field_type() == MYSQL_TYPE_VARCHAR) {
3254             // Add the command to the list
3255             PCMD *ncp, cmdp= new(g) CMD(g, (char*)res->c_ptr());
3256 
3257             for (ncp= &filp->Cmds; *ncp; ncp= &(*ncp)->Next) ;
3258 
3259             *ncp= cmdp;
3260           } else
3261             return NULL;
3262 
3263         } // endif x
3264 
3265       } // endif's Type
3266 
3267       if (!x) {
3268 				char *s= (ishav) ? havg : body;
3269 
3270 				if (!i)
3271           strcat(s, GetValStr(vop, neg));
3272         else if (vop == OP_XX && i == 1)
3273           strcat(s, " AND ");
3274         else if (vop == OP_IN)
3275           strcat(s, (i == condf->argument_count() - 1) ? ")" : ",");
3276 
3277         } // endif x
3278 
3279       } // endfor i
3280 
3281 			if (x)
3282 				filp->Op= vop;
3283 			else if (ishav)
3284 				filp->Hv= true;
3285 			else
3286 				filp->Bd= true;
3287 
3288   } else {
3289     if (trace(1))
3290       htrc("Unsupported condition\n");
3291 
3292     return NULL;
3293   } // endif's type
3294 
3295   return filp;
3296 } // end of CheckCond
3297 
3298 
3299  /**
3300    Push condition down to the table handler.
3301 
3302    @param  cond   Condition to be pushed. The condition tree must not be
3303                   modified by the caller.
3304 
3305    @return
3306      The 'remainder' condition that caller must use to filter out records.
3307      NULL means the handler will not return rows that do not match the
3308      passed condition.
3309 
3310    @note
3311      CONNECT handles the filtering only for table types that construct
3312      an SQL or WQL query, but still leaves it to MySQL because only some
3313      parts of the filter may be relevant.
3314      The first suballocate finds the position where the string will be
3315      constructed in the sarea. The second one does make the suballocation
3316      with the proper length.
3317  */
cond_push(const COND * cond)3318 const COND *ha_connect::cond_push(const COND *cond)
3319 {
3320   DBUG_ENTER("ha_connect::cond_push");
3321 
3322   if (tdbp && CondPushEnabled()) {
3323     PGLOBAL& g= xp->g;
3324     AMT      tty= tdbp->GetAmType();
3325     bool     x= (tty == TYPE_AM_MYX || tty == TYPE_AM_XDBC);
3326     bool     b= (tty == TYPE_AM_WMI || tty == TYPE_AM_ODBC  ||
3327                  tty == TYPE_AM_TBL || tty == TYPE_AM_MYSQL ||
3328 								 tty == TYPE_AM_PLG || tty == TYPE_AM_JDBC  || x);
3329 
3330 		// This should never happen but is done to avoid crashing
3331 		try {
3332 			if (b) {
3333 				PCFIL filp;
3334 				int   rc;
3335 
3336 				if ((filp= tdbp->GetCondFil()) && tdbp->GetCond() == cond &&
3337 					filp->Idx == active_index && filp->Type == tty)
3338 					goto fin;
3339 
3340 				filp= new(g) CONDFIL(active_index, tty);
3341 				rc= filp->Init(g, this);
3342 
3343 				if (rc == RC_INFO) {
3344 					filp->Having= (char*)PlugSubAlloc(g, NULL, 256);
3345 					*filp->Having= 0;
3346 				} else if (rc == RC_FX)
3347 					goto fin;
3348 
3349 				filp->Body= (char*)PlugSubAlloc(g, NULL, (x) ? 128 : 0);
3350 				*filp->Body= 0;
3351 
3352 				if (CheckCond(g, filp, cond)) {
3353 					if (filp->Having && strlen(filp->Having) > 255)
3354 						goto fin;								// Memory collapse
3355 
3356 					if (trace(1))
3357 						htrc("cond_push: %s\n", filp->Body);
3358 
3359 					tdbp->SetCond(cond);
3360 
3361 					if (!x)
3362 						PlugSubAlloc(g, NULL, strlen(filp->Body) + 1);
3363 					else
3364 						cond= NULL;             // Does this work?
3365 
3366 					tdbp->SetCondFil(filp);
3367 				} else if (x && cond)
3368 					tdbp->SetCondFil(filp);   // Wrong filter
3369 
3370 			} else if (tdbp->CanBeFiltered()) {
3371 				if (!tdbp->GetCond() || tdbp->GetCond() != cond) {
3372 					tdbp->SetFilter(CondFilter(g, (Item *)cond));
3373 
3374 					if (tdbp->GetFilter())
3375 					  tdbp->SetCond(cond);
3376 
3377 			  } // endif cond
3378 
3379 			}	// endif tty
3380 
3381 		} catch (int n) {
3382 			if (trace(1))
3383 				htrc("Exception %d: %s\n", n, g->Message);
3384 		} catch (const char *msg) {
3385 			strcpy(g->Message, msg);
3386 		} // end catch
3387 
3388 	fin:;
3389   } // endif tdbp
3390 
3391   // Let MySQL do the filtering
3392   DBUG_RETURN(cond);
3393 } // end of cond_push
3394 
3395 /**
3396   Number of rows in table. It will only be called if
3397   (table_flags() & (HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT)) != 0
3398 */
records()3399 ha_rows ha_connect::records()
3400 {
3401   if (!valid_info)
3402     info(HA_STATUS_VARIABLE);
3403 
3404   if (tdbp)
3405     return stats.records;
3406   else
3407     return HA_POS_ERROR;
3408 
3409 } // end of records
3410 
3411 
check(THD * thd,HA_CHECK_OPT * check_opt)3412 int ha_connect::check(THD* thd, HA_CHECK_OPT* check_opt)
3413 {
3414 	int     rc= HA_ADMIN_OK;
3415 	PGLOBAL g= ((table && table->in_use) ? GetPlug(table->in_use, xp) :
3416 		(xp) ? xp->g : NULL);
3417 	DBUG_ENTER("ha_connect::check");
3418 
3419 	if (!g || !table || xmod != MODE_READ)
3420 		DBUG_RETURN(HA_ADMIN_INTERNAL_ERROR);
3421 
3422 	// Do not close the table if it was opened yet (possible?)
3423 	if (IsOpened()) {
3424 		if (IsPartitioned() && CheckColumnList(g)) // map can have been changed
3425 			rc= HA_ADMIN_CORRUPT;
3426 		else if (tdbp->OpenDB(g))      // Rewind table
3427 			rc= HA_ADMIN_CORRUPT;
3428 
3429 	} else if (xp->CheckQuery(valid_query_id)) {
3430 		tdbp= NULL;       // Not valid anymore
3431 
3432 		if (OpenTable(g, false))
3433 			rc= HA_ADMIN_CORRUPT;
3434 
3435 	} else // possible?
3436 		DBUG_RETURN(HA_ADMIN_INTERNAL_ERROR);
3437 
3438 	if (rc == HA_ADMIN_OK) {
3439 		TABTYPE type= GetTypeID(GetStringOption("Type", "*"));
3440 
3441 		if (IsFileType(type)) {
3442 			if (check_opt->flags & T_MEDIUM) {
3443 				// TO DO
3444 				do {
3445 					if ((rc= CntReadNext(g, tdbp)) == RC_FX)
3446 						break;
3447 
3448 				} while (rc != RC_EF);
3449 
3450 				rc= (rc == RC_EF) ? HA_ADMIN_OK : HA_ADMIN_CORRUPT;
3451 			} else if (check_opt->flags & T_EXTEND) {
3452 				// TO DO
3453 			} // endif's flags
3454 
3455 		} // endif file type
3456 
3457 	} else
3458 		PushWarning(g, thd, 1);
3459 
3460 	DBUG_RETURN(rc);
3461 }	// end of check
3462 
3463 
3464 /**
3465   Return an error message specific to this handler.
3466 
3467   @param error  error code previously returned by handler
3468   @param buf    pointer to String where to add error message
3469 
3470   @return
3471     Returns true if this is a temporary error
3472 */
get_error_message(int error,String * buf)3473 bool ha_connect::get_error_message(int error, String* buf)
3474 {
3475   DBUG_ENTER("ha_connect::get_error_message");
3476 
3477 	if (xp && xp->g) {
3478 		PGLOBAL g= xp->g;
3479 
3480 		if (trace(1))
3481 			htrc("GEM(%d): %s\n", error, g->Message);
3482 
3483 		buf->append(ErrConvString(g->Message, strlen(g->Message),
3484 			&my_charset_latin1).ptr());
3485 	} else
3486     buf->append("Cannot retrieve error message");
3487 
3488   DBUG_RETURN(false);
3489 } // end of get_error_message
3490 
3491 /**
3492   Convert a filename partition name to system
3493 */
decode(PGLOBAL g,const char * pn)3494 static char *decode(PGLOBAL g, const char *pn)
3495   {
3496   char  *buf= (char*)PlugSubAlloc(g, NULL, strlen(pn) + 1);
3497   uint   dummy_errors;
3498   uint32 len= copy_and_convert(buf, strlen(pn) + 1,
3499                                system_charset_info,
3500                                pn, strlen(pn),
3501                                &my_charset_filename,
3502                                &dummy_errors);
3503   buf[len]= '\0';
3504   return buf;
3505   } // end of decode
3506 
3507 /**
3508   @brief
3509   Used for opening tables. The name will be the name of the file.
3510 
3511   @details
3512   A table is opened when it needs to be opened; e.g. when a request comes in
3513   for a SELECT on the table (tables are not open and closed for each request,
3514   they are cached).
3515 
3516   Called from handler.cc by handler::ha_open(). The server opens all tables by
3517   calling ha_open() which then calls the handler specific open().
3518 
3519   @note
3520   For CONNECT no open can be done here because field information is not yet
3521   updated. >>>>> TO BE CHECKED <<<<<
3522   (Thread information could be get by using 'ha_thd')
3523 
3524   @see
3525   handler::ha_open() in handler.cc
3526 */
open(const char * name,int mode,uint test_if_locked)3527 int ha_connect::open(const char *name, int mode, uint test_if_locked)
3528 {
3529   int rc= 0;
3530   DBUG_ENTER("ha_connect::open");
3531 
3532   if (trace(1))
3533      htrc("open: name=%s mode=%d test=%u\n", name, mode, test_if_locked);
3534 
3535   if (!(share= get_share()))
3536     DBUG_RETURN(1);
3537 
3538   thr_lock_data_init(&share->lock,&lock,NULL);
3539 
3540   // Try to get the user if possible
3541   xp= GetUser(ha_thd(), xp);
3542   PGLOBAL g= (xp) ? xp->g : NULL;
3543 
3544   // Try to set the database environment
3545   if (g) {
3546     rc= (CntCheckDB(g, this, name)) ? (-2) : 0;
3547 
3548     if (g->Mrr) {
3549       // This should only happen for the mrr secondary handler
3550       mrr= true;
3551       g->Mrr= false;
3552     } else
3553       mrr= false;
3554 
3555 #if defined(WITH_PARTITION_STORAGE_ENGINE)
3556     if (table->part_info) {
3557       if (GetStringOption("Filename") || GetStringOption("Tabname")
3558                                       || GetStringOption("Connect")) {
3559         strncpy(partname, decode(g, strrchr(name, '#') + 1), sizeof(partname) - 1);
3560 //      strcpy(partname, table->part_info->curr_part_elem->partition_name);
3561 //      part_id= &table->part_info->full_part_field_set;
3562       } else       // Inward table
3563         strncpy(partname, strrchr(name, slash) + 1, sizeof(partname) - 1);
3564 
3565       part_id= &table->part_info->full_part_field_set; // Temporary
3566       } // endif part_info
3567 #endif   // WITH_PARTITION_STORAGE_ENGINE
3568   } else
3569     rc= HA_ERR_INTERNAL_ERROR;
3570 
3571   DBUG_RETURN(rc);
3572 } // end of open
3573 
3574 /**
3575   @brief
3576   Make the indexes for this table
3577 */
optimize(THD * thd,HA_CHECK_OPT *)3578 int ha_connect::optimize(THD* thd, HA_CHECK_OPT*)
3579 {
3580   int      rc= 0;
3581   PGLOBAL& g= xp->g;
3582   PDBUSER  dup= PlgGetUser(g);
3583 
3584 	try {
3585 		// Ignore error on the opt file
3586 		dup->Check &= ~CHK_OPT;
3587 		tdbp= GetTDB(g);
3588 		dup->Check |= CHK_OPT;
3589 
3590 		if (tdbp && !tdbp->IsRemote()) {
3591 			bool dop= IsTypeIndexable(GetRealType(NULL));
3592 			bool dox= (tdbp->GetDef()->Indexable() == 1);
3593 
3594 			if ((rc= ((PTDBASE)tdbp)->ResetTableOpt(g, dop, dox))) {
3595 				if (rc == RC_INFO) {
3596 					push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
3597 					rc= 0;
3598 				} else
3599 					rc= HA_ERR_CRASHED_ON_USAGE;		// Table must be repaired
3600 
3601 			} // endif rc
3602 
3603 		} else if (!tdbp)
3604 			rc= HA_ERR_INTERNAL_ERROR;
3605 
3606 	} catch (int n) {
3607 		if (trace(1))
3608 			htrc("Exception %d: %s\n", n, g->Message);
3609 		rc= HA_ERR_INTERNAL_ERROR;
3610 	} catch (const char *msg) {
3611 		strcpy(g->Message, msg);
3612 		rc= HA_ERR_INTERNAL_ERROR;
3613 	} // end catch
3614 
3615 	if (rc)
3616 		my_message(ER_WARN_DATA_OUT_OF_RANGE, g->Message, MYF(0));
3617 
3618   return rc;
3619 } // end of optimize
3620 
3621 /**
3622   @brief
3623   Closes a table.
3624 
3625   @details
3626   Called from sql_base.cc, sql_select.cc, and table.cc. In sql_select.cc it is
3627   only used to close up temporary tables or during the process where a
3628   temporary table is converted over to being a myisam table.
3629 
3630   For sql_base.cc look at close_data_tables().
3631 
3632   @see
3633   sql_base.cc, sql_select.cc and table.cc
3634 */
close(void)3635 int ha_connect::close(void)
3636 {
3637   int rc= 0;
3638   DBUG_ENTER("ha_connect::close");
3639 
3640   // If this is called by a later query, the table may have
3641   // been already closed and the tdbp is not valid anymore.
3642   if (tdbp && xp->last_query_id == valid_query_id)
3643     rc= CloseTable(xp->g);
3644 
3645   DBUG_RETURN(rc);
3646 } // end of close
3647 
3648 
3649 /**
3650   @brief
3651   write_row() inserts a row. No extra() hint is given currently if a bulk load
3652   is happening. buf() is a byte array of data. You can use the field
3653   information to extract the data from the native byte array type.
3654 
3655     @details
3656   Example of this would be:
3657     @code
3658   for (Field **field=table->field ; *field ; field++)
3659   {
3660     ...
3661   }
3662     @endcode
3663 
3664   See ha_tina.cc for an example of extracting all of the data as strings.
3665   ha_berekly.cc has an example of how to store it intact by "packing" it
3666   for ha_berkeley's own native storage type.
3667 
3668   See the note for update_row() on auto_increments and timestamps. This
3669   case also applies to write_row().
3670 
3671   Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
3672   sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
3673 
3674     @see
3675   item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
3676   sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc and sql_update.cc
3677 */
write_row(const uchar * buf)3678 int ha_connect::write_row(const uchar *buf)
3679 {
3680   int      rc= 0;
3681   PGLOBAL& g= xp->g;
3682   DBUG_ENTER("ha_connect::write_row");
3683 
3684   // This is not tested yet
3685   if (xmod == MODE_ALTER) {
3686     if (IsPartitioned() && GetStringOption("Filename", NULL))
3687       // Why does this happen now that check_if_supported_inplace_alter is called?
3688       DBUG_RETURN(0);     // Alter table on an outward partition table
3689 
3690     xmod= MODE_INSERT;
3691   } else if (xmod == MODE_ANY)
3692     DBUG_RETURN(0);       // Probably never met
3693 
3694   // Open the table if it was not opened yet (locked)
3695   if (!IsOpened() || xmod != tdbp->GetMode()) {
3696     if (IsOpened())
3697       CloseTable(g);
3698 
3699     if ((rc= OpenTable(g)))
3700       DBUG_RETURN(rc);
3701 
3702     } // endif isopened
3703 
3704 #if 0                // AUTO_INCREMENT NIY
3705   if (table->next_number_field && buf == table->record[0]) {
3706     int error;
3707 
3708     if ((error= update_auto_increment()))
3709       return error;
3710 
3711     } // endif nex_number_field
3712 #endif // 0
3713 
3714   // Set column values from the passed pseudo record
3715   if ((rc= ScanRecord(g, buf)))
3716     DBUG_RETURN(rc);
3717 
3718   // Return result code from write operation
3719   if (CntWriteRow(g, tdbp)) {
3720     DBUG_PRINT("write_row", ("%s", g->Message));
3721     htrc("write_row: %s\n", g->Message);
3722     rc= HA_ERR_INTERNAL_ERROR;
3723   } else                // Table is modified
3724     nox= false;         // Indexes to be remade
3725 
3726   DBUG_RETURN(rc);
3727 } // end of write_row
3728 
3729 
3730 /**
3731   @brief
3732   Yes, update_row() does what you expect, it updates a row. old_data will have
3733   the previous row record in it, while new_data will have the newest data in it.
3734   Keep in mind that the server can do updates based on ordering if an ORDER BY
3735   clause was used. Consecutive ordering is not guaranteed.
3736 
3737     @details
3738   Currently new_data will not have an updated auto_increament record, or
3739   and updated timestamp field. You can do these for example by doing:
3740     @code
3741   if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
3742     table->timestamp_field->set_time();
3743   if (table->next_number_field && record == table->record[0])
3744     update_auto_increment();
3745     @endcode
3746 
3747   Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
3748 
3749     @see
3750   sql_select.cc, sql_acl.cc, sql_update.cc and sql_insert.cc
3751 */
update_row(const uchar * old_data,const uchar * new_data)3752 int ha_connect::update_row(const uchar *old_data, const uchar *new_data)
3753 {
3754   int      rc= 0;
3755   PGLOBAL& g= xp->g;
3756   DBUG_ENTER("ha_connect::update_row");
3757 
3758   if (trace(2))
3759     htrc("update_row: old=%s new=%s\n", old_data, new_data);
3760 
3761   // Check values for possible change in indexed column
3762   if ((rc= CheckRecord(g, old_data, new_data)))
3763     DBUG_RETURN(rc);
3764 
3765   if (CntUpdateRow(g, tdbp)) {
3766     DBUG_PRINT("update_row", ("%s", g->Message));
3767     htrc("update_row CONNECT: %s\n", g->Message);
3768     rc= HA_ERR_INTERNAL_ERROR;
3769   } else
3770     nox= false;               // Table is modified
3771 
3772   DBUG_RETURN(rc);
3773 } // end of update_row
3774 
3775 
3776 /**
3777   @brief
3778   This will delete a row. buf will contain a copy of the row to be deleted.
3779   The server will call this right after the current row has been called (from
3780   either a previous rnd_nexT() or index call).
3781 
3782   @details
3783   If you keep a pointer to the last row or can access a primary key it will
3784   make doing the deletion quite a bit easier. Keep in mind that the server does
3785   not guarantee consecutive deletions. ORDER BY clauses can be used.
3786 
3787   Called in sql_acl.cc and sql_udf.cc to manage internal table
3788   information.  Called in sql_delete.cc, sql_insert.cc, and
3789   sql_select.cc. In sql_select it is used for removing duplicates
3790   while in insert it is used for REPLACE calls.
3791 
3792   @see
3793   sql_acl.cc, sql_udf.cc, sql_delete.cc, sql_insert.cc and sql_select.cc
3794 */
delete_row(const uchar *)3795 int ha_connect::delete_row(const uchar *)
3796 {
3797   int rc= 0;
3798   DBUG_ENTER("ha_connect::delete_row");
3799 
3800   if (CntDeleteRow(xp->g, tdbp, false)) {
3801     rc= HA_ERR_INTERNAL_ERROR;
3802     htrc("delete_row CONNECT: %s\n", xp->g->Message);
3803   } else
3804     nox= false;             // To remake indexes
3805 
3806   DBUG_RETURN(rc);
3807 } // end of delete_row
3808 
3809 
3810 /****************************************************************************/
3811 /*  We seem to come here at the begining of an index use.                   */
3812 /****************************************************************************/
index_init(uint idx,bool sorted)3813 int ha_connect::index_init(uint idx, bool sorted)
3814 {
3815   int rc;
3816   PGLOBAL& g= xp->g;
3817   DBUG_ENTER("index_init");
3818 
3819   if (trace(1))
3820     htrc("index_init: this=%p idx=%u sorted=%d\n", this, idx, sorted);
3821 
3822   if (GetIndexType(GetRealType()) == 2) {
3823     if (xmod == MODE_READ)
3824       // This is a remote index
3825       xmod= MODE_READX;
3826 
3827     if (!(rc= rnd_init(0))) {
3828 //    if (xmod == MODE_READX) {
3829         active_index= idx;
3830         indexing= IsUnique(idx) ? 1 : 2;
3831 //    } else {
3832 //      active_index= MAX_KEY;
3833 //      indexing= 0;
3834 //    } // endif xmod
3835 
3836       } //endif rc
3837 
3838     DBUG_RETURN(rc);
3839     } // endif index type
3840 
3841   if ((rc= rnd_init(0)))
3842     DBUG_RETURN(rc);
3843 
3844   if (locked == 2) {
3845     // Indexes are not updated in lock write mode
3846     active_index= MAX_KEY;
3847     indexing= 0;
3848     DBUG_RETURN(0);
3849     } // endif locked
3850 
3851   indexing= CntIndexInit(g, tdbp, (signed)idx, sorted);
3852 
3853   if (indexing <= 0) {
3854     DBUG_PRINT("index_init", ("%s", g->Message));
3855     htrc("index_init CONNECT: %s\n", g->Message);
3856     active_index= MAX_KEY;
3857     rc= HA_ERR_INTERNAL_ERROR;
3858   } else if (tdbp->GetKindex()) {
3859     if (((PTDBDOS)tdbp)->GetKindex()->GetNum_K()) {
3860       if (tdbp->GetFtype() != RECFM_NAF)
3861         ((PTDBDOS)tdbp)->GetTxfp()->ResetBuffer(g);
3862 
3863       active_index= idx;
3864 //  } else {        // Void table
3865 //    active_index= MAX_KEY;
3866 //    indexing= 0;
3867     } // endif Num
3868 
3869     rc= 0;
3870   } // endif indexing
3871 
3872   if (trace(1))
3873     htrc("index_init: rc=%d indexing=%d active_index=%d\n",
3874             rc, indexing, active_index);
3875 
3876   DBUG_RETURN(rc);
3877 } // end of index_init
3878 
3879 /****************************************************************************/
3880 /*  We seem to come here at the end of an index use.                        */
3881 /****************************************************************************/
index_end()3882 int ha_connect::index_end()
3883 {
3884   DBUG_ENTER("index_end");
3885   active_index= MAX_KEY;
3886   ds_mrr.dsmrr_close();
3887   DBUG_RETURN(rnd_end());
3888 } // end of index_end
3889 
3890 
3891 /****************************************************************************/
3892 /*  This is internally called by all indexed reading functions.             */
3893 /****************************************************************************/
ReadIndexed(uchar * buf,OPVAL op,const key_range * kr)3894 int ha_connect::ReadIndexed(uchar *buf, OPVAL op, const key_range *kr)
3895 {
3896   int rc;
3897 
3898 //statistic_increment(ha_read_key_count, &LOCK_status);
3899 
3900   switch (CntIndexRead(xp->g, tdbp, op, kr, mrr)) {
3901     case RC_OK:
3902       xp->fnd++;
3903       rc= MakeRecord((char*)buf);
3904       break;
3905     case RC_EF:         // End of file
3906       rc= HA_ERR_END_OF_FILE;
3907       break;
3908     case RC_NF:         // Not found
3909       xp->nfd++;
3910       rc= (op == OP_SAME) ? HA_ERR_END_OF_FILE : HA_ERR_KEY_NOT_FOUND;
3911       break;
3912     default:          // Read error
3913       DBUG_PRINT("ReadIndexed", ("%s", xp->g->Message));
3914       htrc("ReadIndexed: %s\n", xp->g->Message);
3915       rc= HA_ERR_INTERNAL_ERROR;
3916       break;
3917     } // endswitch RC
3918 
3919   if (trace(2))
3920     htrc("ReadIndexed: op=%d rc=%d\n", op, rc);
3921 
3922   table->status= (rc == RC_OK) ? 0 : STATUS_NOT_FOUND;
3923   return rc;
3924 } // end of ReadIndexed
3925 
3926 
3927 #ifdef NOT_USED
3928 /**
3929   @brief
3930   Positions an index cursor to the index specified in the handle. Fetches the
3931   row if available. If the key value is null, begin at the first key of the
3932   index.
3933 */
index_read_map(uchar * buf,const uchar * key,key_part_map keypart_map,enum ha_rkey_function find_flag)3934 int ha_connect::index_read_map(uchar *buf, const uchar *key,
3935                                key_part_map keypart_map __attribute__((unused)),
3936                                enum ha_rkey_function find_flag
3937                                __attribute__((unused)))
3938 {
3939   DBUG_ENTER("ha_connect::index_read");
3940   DBUG_RETURN(HA_ERR_WRONG_COMMAND);
3941 }
3942 #endif // NOT_USED
3943 
3944 
3945 /****************************************************************************/
3946 /*  This is called by handler::index_read_map.                              */
3947 /****************************************************************************/
index_read(uchar * buf,const uchar * key,uint key_len,enum ha_rkey_function find_flag)3948 int ha_connect::index_read(uchar * buf, const uchar * key, uint key_len,
3949                            enum ha_rkey_function find_flag)
3950 {
3951   int rc;
3952   OPVAL op= OP_XX;
3953   DBUG_ENTER("ha_connect::index_read");
3954 
3955   switch(find_flag) {
3956     case HA_READ_KEY_EXACT:   op= OP_EQ; break;
3957     case HA_READ_AFTER_KEY:   op= OP_GT; break;
3958     case HA_READ_KEY_OR_NEXT: op= OP_GE; break;
3959     default: DBUG_RETURN(-1);      break;
3960     } // endswitch find_flag
3961 
3962   if (trace(2))
3963     htrc("%p index_read: op=%d\n", this, op);
3964 
3965   if (indexing > 0) {
3966 		start_key.key= key;
3967 		start_key.length= key_len;
3968 		start_key.flag= find_flag;
3969 		start_key.keypart_map= 0;
3970 
3971     rc= ReadIndexed(buf, op, &start_key);
3972 
3973     if (rc == HA_ERR_INTERNAL_ERROR) {
3974       nox= true;                  // To block making indexes
3975       abort= true;                // Don't rename temp file
3976       } // endif rc
3977 
3978   } else
3979     rc= HA_ERR_INTERNAL_ERROR;  // HA_ERR_KEY_NOT_FOUND ?
3980 
3981   DBUG_RETURN(rc);
3982 } // end of index_read
3983 
3984 
3985 /**
3986   @brief
3987   Used to read forward through the index.
3988 */
index_next(uchar * buf)3989 int ha_connect::index_next(uchar *buf)
3990 {
3991   int rc;
3992   DBUG_ENTER("ha_connect::index_next");
3993   //statistic_increment(ha_read_next_count, &LOCK_status);
3994 
3995   if (indexing > 0)
3996     rc= ReadIndexed(buf, OP_NEXT);
3997   else if (!indexing)
3998     rc= rnd_next(buf);
3999   else
4000     rc= HA_ERR_INTERNAL_ERROR;
4001 
4002   DBUG_RETURN(rc);
4003 } // end of index_next
4004 
4005 
4006 /**
4007   @brief
4008   Used to read backwards through the index.
4009 */
index_prev(uchar * buf)4010 int ha_connect::index_prev(uchar *buf)
4011 {
4012   DBUG_ENTER("ha_connect::index_prev");
4013   int rc;
4014 
4015   if (indexing > 0) {
4016     rc= ReadIndexed(buf, OP_PREV);
4017   } else
4018     rc= HA_ERR_WRONG_COMMAND;
4019 
4020   DBUG_RETURN(rc);
4021 } // end of index_prev
4022 
4023 
4024 /**
4025   @brief
4026   index_first() asks for the first key in the index.
4027 
4028     @details
4029   Called from opt_range.cc, opt_sum.cc, sql_handler.cc, and sql_select.cc.
4030 
4031     @see
4032   opt_range.cc, opt_sum.cc, sql_handler.cc and sql_select.cc
4033 */
index_first(uchar * buf)4034 int ha_connect::index_first(uchar *buf)
4035 {
4036   int rc;
4037   DBUG_ENTER("ha_connect::index_first");
4038 
4039   if (indexing > 0)
4040     rc= ReadIndexed(buf, OP_FIRST);
4041   else if (indexing < 0)
4042     rc= HA_ERR_INTERNAL_ERROR;
4043   else if (CntRewindTable(xp->g, tdbp)) {
4044     table->status= STATUS_NOT_FOUND;
4045     rc= HA_ERR_INTERNAL_ERROR;
4046   } else
4047     rc= rnd_next(buf);
4048 
4049   DBUG_RETURN(rc);
4050 } // end of index_first
4051 
4052 
4053 /**
4054   @brief
4055   index_last() asks for the last key in the index.
4056 
4057     @details
4058   Called from opt_range.cc, opt_sum.cc, sql_handler.cc, and sql_select.cc.
4059 
4060     @see
4061   opt_range.cc, opt_sum.cc, sql_handler.cc and sql_select.cc
4062 */
index_last(uchar * buf)4063 int ha_connect::index_last(uchar *buf)
4064 {
4065   DBUG_ENTER("ha_connect::index_last");
4066   int rc;
4067 
4068   if (indexing <= 0) {
4069     rc= HA_ERR_INTERNAL_ERROR;
4070   } else
4071     rc= ReadIndexed(buf, OP_LAST);
4072 
4073   DBUG_RETURN(rc);
4074 }
4075 
4076 
4077 /****************************************************************************/
4078 /*  This is called to get more rows having the same index value.            */
4079 /****************************************************************************/
4080 //t ha_connect::index_next_same(uchar *buf, const uchar *key, uint keylen)
index_next_same(uchar * buf,const uchar *,uint)4081 int ha_connect::index_next_same(uchar *buf, const uchar *, uint)
4082 {
4083   int rc;
4084   DBUG_ENTER("ha_connect::index_next_same");
4085 //statistic_increment(ha_read_next_count, &LOCK_status);
4086 
4087   if (!indexing)
4088     rc= rnd_next(buf);
4089   else if (indexing > 0)
4090     rc= ReadIndexed(buf, OP_SAME);
4091   else
4092     rc= HA_ERR_INTERNAL_ERROR;
4093 
4094   DBUG_RETURN(rc);
4095 } // end of index_next_same
4096 
4097 
4098 /**
4099   @brief
4100   rnd_init() is called when the system wants the storage engine to do a table
4101   scan. See the example in the introduction at the top of this file to see when
4102   rnd_init() is called.
4103 
4104     @details
4105   Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc, sql_table.cc,
4106   and sql_update.cc.
4107 
4108     @note
4109   We always call open and extern_lock/start_stmt before comming here.
4110 
4111     @see
4112   filesort.cc, records.cc, sql_handler.cc, sql_select.cc, sql_table.cc and sql_update.cc
4113 */
rnd_init(bool scan)4114 int ha_connect::rnd_init(bool scan)
4115 {
4116   PGLOBAL g= ((table && table->in_use) ? GetPlug(table->in_use, xp) :
4117               (xp) ? xp->g : NULL);
4118   DBUG_ENTER("ha_connect::rnd_init");
4119 
4120   // This is not tested yet
4121   if (xmod == MODE_ALTER) {
4122     xmod= MODE_READ;
4123     alter= 1;
4124     } // endif xmod
4125 
4126   if (trace(1))
4127     htrc("rnd_init: this=%p scan=%d xmod=%d alter=%d\n",
4128             this, scan, xmod, alter);
4129 
4130   if (!g || !table || xmod == MODE_INSERT)
4131     DBUG_RETURN(HA_ERR_INITIALIZATION);
4132 
4133   // Do not close the table if it was opened yet (locked?)
4134   if (IsOpened()) {
4135     if (IsPartitioned() && xmod != MODE_INSERT)
4136       if (CheckColumnList(g)) // map can have been changed
4137         DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
4138 
4139     if (tdbp->OpenDB(g))      // Rewind table
4140       DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
4141     else
4142       DBUG_RETURN(0);
4143 
4144   } else if (xp->CheckQuery(valid_query_id))
4145     tdbp= NULL;       // Not valid anymore
4146 
4147   // When updating, to avoid skipped update, force the table
4148   // handler to retrieve write-only fields to be able to compare
4149   // records and detect data change.
4150   if (xmod == MODE_UPDATE)
4151     bitmap_union(table->read_set, table->write_set);
4152 
4153   if (OpenTable(g, xmod == MODE_DELETE))
4154     DBUG_RETURN(HA_ERR_INITIALIZATION);
4155 
4156   xp->nrd= xp->fnd= xp->nfd= 0;
4157   xp->tb1= my_interval_timer();
4158   DBUG_RETURN(0);
4159 } // end of rnd_init
4160 
4161 /**
4162   @brief
4163   Not described.
4164 
4165   @note
4166   The previous version said:
4167   Stop scanning of table. Note that this may be called several times during
4168   execution of a sub select.
4169   =====> This has been moved to external lock to avoid closing subselect tables.
4170 */
rnd_end()4171 int ha_connect::rnd_end()
4172 {
4173   int rc= 0;
4174   DBUG_ENTER("ha_connect::rnd_end");
4175 
4176   // If this is called by a later query, the table may have
4177   // been already closed and the tdbp is not valid anymore.
4178 //  if (tdbp && xp->last_query_id == valid_query_id)
4179 //    rc= CloseTable(xp->g);
4180 
4181   ds_mrr.dsmrr_close();
4182   DBUG_RETURN(rc);
4183 } // end of rnd_end
4184 
4185 
4186 /**
4187   @brief
4188   This is called for each row of the table scan. When you run out of records
4189   you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
4190   The Field structure for the table is the key to getting data into buf
4191   in a manner that will allow the server to understand it.
4192 
4193     @details
4194   Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc, sql_table.cc,
4195   and sql_update.cc.
4196 
4197     @see
4198   filesort.cc, records.cc, sql_handler.cc, sql_select.cc, sql_table.cc and sql_update.cc
4199 */
rnd_next(uchar * buf)4200 int ha_connect::rnd_next(uchar *buf)
4201 {
4202   int rc;
4203   DBUG_ENTER("ha_connect::rnd_next");
4204 //statistic_increment(ha_read_rnd_next_count, &LOCK_status);
4205 
4206   if (tdbp->GetMode() == MODE_ANY) {
4207     // We will stop on next read
4208     if (!stop) {
4209       stop= true;
4210       DBUG_RETURN(RC_OK);
4211     } else
4212       DBUG_RETURN(HA_ERR_END_OF_FILE);
4213 
4214     } // endif Mode
4215 
4216   switch (CntReadNext(xp->g, tdbp)) {
4217     case RC_OK:
4218       rc= MakeRecord((char*)buf);
4219       break;
4220     case RC_EF:         // End of file
4221       rc= HA_ERR_END_OF_FILE;
4222       break;
4223     case RC_NF:         // Not found
4224       rc= HA_ERR_RECORD_DELETED;
4225       break;
4226     default:            // Read error
4227       htrc("rnd_next CONNECT: %s\n", xp->g->Message);
4228       rc= (records()) ? HA_ERR_INTERNAL_ERROR : HA_ERR_END_OF_FILE;
4229       break;
4230     } // endswitch RC
4231 
4232   if (trace(2) && (rc || !(xp->nrd++ % 16384))) {
4233     ulonglong tb2= my_interval_timer();
4234     double elapsed= (double) (tb2 - xp->tb1) / 1000000000ULL;
4235     DBUG_PRINT("rnd_next", ("rc=%d nrd=%u fnd=%u nfd=%u sec=%.3lf\n",
4236                              rc, (uint)xp->nrd, (uint)xp->fnd,
4237                              (uint)xp->nfd, elapsed));
4238     htrc("rnd_next: rc=%d nrd=%u fnd=%u nfd=%u sec=%.3lf\n",
4239                              rc, (uint)xp->nrd, (uint)xp->fnd,
4240                              (uint)xp->nfd, elapsed);
4241     xp->tb1= tb2;
4242     xp->fnd= xp->nfd= 0;
4243     } // endif nrd
4244 
4245   table->status= (!rc) ? 0 : STATUS_NOT_FOUND;
4246   DBUG_RETURN(rc);
4247 } // end of rnd_next
4248 
4249 
4250 /**
4251   @brief
4252   position() is called after each call to rnd_next() if the data needs
4253   to be ordered. You can do something like the following to store
4254   the position:
4255     @code
4256   my_store_ptr(ref, ref_length, current_position);
4257     @endcode
4258 
4259     @details
4260   The server uses ref to store data. ref_length in the above case is
4261   the size needed to store current_position. ref is just a byte array
4262   that the server will maintain. If you are using offsets to mark rows, then
4263   current_position should be the offset. If it is a primary key like in
4264   BDB, then it needs to be a primary key.
4265 
4266   Called from filesort.cc, sql_select.cc, sql_delete.cc, and sql_update.cc.
4267 
4268     @see
4269   filesort.cc, sql_select.cc, sql_delete.cc and sql_update.cc
4270 */
position(const uchar *)4271 void ha_connect::position(const uchar *)
4272 {
4273   DBUG_ENTER("ha_connect::position");
4274   my_store_ptr(ref, ref_length, (my_off_t)tdbp->GetRecpos());
4275 
4276   if (trace(2))
4277     htrc("position: pos=%d\n", tdbp->GetRecpos());
4278 
4279   DBUG_VOID_RETURN;
4280 } // end of position
4281 
4282 
4283 /**
4284   @brief
4285   This is like rnd_next, but you are given a position to use
4286   to determine the row. The position will be of the type that you stored in
4287   ref. You can use my_get_ptr(pos,ref_length) to retrieve whatever key
4288   or position you saved when position() was called.
4289 
4290     @details
4291   Called from filesort.cc, records.cc, sql_insert.cc, sql_select.cc, and sql_update.cc.
4292 
4293     @note
4294   Is this really useful? It was never called even when sorting.
4295 
4296     @see
4297   filesort.cc, records.cc, sql_insert.cc, sql_select.cc and sql_update.cc
4298 */
rnd_pos(uchar * buf,uchar * pos)4299 int ha_connect::rnd_pos(uchar *buf, uchar *pos)
4300 {
4301   int     rc;
4302 //PTDBASE tp= (PTDBASE)tdbp;
4303   DBUG_ENTER("ha_connect::rnd_pos");
4304 
4305   if (!tdbp->SetRecpos(xp->g, (int)my_get_ptr(pos, ref_length))) {
4306     if (trace(1))
4307       htrc("rnd_pos: %d\n", tdbp->GetRecpos());
4308 
4309     tdbp->SetFilter(NULL);
4310     rc= rnd_next(buf);
4311 	} else {
4312 		PGLOBAL g= GetPlug((table) ? table->in_use : NULL, xp);
4313 //	strcpy(g->Message, "Not supported by this table type");
4314 		my_message(ER_ILLEGAL_HA, g->Message, MYF(0));
4315 		rc= HA_ERR_INTERNAL_ERROR;
4316 	}	// endif SetRecpos
4317 
4318   DBUG_RETURN(rc);
4319 } // end of rnd_pos
4320 
4321 
4322 /**
4323   @brief
4324   ::info() is used to return information to the optimizer. See my_base.h for
4325   the complete description.
4326 
4327     @details
4328   Currently this table handler doesn't implement most of the fields really needed.
4329   SHOW also makes use of this data.
4330 
4331   You will probably want to have the following in your code:
4332     @code
4333   if (records < 2)
4334     records= 2;
4335     @endcode
4336   The reason is that the server will optimize for cases of only a single
4337   record. If, in a table scan, you don't know the number of records, it
4338   will probably be better to set records to two so you can return as many
4339   records as you need. Along with records, a few more variables you may wish
4340   to set are:
4341     records
4342     deleted
4343     data_file_length
4344     index_file_length
4345     delete_length
4346     check_time
4347   Take a look at the public variables in handler.h for more information.
4348 
4349   Called in filesort.cc, ha_heap.cc, item_sum.cc, opt_sum.cc, sql_delete.cc,
4350   sql_delete.cc, sql_derived.cc, sql_select.cc, sql_select.cc, sql_select.cc,
4351   sql_select.cc, sql_select.cc, sql_show.cc, sql_show.cc, sql_show.cc, sql_show.cc,
4352   sql_table.cc, sql_union.cc, and sql_update.cc.
4353 
4354     @see
4355   filesort.cc, ha_heap.cc, item_sum.cc, opt_sum.cc, sql_delete.cc, sql_delete.cc,
4356   sql_derived.cc, sql_select.cc, sql_select.cc, sql_select.cc, sql_select.cc,
4357   sql_select.cc, sql_show.cc, sql_show.cc, sql_show.cc, sql_show.cc, sql_table.cc,
4358   sql_union.cc and sql_update.cc
4359 */
info(uint flag)4360 int ha_connect::info(uint flag)
4361 {
4362   bool    pure= false;
4363   PGLOBAL g= GetPlug((table) ? table->in_use : NULL, xp);
4364 
4365   DBUG_ENTER("ha_connect::info");
4366 
4367 	if (!g) {
4368 		my_message(ER_UNKNOWN_ERROR, "Cannot get g pointer", MYF(0));
4369 		DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
4370 	}	// endif g
4371 
4372 	if (trace(1))
4373     htrc("%p In info: flag=%u valid_info=%d\n", this, flag, valid_info);
4374 
4375   // tdbp must be available to get updated info
4376   if (xp->CheckQuery(valid_query_id) || !tdbp) {
4377 
4378     if (xmod == MODE_ANY || xmod == MODE_ALTER) {
4379       // Pure info, not a query
4380       pure= true;
4381       xp->CheckCleanup(xmod == MODE_ANY && valid_query_id == 0);
4382       } // endif xmod
4383 
4384     // This is necessary for getting file length
4385 		if (table) {
4386 			if (SetDataPath(g, table->s->db.str)) {
4387 				my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
4388 				DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
4389 			}	// endif SetDataPath
4390 
4391 		} else
4392       DBUG_RETURN(HA_ERR_INTERNAL_ERROR);       // Should never happen
4393 
4394 		if (!(tdbp= GetTDB(g))) {
4395 			my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
4396 			DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
4397 		} // endif tdbp
4398 
4399     valid_info= false;
4400     } // endif tdbp
4401 
4402   if (!valid_info) {
4403     valid_info= CntInfo(g, tdbp, &xinfo);
4404 
4405     if (((signed)xinfo.records) < 0)
4406       DBUG_RETURN(HA_ERR_INITIALIZATION);  // Error in Cardinality
4407 
4408     } // endif valid_info
4409 
4410   if (flag & HA_STATUS_VARIABLE) {
4411     stats.records= xinfo.records;
4412     stats.deleted= 0;
4413     stats.data_file_length= xinfo.data_file_length;
4414     stats.index_file_length= 0;
4415     stats.delete_length= 0;
4416     stats.check_time= 0;
4417     stats.mean_rec_length= xinfo.mean_rec_length;
4418     } // endif HA_STATUS_VARIABLE
4419 
4420   if (flag & HA_STATUS_CONST) {
4421     // This is imported from the previous handler and must be reconsidered
4422     stats.max_data_file_length= 4294967295LL;
4423     stats.max_index_file_length= 4398046510080LL;
4424     stats.create_time= 0;
4425     data_file_name= xinfo.data_file_name;
4426     index_file_name= NULL;
4427 //  sortkey= (uint) - 1;           // Table is not sorted
4428     ref_length= sizeof(int);      // Pointer size to row
4429     table->s->db_options_in_use= 03;
4430     stats.block_size= 1024;
4431     table->s->keys_in_use.set_prefix(table->s->keys);
4432     table->s->keys_for_keyread= table->s->keys_in_use;
4433 //  table->s->keys_for_keyread.subtract(table->s->read_only_keys);
4434     table->s->db_record_offset= 0;
4435     } // endif HA_STATUS_CONST
4436 
4437   if (flag & HA_STATUS_ERRKEY) {
4438     errkey= 0;
4439     } // endif HA_STATUS_ERRKEY
4440 
4441   if (flag & HA_STATUS_TIME)
4442     stats.update_time= 0;
4443 
4444   if (flag & HA_STATUS_AUTO)
4445     stats.auto_increment_value= 1;
4446 
4447   if (tdbp && pure)
4448     CloseTable(g);        // Not used anymore
4449 
4450   DBUG_RETURN(0);
4451 } // end of info
4452 
4453 
4454 /**
4455   @brief
4456   extra() is called whenever the server wishes to send a hint to
4457   the storage engine. The myisam engine implements the most hints.
4458   ha_innodb.cc has the most exhaustive list of these hints.
4459 
4460   @note
4461   This is not yet implemented for CONNECT.
4462 
4463   @see
4464   ha_innodb.cc
4465 */
extra(enum ha_extra_function)4466 int ha_connect::extra(enum ha_extra_function /*operation*/)
4467 {
4468   DBUG_ENTER("ha_connect::extra");
4469   DBUG_RETURN(0);
4470 } // end of extra
4471 
4472 
4473 /**
4474   @brief
4475   Used to delete all rows in a table, including cases of truncate and cases where
4476   the optimizer realizes that all rows will be removed as a result of an SQL statement.
4477 
4478     @details
4479   Called from item_sum.cc by Item_func_group_concat::clear(),
4480   Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
4481   Called from sql_delete.cc by mysql_delete().
4482   Called from sql_select.cc by JOIN::reinit().
4483   Called from sql_union.cc by st_select_lex_unit::exec().
4484 
4485     @see
4486   Item_func_group_concat::clear(), Item_sum_count_distinct::clear() and
4487   Item_func_group_concat::clear() in item_sum.cc;
4488   mysql_delete() in sql_delete.cc;
4489   JOIN::reinit() in sql_select.cc and
4490   st_select_lex_unit::exec() in sql_union.cc.
4491 */
delete_all_rows()4492 int ha_connect::delete_all_rows()
4493 {
4494   int     rc= 0;
4495   PGLOBAL g= xp->g;
4496   DBUG_ENTER("ha_connect::delete_all_rows");
4497 
4498   if (tdbp && tdbp->GetUse() == USE_OPEN &&
4499       tdbp->GetAmType() != TYPE_AM_XML &&
4500       tdbp->GetFtype() != RECFM_NAF)
4501     // Close and reopen the table so it will be deleted
4502     rc= CloseTable(g);
4503 
4504   if (!(rc= OpenTable(g))) {
4505     if (CntDeleteRow(g, tdbp, true)) {
4506       htrc("%s\n", g->Message);
4507       rc= HA_ERR_INTERNAL_ERROR;
4508     } else
4509       nox= false;
4510 
4511     } // endif rc
4512 
4513   DBUG_RETURN(rc);
4514 } // end of delete_all_rows
4515 
4516 
checkPrivileges(THD * thd,TABTYPE type,PTOS options,const char * db,TABLE * table,bool quick)4517 static bool checkPrivileges(THD *thd, TABTYPE type, PTOS options,
4518                             const char *db, TABLE *table, bool quick)
4519 {
4520   switch (type) {
4521     case TAB_UNDEF:
4522 //  case TAB_CATLG:
4523     case TAB_PLG:
4524     case TAB_JCT:
4525     case TAB_DMY:
4526     case TAB_NIY:
4527       my_printf_error(ER_UNKNOWN_ERROR,
4528                       "Unsupported table type %s", MYF(0), options->type);
4529       return true;
4530 
4531     case TAB_DOS:
4532     case TAB_FIX:
4533     case TAB_BIN:
4534     case TAB_CSV:
4535     case TAB_FMT:
4536     case TAB_DBF:
4537     case TAB_XML:
4538     case TAB_INI:
4539     case TAB_VEC:
4540 		case TAB_REST:
4541     case TAB_JSON:
4542 #if defined(BSON_SUPPORT)
4543     case TAB_BSON:
4544 #endif   // BSON_SUPPORT
4545       if (options->filename && *options->filename) {
4546 				if (!quick) {
4547 					char path[FN_REFLEN], dbpath[FN_REFLEN];
4548 
4549  					strcpy(dbpath, mysql_real_data_home);
4550 
4551 					if (db)
4552 #if defined(_WIN32)
4553 						strcat(strcat(dbpath, db), "\\");
4554 #else   // !_WIN32
4555 						strcat(strcat(dbpath, db), "/");
4556 #endif  // !_WIN32
4557 
4558 					(void)fn_format(path, options->filename, dbpath, "",
4559 						MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
4560 
4561 					if (!is_secure_file_path(path)) {
4562 						my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
4563 						return true;
4564 					} // endif path
4565 
4566 				}	// endif !quick
4567 
4568 			} else
4569         return false;
4570 
4571 			// Fall through
4572 		case TAB_MYSQL:
4573 		case TAB_DIR:
4574 		case TAB_ZIP:
4575 		case TAB_OEM:
4576       if (table && table->pos_in_table_list) { // if SELECT
4577 #if MYSQL_VERSION_ID > 100200
4578 				Switch_to_definer_security_ctx backup_ctx(thd, table->pos_in_table_list);
4579 #endif // VERSION_ID > 100200
4580         return check_global_access(thd, FILE_ACL);
4581       }	else
4582         return check_global_access(thd, FILE_ACL);
4583     case TAB_ODBC:
4584 		case TAB_JDBC:
4585 		case TAB_MONGO:
4586     case TAB_MAC:
4587     case TAB_WMI:
4588 			return false;
4589     case TAB_TBL:
4590     case TAB_XCL:
4591     case TAB_PRX:
4592     case TAB_OCCUR:
4593     case TAB_PIVOT:
4594     case TAB_VIR:
4595     default:
4596 			// This is temporary until a solution is found
4597 			return false;
4598     } // endswitch type
4599 
4600   my_printf_error(ER_UNKNOWN_ERROR, "check_privileges failed", MYF(0));
4601   return true;
4602 } // end of checkPrivileges
4603 
4604 // Check whether the user has required (file) privileges
check_privileges(THD * thd,PTOS options,const char * dbn,bool quick)4605 bool ha_connect::check_privileges(THD *thd, PTOS options, const char *dbn,
4606 				  bool quick)
4607 {
4608   const char *db= (dbn && *dbn) ? dbn : NULL;
4609   TABTYPE     type=GetRealType(options);
4610 
4611   return checkPrivileges(thd, type, options, db, table, quick);
4612 } // end of check_privileges
4613 
4614 // Check that two indexes are equivalent
IsSameIndex(PIXDEF xp1,PIXDEF xp2)4615 bool ha_connect::IsSameIndex(PIXDEF xp1, PIXDEF xp2)
4616 {
4617   bool   b= true;
4618   PKPDEF kp1, kp2;
4619 
4620   if (stricmp(xp1->Name, xp2->Name))
4621     b= false;
4622   else if (xp1->Nparts  != xp2->Nparts  ||
4623            xp1->MaxSame != xp2->MaxSame ||
4624            xp1->Unique  != xp2->Unique)
4625     b= false;
4626   else for (kp1= xp1->ToKeyParts, kp2= xp2->ToKeyParts;
4627             b && (kp1 || kp2);
4628             kp1= kp1->Next, kp2= kp2->Next)
4629     if (!kp1 || !kp2)
4630       b= false;
4631     else if (stricmp(kp1->Name, kp2->Name))
4632       b= false;
4633     else if (kp1->Klen != kp2->Klen)
4634       b= false;
4635 
4636   return b;
4637 } // end of IsSameIndex
4638 
CheckMode(PGLOBAL g,THD * thd,MODE newmode,bool * chk,bool * cras)4639 MODE ha_connect::CheckMode(PGLOBAL g, THD *thd,
4640 	                         MODE newmode, bool *chk, bool *cras)
4641 {
4642 #if defined(DEVELOPMENT)
4643 	if (true) {
4644 #else
4645   if (trace(65)) {
4646 #endif
4647     LEX_STRING *query_string= thd_query_string(thd);
4648     htrc("%p check_mode: cmdtype=%d\n", this, thd_sql_command(thd));
4649     htrc("Cmd=%.*s\n", (int) query_string->length, query_string->str);
4650     } // endif trace
4651 
4652   // Next code is temporarily replaced until sql_command is set
4653   stop= false;
4654 
4655   if (newmode == MODE_WRITE) {
4656     switch (thd_sql_command(thd)) {
4657       case SQLCOM_LOCK_TABLES:
4658         locked= 2; // fall through
4659       case SQLCOM_CREATE_TABLE:
4660       case SQLCOM_INSERT:
4661       case SQLCOM_LOAD:
4662       case SQLCOM_INSERT_SELECT:
4663         newmode= MODE_INSERT;
4664         break;
4665 //    case SQLCOM_REPLACE:
4666 //    case SQLCOM_REPLACE_SELECT:
4667 //      newmode= MODE_UPDATE;               // To be checked
4668 //      break;
4669 			case SQLCOM_DELETE_MULTI:
4670 				*cras= true;
4671                                 // fall through
4672 			case SQLCOM_DELETE:
4673       case SQLCOM_TRUNCATE:
4674         newmode= MODE_DELETE;
4675         break;
4676       case SQLCOM_UPDATE_MULTI:
4677 				*cras= true;
4678                                 // fall through
4679 			case SQLCOM_UPDATE:
4680 				newmode= MODE_UPDATE;
4681         break;
4682       case SQLCOM_SELECT:
4683       case SQLCOM_OPTIMIZE:
4684         newmode= MODE_READ;
4685         break;
4686       case SQLCOM_FLUSH:
4687         locked= 0;
4688         // fall through
4689       case SQLCOM_DROP_TABLE:
4690       case SQLCOM_RENAME_TABLE:
4691         newmode= MODE_ANY;
4692         break;
4693       case SQLCOM_CREATE_VIEW:
4694       case SQLCOM_DROP_VIEW:
4695         newmode= MODE_ANY;
4696         break;
4697       case SQLCOM_ALTER_TABLE:
4698         newmode= MODE_ALTER;
4699         break;
4700       case SQLCOM_DROP_INDEX:
4701       case SQLCOM_CREATE_INDEX:
4702 //      if (!IsPartitioned()) {
4703           newmode= MODE_ANY;
4704           break;
4705 //        } // endif partitioned
4706 			case SQLCOM_REPAIR: // TODO implement it
4707 				newmode= MODE_UPDATE;
4708 				break;
4709 			default:
4710         htrc("Unsupported sql_command=%d\n", thd_sql_command(thd));
4711         strcpy(g->Message, "CONNECT Unsupported command");
4712         my_message(ER_NOT_ALLOWED_COMMAND, g->Message, MYF(0));
4713         newmode= MODE_ERROR;
4714         break;
4715       } // endswitch newmode
4716 
4717   } else if (newmode == MODE_READ) {
4718     switch (thd_sql_command(thd)) {
4719       case SQLCOM_CREATE_TABLE:
4720         *chk= true;
4721 				break;
4722 			case SQLCOM_UPDATE_MULTI:
4723 			case SQLCOM_DELETE_MULTI:
4724 				*cras= true;
4725       case SQLCOM_INSERT:
4726       case SQLCOM_LOAD:
4727       case SQLCOM_INSERT_SELECT:
4728 //    case SQLCOM_REPLACE:
4729 //    case SQLCOM_REPLACE_SELECT:
4730       case SQLCOM_DELETE:
4731       case SQLCOM_TRUNCATE:
4732       case SQLCOM_UPDATE:
4733       case SQLCOM_SELECT:
4734       case SQLCOM_OPTIMIZE:
4735       case SQLCOM_SET_OPTION:
4736         break;
4737       case SQLCOM_LOCK_TABLES:
4738         locked= 1;
4739         break;
4740       case SQLCOM_DROP_TABLE:
4741       case SQLCOM_RENAME_TABLE:
4742         newmode= MODE_ANY;
4743         break;
4744       case SQLCOM_CREATE_VIEW:
4745       case SQLCOM_DROP_VIEW:
4746 			case SQLCOM_CREATE_TRIGGER:
4747 			case SQLCOM_DROP_TRIGGER:
4748 				newmode= MODE_ANY;
4749         break;
4750       case SQLCOM_ALTER_TABLE:
4751         *chk= true;
4752         newmode= MODE_ALTER;
4753         break;
4754       case SQLCOM_DROP_INDEX:
4755       case SQLCOM_CREATE_INDEX:
4756 //      if (!IsPartitioned()) {
4757           *chk= true;
4758           newmode= MODE_ANY;
4759           break;
4760 //        } // endif partitioned
4761 
4762 			case SQLCOM_CHECK:   // TODO implement it
4763 			case SQLCOM_ANALYZE: // TODO implement it
4764 			case SQLCOM_END:	   // Met in procedures: IF(EXISTS(SELECT...
4765 				newmode= MODE_READ;
4766         break;
4767       default:
4768         htrc("Unsupported sql_command=%d\n", thd_sql_command(thd));
4769         strcpy(g->Message, "CONNECT Unsupported command");
4770         my_message(ER_NOT_ALLOWED_COMMAND, g->Message, MYF(0));
4771         newmode= MODE_ERROR;
4772         break;
4773       } // endswitch newmode
4774 
4775   } // endif's newmode
4776 
4777   if (trace(1))
4778     htrc("New mode=%d\n", newmode);
4779 
4780   return newmode;
4781 } // end of check_mode
4782 
4783 int ha_connect::start_stmt(THD *thd, thr_lock_type lock_type)
4784 {
4785   int     rc= 0;
4786   bool    chk=false, cras= false;
4787   MODE    newmode;
4788   PGLOBAL g= GetPlug(thd, xp);
4789   DBUG_ENTER("ha_connect::start_stmt");
4790 
4791   if (check_privileges(thd, GetTableOptionStruct(), table->s->db.str, true))
4792     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
4793 
4794   // Action will depend on lock_type
4795   switch (lock_type) {
4796     case TL_WRITE_ALLOW_WRITE:
4797     case TL_WRITE_CONCURRENT_INSERT:
4798     case TL_WRITE_DELAYED:
4799     case TL_WRITE_DEFAULT:
4800     case TL_WRITE_LOW_PRIORITY:
4801     case TL_WRITE:
4802     case TL_WRITE_ONLY:
4803       newmode= MODE_WRITE;
4804       break;
4805     case TL_READ:
4806     case TL_READ_WITH_SHARED_LOCKS:
4807     case TL_READ_HIGH_PRIORITY:
4808     case TL_READ_NO_INSERT:
4809     case TL_READ_DEFAULT:
4810       newmode= MODE_READ;
4811       break;
4812     case TL_UNLOCK:
4813     default:
4814       newmode= MODE_ANY;
4815       break;
4816     } // endswitch mode
4817 
4818 	if (newmode == MODE_ANY) {
4819 		if (CloseTable(g)) {
4820 			// Make error a warning to avoid crash
4821 			push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
4822 			rc= 0;
4823 		} // endif Close
4824 
4825 		locked= 0;
4826 		xmod= MODE_ANY;              // For info commands
4827 		DBUG_RETURN(rc);
4828 	} // endif MODE_ANY
4829 
4830 	newmode= CheckMode(g, thd, newmode, &chk, &cras);
4831 
4832 	if (newmode == MODE_ERROR)
4833 		DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
4834 
4835 	DBUG_RETURN(check_stmt(g, newmode, cras));
4836 } // end of start_stmt
4837 
4838 /**
4839   @brief
4840   This create a lock on the table. If you are implementing a storage engine
4841   that can handle transacations look at ha_berkely.cc to see how you will
4842   want to go about doing this. Otherwise you should consider calling flock()
4843   here. Hint: Read the section "locking functions for mysql" in lock.cc to understand
4844   this.
4845 
4846     @details
4847   Called from lock.cc by lock_external() and unlock_external(). Also called
4848   from sql_table.cc by copy_data_between_tables().
4849 
4850     @note
4851   Following what we did in the MySQL XDB handler, we use this call to actually
4852   physically open the table. This could be reconsider when finalizing this handler
4853   design, which means we have a better understanding of what MariaDB does.
4854 
4855     @see
4856   lock.cc by lock_external() and unlock_external() in lock.cc;
4857   the section "locking functions for mysql" in lock.cc;
4858   copy_data_between_tables() in sql_table.cc.
4859 
4860 */
4861 int ha_connect::external_lock(THD *thd, int lock_type)
4862 {
4863   int     rc= 0;
4864   bool    xcheck=false, cras= false;
4865   MODE    newmode;
4866   PTOS    options= GetTableOptionStruct();
4867   PGLOBAL g= GetPlug(thd, xp);
4868   DBUG_ENTER("ha_connect::external_lock");
4869 
4870   DBUG_ASSERT(thd == current_thd);
4871 
4872   if (trace(1))
4873     htrc("external_lock: this=%p thd=%p xp=%p g=%p lock_type=%d\n",
4874             this, thd, xp, g, lock_type);
4875 
4876   if (!g)
4877     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
4878 
4879   // Action will depend on lock_type
4880   switch (lock_type) {
4881     case F_WRLCK:
4882       newmode= MODE_WRITE;
4883       break;
4884     case F_RDLCK:
4885       newmode= MODE_READ;
4886       break;
4887     case F_UNLCK:
4888     default:
4889       newmode= MODE_ANY;
4890       break;
4891     } // endswitch mode
4892 
4893   if (newmode == MODE_ANY) {
4894     int sqlcom= thd_sql_command(thd);
4895 
4896     // This is unlocking, do it by closing the table
4897     if (xp->CheckQueryID() && sqlcom != SQLCOM_UNLOCK_TABLES
4898                            && sqlcom != SQLCOM_LOCK_TABLES
4899                            && sqlcom != SQLCOM_FLUSH
4900                            && sqlcom != SQLCOM_BEGIN
4901                            && sqlcom != SQLCOM_DROP_TABLE) {
4902       sprintf(g->Message, "external_lock: unexpected command %d", sqlcom);
4903       push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
4904       DBUG_RETURN(0);
4905     } else if (g->Xchk) {
4906       if (!tdbp) {
4907 				if (!(tdbp= GetTDB(g))) {
4908 //        DBUG_RETURN(HA_ERR_INTERNAL_ERROR);  causes assert error
4909 					push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
4910 					DBUG_RETURN(0);
4911 				} else if (!tdbp->GetDef()->Indexable()) {
4912           sprintf(g->Message, "external_lock: Table %s is not indexable", tdbp->GetName());
4913 //        DBUG_RETURN(HA_ERR_INTERNAL_ERROR);  causes assert error
4914           push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
4915           DBUG_RETURN(0);
4916         } else if (tdbp->GetDef()->Indexable() == 1) {
4917           bool    oldsep= ((PCHK)g->Xchk)->oldsep;
4918           bool    newsep= ((PCHK)g->Xchk)->newsep;
4919           PTDBDOS tdp= (PTDBDOS)tdbp;
4920 
4921           PDOSDEF ddp= (PDOSDEF)tdp->GetDef();
4922           PIXDEF  xp, xp1, xp2, drp=NULL, adp= NULL;
4923           PIXDEF  oldpix= ((PCHK)g->Xchk)->oldpix;
4924           PIXDEF  newpix= ((PCHK)g->Xchk)->newpix;
4925           PIXDEF *xlst, *xprc;
4926 
4927           ddp->SetIndx(oldpix);
4928 
4929           if (oldsep != newsep) {
4930             // All indexes have to be remade
4931             ddp->DeleteIndexFile(g, NULL);
4932             oldpix= NULL;
4933             ddp->SetIndx(NULL);
4934             SetBooleanOption("Sepindex", newsep);
4935           } else if (newsep) {
4936             // Make the list of dropped indexes
4937             xlst= &drp; xprc= &oldpix;
4938 
4939             for (xp2= oldpix; xp2; xp2= xp) {
4940               for (xp1= newpix; xp1; xp1= xp1->Next)
4941                 if (IsSameIndex(xp1, xp2))
4942                   break;        // Index not to drop
4943 
4944               xp= xp2->GetNext();
4945 
4946               if (!xp1) {
4947                 *xlst= xp2;
4948                 *xprc= xp;
4949                 *(xlst= &xp2->Next)= NULL;
4950               } else
4951                 xprc= &xp2->Next;
4952 
4953               } // endfor xp2
4954 
4955             if (drp) {
4956               // Here we erase the index files
4957               ddp->DeleteIndexFile(g, drp);
4958               } // endif xp1
4959 
4960           } else if (oldpix) {
4961             // TODO: optimize the case of just adding new indexes
4962             if (!newpix)
4963               ddp->DeleteIndexFile(g, NULL);
4964 
4965             oldpix= NULL;     // To remake all indexes
4966             ddp->SetIndx(NULL);
4967           } // endif sepindex
4968 
4969           // Make the list of new created indexes
4970           xlst= &adp; xprc= &newpix;
4971 
4972           for (xp1= newpix; xp1; xp1= xp) {
4973             for (xp2= oldpix; xp2; xp2= xp2->Next)
4974               if (IsSameIndex(xp1, xp2))
4975                 break;        // Index already made
4976 
4977             xp= xp1->Next;
4978 
4979             if (!xp2) {
4980               *xlst= xp1;
4981               *xprc= xp;
4982               *(xlst= &xp1->Next)= NULL;
4983             } else
4984               xprc= &xp1->Next;
4985 
4986             } // endfor xp1
4987 
4988           if (adp)
4989             // Here we do make the new indexes
4990             if (tdp->MakeIndex(g, adp, true) == RC_FX) {
4991               // Make it a warning to avoid crash
4992               //push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
4993               //                  0, g->Message);
4994               //rc= 0;
4995 							my_message(ER_TOO_MANY_KEYS, g->Message, MYF(0));
4996 							rc= HA_ERR_INDEX_CORRUPT;
4997 						} // endif MakeIndex
4998 
4999         } else if (tdbp->GetDef()->Indexable() == 3) {
5000           if (CheckVirtualIndex(NULL)) {
5001             // Make it a warning to avoid crash
5002             push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
5003                               0, g->Message);
5004             rc= 0;
5005             } // endif Check
5006 
5007         } // endif indexable
5008 
5009         } // endif Tdbp
5010 
5011       } // endelse Xchk
5012 
5013     if (CloseTable(g)) {
5014       // This is an error while builing index
5015       // Make it a warning to avoid crash
5016       push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
5017       rc= 0;
5018 		} // endif Close
5019 
5020     locked= 0;
5021     xmod= MODE_ANY;              // For info commands
5022     DBUG_RETURN(rc);
5023 	} else if (check_privileges(thd, options, table->s->db.str)) {
5024 		strcpy(g->Message, "This operation requires the FILE privilege");
5025 		htrc("%s\n", g->Message);
5026 		DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
5027 	} // endif check_privileges
5028 
5029 
5030   DBUG_ASSERT(table && table->s);
5031 
5032   // Table mode depends on the query type
5033   newmode= CheckMode(g, thd, newmode, &xcheck, &cras);
5034 
5035   if (newmode == MODE_ERROR)
5036     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
5037 
5038 	DBUG_RETURN(check_stmt(g, newmode, cras));
5039 } // end of external_lock
5040 
5041 
5042 int ha_connect::check_stmt(PGLOBAL g, MODE newmode, bool cras)
5043 {
5044 	int rc= 0;
5045 	DBUG_ENTER("ha_connect::check_stmt");
5046 
5047 	// If this is the start of a new query, cleanup the previous one
5048   if (xp->CheckCleanup()) {
5049     tdbp= NULL;
5050     valid_info= false;
5051 	} // endif CheckCleanup
5052 
5053   if (cras)
5054     g->Createas= true;  // To tell external tables of a multi-table command
5055 
5056 	if (trace(1))
5057 		htrc("Calling CntCheckDB db=%s cras=%d\n", GetDBName(NULL), cras);
5058 
5059   // Set or reset the good database environment
5060   if (CntCheckDB(g, this, GetDBName(NULL))) {
5061 		htrc("%p check_stmt: %s\n", this, g->Message);
5062 		rc= HA_ERR_INTERNAL_ERROR;
5063   // This can NOT be called without open called first, but
5064   // the table can have been closed since then
5065   } else if (!tdbp || xp->CheckQuery(valid_query_id) || xmod != newmode) {
5066     if (tdbp) {
5067       // If this is called by a later query, the table may have
5068       // been already closed and the tdbp is not valid anymore.
5069       if (xp->last_query_id == valid_query_id)
5070         rc= CloseTable(g);
5071       else
5072         tdbp= NULL;
5073 
5074       } // endif tdbp
5075 
5076     xmod= newmode;
5077 
5078     // Delay open until used fields are known
5079   } // endif tdbp
5080 
5081   if (trace(1))
5082 		htrc("check_stmt: rc=%d\n", rc);
5083 
5084   DBUG_RETURN(rc);
5085 } // end of check_stmt
5086 
5087 
5088 /**
5089   @brief
5090   The idea with handler::store_lock() is: The statement decides which locks
5091   should be needed for the table. For updates/deletes/inserts we get WRITE
5092   locks, for SELECT... we get read locks.
5093 
5094     @details
5095   Before adding the lock into the table lock handler (see thr_lock.c),
5096   mysqld calls store lock with the requested locks. Store lock can now
5097   modify a write lock to a read lock (or some other lock), ignore the
5098   lock (if we don't want to use MySQL table locks at all), or add locks
5099   for many tables (like we do when we are using a MERGE handler).
5100 
5101   Berkeley DB, for example, changes all WRITE locks to TL_WRITE_ALLOW_WRITE
5102   (which signals that we are doing WRITES, but are still allowing other
5103   readers and writers).
5104 
5105   When releasing locks, store_lock() is also called. In this case one
5106   usually doesn't have to do anything.
5107 
5108   In some exceptional cases MySQL may send a request for a TL_IGNORE;
5109   This means that we are requesting the same lock as last time and this
5110   should also be ignored. (This may happen when someone does a flush
5111   table when we have opened a part of the tables, in which case mysqld
5112   closes and reopens the tables and tries to get the same locks at last
5113   time). In the future we will probably try to remove this.
5114 
5115   Called from lock.cc by get_lock_data().
5116 
5117     @note
5118   In this method one should NEVER rely on table->in_use, it may, in fact,
5119   refer to a different thread! (this happens if get_lock_data() is called
5120   from mysql_lock_abort_for_thread() function)
5121 
5122     @see
5123   get_lock_data() in lock.cc
5124 */
5125 THR_LOCK_DATA **ha_connect::store_lock(THD *,
5126                                        THR_LOCK_DATA **to,
5127                                        enum thr_lock_type lock_type)
5128 {
5129   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
5130     lock.type=lock_type;
5131   *to++= &lock;
5132   return to;
5133 }
5134 
5135 
5136 /**
5137   Searches for a pointer to the last occurrence of  the
5138   character c in the string src.
5139   Returns true on failure, false on success.
5140 */
5141 static bool
5142 strnrchr(LEX_CSTRING *ls, const char *src, size_t length, int c)
5143 {
5144   const char *srcend, *s;
5145   for (s= srcend= src + length; s > src; s--)
5146   {
5147     if (s[-1] == c)
5148     {
5149       ls->str= s;
5150       ls->length= srcend - s;
5151       return false;
5152     }
5153   }
5154   return true;
5155 }
5156 
5157 
5158 /**
5159   Split filename into database and table name.
5160 */
5161 static bool
5162 filename_to_dbname_and_tablename(const char *filename,
5163                                  char *database, size_t database_size,
5164                                  char *table, size_t table_size)
5165 {
5166   LEX_CSTRING d, t;
5167   size_t length= strlen(filename);
5168 
5169   /* Find filename - the rightmost directory part */
5170   if (strnrchr(&t, filename, length, slash) || t.length + 1 > table_size)
5171     return true;
5172   memcpy(table, t.str, t.length);
5173   table[t.length]= '\0';
5174   if (!(length-= t.length))
5175     return true;
5176 
5177   length--; /* Skip slash */
5178 
5179   /* Find database name - the second rightmost directory part */
5180   if (strnrchr(&d, filename, length, slash) || d.length + 1 > database_size)
5181     return true;
5182   memcpy(database, d.str, d.length);
5183   database[d.length]= '\0';
5184   return false;
5185 } // end of filename_to_dbname_and_tablename
5186 
5187 /**
5188   @brief
5189   Used to delete or rename a table. By the time delete_table() has been
5190   called all opened references to this table will have been closed
5191   (and your globally shared references released) ===> too bad!!!
5192   The variable name will just be the name of the table.
5193   You will need to remove or rename any files you have created at
5194   this point.
5195 
5196     @details
5197   If you do not implement this, the default delete_table() is called from
5198   handler.cc and it will delete all files with the file extensions returned
5199   by bas_ext().
5200 
5201   Called from handler.cc by delete_table and ha_create_table(). Only used
5202   during create if the table_flag HA_DROP_BEFORE_CREATE was specified for
5203   the storage engine.
5204 
5205     @see
5206   delete_table and ha_create_table() in handler.cc
5207 */
5208 int ha_connect::delete_or_rename_table(const char *name, const char *to)
5209 {
5210   DBUG_ENTER("ha_connect::delete_or_rename_table");
5211   char db[128], tabname[128];
5212   int  rc= 0;
5213   bool ok= false;
5214   THD *thd= current_thd;
5215   int  sqlcom= thd_sql_command(thd);
5216 
5217   if (trace(1)) {
5218     if (to)
5219       htrc("rename_table: this=%p thd=%p sqlcom=%d from=%s to=%s\n",
5220               this, thd, sqlcom, name, to);
5221     else
5222       htrc("delete_table: this=%p thd=%p sqlcom=%d name=%s\n",
5223               this, thd, sqlcom, name);
5224 
5225     } // endif trace
5226 
5227   if (to && (filename_to_dbname_and_tablename(to, db, sizeof(db),
5228                                              tabname, sizeof(tabname))
5229       || (*tabname == '#' && sqlcom == SQLCOM_CREATE_INDEX)))
5230     DBUG_RETURN(0);
5231 
5232   if (filename_to_dbname_and_tablename(name, db, sizeof(db),
5233                                        tabname, sizeof(tabname))
5234       || (*tabname == '#' && sqlcom == SQLCOM_CREATE_INDEX))
5235     DBUG_RETURN(0);
5236 
5237   // If a temporary file exists, all the tests below were passed
5238   // successfully when making it, so they are not needed anymore
5239   // in particular because they sometimes cause DBUG_ASSERT crash.
5240   // Also, for partitioned tables, no test can be done because when
5241   // this function is called, the .par file is already deleted and
5242   // this causes the open_table_def function to fail.
5243   // Not having any other clues (table and table_share are NULL)
5244   // the only mean we have to test for partitioning is this:
5245   if (*tabname != '#' && !strstr(tabname, "#P#")) {
5246     // We have to retrieve the information about this table options.
5247     ha_table_option_struct *pos;
5248     char         key[MAX_DBKEY_LENGTH];
5249     uint         key_length;
5250     TABLE_SHARE *share;
5251 
5252 //  if ((p= strstr(tabname, "#P#")))   won't work, see above
5253 //    *p= 0;             // Get the main the table name
5254 
5255     key_length= tdc_create_key(key, db, tabname);
5256 
5257     // share contains the option struct that we need
5258     if (!(share= alloc_table_share(db, tabname, key, key_length)))
5259       DBUG_RETURN(rc);
5260 
5261     // Get the share info from the .frm file
5262     Dummy_error_handler error_handler;
5263     thd->push_internal_handler(&error_handler);
5264     bool got_error= open_table_def(thd, share);
5265     thd->pop_internal_handler();
5266     if (!got_error) {
5267       // Now we can work
5268       if ((pos= share->option_struct)) {
5269         if (check_privileges(thd, pos, db))
5270           rc= HA_ERR_INTERNAL_ERROR;         // ???
5271         else
5272           if (IsFileType(GetRealType(pos)) && !pos->filename)
5273             ok= true;
5274 
5275         } // endif pos
5276 
5277       } // endif open_table_def
5278 
5279     free_table_share(share);
5280   } else              // Temporary file
5281     ok= true;
5282 
5283   if (ok) {
5284     // Let the base handler do the job
5285     if (to)
5286       rc= handler::rename_table(name, to);
5287     else if ((rc= handler::delete_table(name)) == ENOENT)
5288       rc= 0;        // No files is not an error for CONNECT
5289 
5290     } // endif ok
5291 
5292   DBUG_RETURN(rc);
5293 } // end of delete_or_rename_table
5294 
5295 int ha_connect::delete_table(const char *name)
5296 {
5297   return delete_or_rename_table(name, NULL);
5298 } // end of delete_table
5299 
5300 int ha_connect::rename_table(const char *from, const char *to)
5301 {
5302   return delete_or_rename_table(from, to);
5303 } // end of rename_table
5304 
5305 /**
5306   @brief
5307   Given a starting key and an ending key, estimate the number of rows that
5308   will exist between the two keys.
5309 
5310   @details
5311   end_key may be empty, in which case determine if start_key matches any rows.
5312 
5313   Called from opt_range.cc by check_quick_keys().
5314 
5315   @see
5316   check_quick_keys() in opt_range.cc
5317 */
5318 ha_rows ha_connect::records_in_range(uint inx, key_range *min_key,
5319                                                key_range *max_key)
5320 {
5321   ha_rows rows;
5322   DBUG_ENTER("ha_connect::records_in_range");
5323 
5324   if (indexing < 0 || inx != active_index)
5325     if (index_init(inx, false))
5326       DBUG_RETURN(HA_POS_ERROR);
5327 
5328   if (trace(1))
5329     htrc("records_in_range: inx=%d indexing=%d\n", inx, indexing);
5330 
5331   if (indexing > 0) {
5332     int          nval;
5333     uint         len[2];
5334     const uchar *key[2];
5335     bool         incl[2];
5336     key_part_map kmap[2];
5337 
5338     key[0]= (min_key) ? min_key->key : NULL;
5339     key[1]= (max_key) ? max_key->key : NULL;
5340     len[0]= (min_key) ? min_key->length : 0;
5341     len[1]= (max_key) ? max_key->length : 0;
5342     incl[0]= (min_key) ? (min_key->flag == HA_READ_KEY_EXACT) : false;
5343     incl[1]= (max_key) ? (max_key->flag == HA_READ_AFTER_KEY) : false;
5344     kmap[0]= (min_key) ? min_key->keypart_map : 0;
5345     kmap[1]= (max_key) ? max_key->keypart_map : 0;
5346 
5347     if ((nval= CntIndexRange(xp->g, tdbp, key, len, incl, kmap)) < 0)
5348       rows= HA_POS_ERROR;
5349     else
5350       rows= (ha_rows)nval;
5351 
5352   } else if (indexing == 0)
5353     rows= 100000000;        // Don't use missing index
5354   else
5355     rows= HA_POS_ERROR;
5356 
5357   if (trace(1))
5358     htrc("records_in_range: rows=%llu\n", rows);
5359 
5360   DBUG_RETURN(rows);
5361 } // end of records_in_range
5362 
5363 // Used to check whether a MYSQL table is created on itself
5364 bool CheckSelf(PGLOBAL g, TABLE_SHARE *s, PCSZ host,
5365 	             PCSZ db, PCSZ tab, PCSZ src, int port)
5366 {
5367   if (src)
5368     return false;
5369   else if (host && stricmp(host, "localhost") && strcmp(host, "127.0.0.1"))
5370     return false;
5371   else if (db && stricmp(db, s->db.str))
5372     return false;
5373   else if (tab && stricmp(tab, s->table_name.str))
5374     return false;
5375   else if (port && port != (signed)GetDefaultPort())
5376     return false;
5377 
5378   strcpy(g->Message, "This MySQL table is defined on itself");
5379   return true;
5380 } // end of CheckSelf
5381 
5382 /**
5383   Convert an ISO-8859-1 column name to UTF-8
5384 */
5385 static char *encode(PGLOBAL g, const char *cnm)
5386   {
5387   char  *buf= (char*)PlugSubAlloc(g, NULL, strlen(cnm) * 3);
5388   uint   dummy_errors;
5389   uint32 len= copy_and_convert(buf, strlen(cnm) * 3,
5390                                &my_charset_utf8_general_ci,
5391                                cnm, strlen(cnm),
5392                                &my_charset_latin1,
5393                                &dummy_errors);
5394   buf[len]= '\0';
5395   return buf;
5396   } // end of encode
5397 
5398 /**
5399   Store field definition for create.
5400 
5401   @return
5402     Return 0 if ok
5403 */
5404 static bool add_field(String* sql, TABTYPE ttp, const char* field_name, int typ,
5405 	                    int len, int dec, char* key, uint tm, const char* rem,
5406 	                    char* dft, char* xtra, char* fmt, int flag, bool dbf, char v)
5407 {
5408 	char var = (len > 255) ? 'V' : v;
5409 	bool q, error = false;
5410 	const char* type = PLGtoMYSQLtype(typ, dbf, var);
5411 
5412 	error |= sql->append('`');
5413 	error |= sql->append(field_name);
5414 	error |= sql->append("` ");
5415 	error |= sql->append(type);
5416 
5417 	if (typ == TYPE_STRING ||
5418 		(len && typ != TYPE_DATE && (typ != TYPE_DOUBLE || dec >= 0))) {
5419 		error |= sql->append('(');
5420 		error |= sql->append_ulonglong(len);
5421 
5422 		if (typ == TYPE_DOUBLE) {
5423 			error |= sql->append(',');
5424 			// dec must be < len and < 31
5425 			error |= sql->append_ulonglong(MY_MIN(dec, (MY_MIN(len, 31) - 1)));
5426 		} else if (dec > 0 && !strcmp(type, "DECIMAL")) {
5427 			error |= sql->append(',');
5428 			// dec must be < len
5429 			error |= sql->append_ulonglong(MY_MIN(dec, len - 1));
5430 		} // endif dec
5431 
5432 		error |= sql->append(')');
5433 	} // endif len
5434 
5435 	if (v == 'U')
5436 		error |= sql->append(" UNSIGNED");
5437 	else if (v == 'Z')
5438 		error |= sql->append(" ZEROFILL");
5439 
5440 	if (key && *key) {
5441 		error |= sql->append(" ");
5442 		error |= sql->append(key);
5443 	} // endif key
5444 
5445 	if (tm)
5446 		error |= sql->append(STRING_WITH_LEN(" NOT NULL"), system_charset_info);
5447 
5448 	if (dft && *dft) {
5449 		error |= sql->append(" DEFAULT ");
5450 
5451 		if (typ == TYPE_DATE)
5452 			q = (strspn(dft, "0123456789 -:/") == strlen(dft));
5453 		else
5454 			q = !IsTypeNum(typ);
5455 
5456 		if (q) {
5457 			error |= sql->append("'");
5458 			error |= sql->append_for_single_quote(dft, strlen(dft));
5459 			error |= sql->append("'");
5460 		} else
5461 			error |= sql->append(dft);
5462 
5463 	} // endif dft
5464 
5465 	if (xtra && *xtra) {
5466 		error |= sql->append(" ");
5467 		error |= sql->append(xtra);
5468 	} // endif rem
5469 
5470 	if (rem && *rem) {
5471 		error |= sql->append(" COMMENT '");
5472 		error |= sql->append_for_single_quote(rem, strlen(rem));
5473 		error |= sql->append("'");
5474 	} // endif rem
5475 
5476 	if (fmt && *fmt) {
5477     switch (ttp) {
5478       case TAB_MONGO:
5479       case TAB_BSON:
5480       case TAB_JSON: error |= sql->append(" JPATH='"); break;
5481       case TAB_XML:  error |= sql->append(" XPATH='"); break;
5482       default:	     error |= sql->append(" FIELD_FORMAT='");
5483     } // endswitch ttp
5484 
5485 		error |= sql->append_for_single_quote(fmt, strlen(fmt));
5486 		error |= sql->append("'");
5487 	} // endif flag
5488 
5489 	if (flag) {
5490 		error |= sql->append(" FLAG=");
5491 		error |= sql->append_ulonglong(flag);
5492 	} // endif flag
5493 
5494 	error |= sql->append(',');
5495 	return error;
5496 } // end of add_field
5497 
5498 /**
5499   Initialise the table share with the new columns.
5500 
5501   @return
5502     Return 0 if ok
5503 */
5504 static int init_table_share(THD* thd,
5505                             TABLE_SHARE *table_s,
5506                             HA_CREATE_INFO *create_info,
5507                             String *sql)
5508 {
5509   bool oom= false;
5510   PTOS topt= table_s->option_struct;
5511 
5512   sql->length(sql->length()-1); // remove the trailing comma
5513   sql->append(')');
5514 
5515   for (ha_create_table_option *opt= connect_table_option_list;
5516        opt->name; opt++) {
5517     ulonglong   vull;
5518     const char *vstr;
5519 
5520     switch (opt->type) {
5521       case HA_OPTION_TYPE_ULL:
5522         vull= *(ulonglong*)(((char*)topt) + opt->offset);
5523 
5524         if (vull != opt->def_value) {
5525           oom|= sql->append(' ');
5526           oom|= sql->append(opt->name);
5527           oom|= sql->append('=');
5528           oom|= sql->append_ulonglong(vull);
5529           } // endif vull
5530 
5531         break;
5532       case HA_OPTION_TYPE_STRING:
5533         vstr= *(char**)(((char*)topt) + opt->offset);
5534 
5535         if (vstr) {
5536           oom|= sql->append(' ');
5537           oom|= sql->append(opt->name);
5538           oom|= sql->append("='");
5539           oom|= sql->append_for_single_quote(vstr, strlen(vstr));
5540           oom|= sql->append('\'');
5541           } // endif vstr
5542 
5543         break;
5544       case HA_OPTION_TYPE_BOOL:
5545         vull= *(bool*)(((char*)topt) + opt->offset);
5546 
5547         if (vull != opt->def_value) {
5548           oom|= sql->append(' ');
5549           oom|= sql->append(opt->name);
5550           oom|= sql->append('=');
5551           oom|= sql->append(vull ? "YES" : "NO");
5552           } // endif vull
5553 
5554         break;
5555       default: // no enums here, good :)
5556         break;
5557       } // endswitch type
5558 
5559     if (oom)
5560       return HA_ERR_OUT_OF_MEM;
5561 
5562     } // endfor opt
5563 
5564   if (create_info->connect_string.length) {
5565     oom|= sql->append(' ');
5566     oom|= sql->append("CONNECTION='");
5567     oom|= sql->append_for_single_quote(create_info->connect_string.str,
5568                                        create_info->connect_string.length);
5569     oom|= sql->append('\'');
5570 
5571     if (oom)
5572       return HA_ERR_OUT_OF_MEM;
5573 
5574     } // endif string
5575 
5576   if (create_info->default_table_charset) {
5577     oom|= sql->append(' ');
5578     oom|= sql->append("CHARSET=");
5579     oom|= sql->append(create_info->default_table_charset->csname);
5580 
5581     if (oom)
5582       return HA_ERR_OUT_OF_MEM;
5583 
5584     } // endif charset
5585 
5586   if (trace(1))
5587     htrc("s_init: %.*s\n", sql->length(), sql->ptr());
5588 
5589   return table_s->init_from_sql_statement_string(thd, true,
5590                                                  sql->ptr(), sql->length());
5591 } // end of init_table_share
5592 
5593 /**
5594   @brief
5595   connect_assisted_discovery() is called when creating a table with no columns.
5596 
5597   @details
5598   When assisted discovery is used the .frm file have not already been
5599   created. You can overwrite some definitions at this point but the
5600   main purpose of it is to define the columns for some table types.
5601 
5602   @note
5603   this function is no more called in case of CREATE .. SELECT
5604 */
5605 static int connect_assisted_discovery(handlerton *, THD* thd,
5606                                       TABLE_SHARE *table_s,
5607                                       HA_CREATE_INFO *create_info)
5608 {
5609   char     v=0;
5610 	PCSZ     fncn= "?";
5611 	PCSZ     user, fn, db, host, pwd, sep, tbl, src;
5612 	PCSZ     col, ocl, rnk, pic, fcl, skc, zfn;
5613 	char    *tab, *dsn, *shm, *dpath, *url;
5614 #if defined(_WIN32)
5615 	PCSZ     nsp= NULL, cls= NULL;
5616 #endif   // _WIN32
5617 //int      hdr, mxe;
5618 	int      port= 0, mxr __attribute__((unused)) = 0, rc= 0, mul= 0;
5619 //PCSZ     tabtyp= NULL;
5620 #if defined(ODBC_SUPPORT)
5621   POPARM   sop= NULL;
5622 	PCSZ     ucnc= NULL;
5623 	bool     cnc= false;
5624   int      cto= -1, qto= -1;
5625 #endif   // ODBC_SUPPORT
5626 #if defined(JAVA_SUPPORT)
5627 	PJPARM   sjp= NULL;
5628 	PCSZ     driver= NULL;
5629 #endif   // JAVA_SUPPORT
5630   uint     tm, fnc= FNC_NO, supfnc= (FNC_NO | FNC_COL);
5631   bool     bif, ok= false, dbf= false;
5632   TABTYPE  ttp= TAB_UNDEF, ttr=TAB_UNDEF;
5633   PQRYRES  qrp= NULL;
5634   PCOLRES  crp;
5635   PCONNECT xp= NULL;
5636   PGLOBAL  g= GetPlug(thd, xp);
5637 
5638   if (!g)
5639     return HA_ERR_INTERNAL_ERROR;
5640 
5641   PTOS     topt= table_s->option_struct;
5642   char     buf[1024];
5643   String   sql(buf, sizeof(buf), system_charset_info);
5644 
5645   sql.copy(STRING_WITH_LEN("CREATE TABLE whatever ("), system_charset_info);
5646 	user= host= pwd= tbl= src= col= ocl= pic= fcl= skc= rnk= zfn= NULL;
5647 	dsn= url= NULL;
5648 
5649   // Get the useful create options
5650   ttp= GetTypeID(topt->type);
5651   fn=  topt->filename;
5652   tab= (char*)topt->tabname;
5653   src= topt->srcdef;
5654   db=  topt->dbname;
5655   fncn= topt->catfunc;
5656   fnc= GetFuncID(fncn);
5657   sep= topt->separator;
5658 	mul= (int)topt->multiple;
5659 	tbl= topt->tablist;
5660   col= topt->colist;
5661 
5662   if (topt->oplist) {
5663     host= GetListOption(g, "host", topt->oplist, "localhost");
5664     user= GetListOption(g, "user", topt->oplist,
5665           ((ttp == TAB_ODBC || ttp == TAB_JDBC) ? NULL : "root"));
5666     // Default value db can come from the DBNAME=xxx option.
5667     db= GetListOption(g, "database", topt->oplist, db);
5668     col= GetListOption(g, "colist", topt->oplist, col);
5669     ocl= GetListOption(g, "occurcol", topt->oplist, NULL);
5670     pic= GetListOption(g, "pivotcol", topt->oplist, NULL);
5671     fcl= GetListOption(g, "fnccol", topt->oplist, NULL);
5672     skc= GetListOption(g, "skipcol", topt->oplist, NULL);
5673     rnk= GetListOption(g, "rankcol", topt->oplist, NULL);
5674     pwd= GetListOption(g, "password", topt->oplist);
5675 #if defined(_WIN32)
5676     nsp= GetListOption(g, "namespace", topt->oplist);
5677     cls= GetListOption(g, "class", topt->oplist);
5678 #endif   // _WIN32
5679     port= atoi(GetListOption(g, "port", topt->oplist, "0"));
5680 #if defined(ODBC_SUPPORT)
5681 //	tabtyp= GetListOption(g, "Tabtype", topt->oplist, NULL);
5682 		mxr= atoi(GetListOption(g,"maxres", topt->oplist, "0"));
5683     cto= atoi(GetListOption(g,"ConnectTimeout", topt->oplist, "-1"));
5684     qto= atoi(GetListOption(g,"QueryTimeout", topt->oplist, "-1"));
5685 
5686     if ((ucnc= GetListOption(g, "UseDSN", topt->oplist)))
5687       cnc= (!*ucnc || *ucnc == 'y' || *ucnc == 'Y' || atoi(ucnc) != 0);
5688 #endif
5689 #if defined(JAVA_SUPPORT)
5690 		driver= GetListOption(g, "Driver", topt->oplist, NULL);
5691 #endif   // JAVA_SUPPORT
5692 #if defined(PROMPT_OK)
5693     cop= atoi(GetListOption(g, "checkdsn", topt->oplist, "0"));
5694 #endif   // PROMPT_OK
5695 #if defined(ZIP_SUPPORT)
5696 		zfn= GetListOption(g, "Zipfile", topt->oplist, NULL);
5697 #endif   // ZIP_SUPPORT
5698 	} else {
5699     host= "localhost";
5700     user= ((ttp == TAB_ODBC || ttp == TAB_JDBC) ? NULL : "root");
5701   } // endif option_list
5702 
5703   if (!(shm= (char*)db))
5704     db= table_s->db.str;                   // Default value
5705 
5706 	try {
5707 		// Check table type
5708 		if (ttp == TAB_UNDEF && !topt->http) {
5709 			topt->type= (src) ? "MYSQL" : (tab) ? "PROXY" : "DOS";
5710 			ttp= GetTypeID(topt->type);
5711 			snprintf(g->Message, sizeof(g->Message), "No table_type. Was set to %s", topt->type);
5712 			push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, 0, g->Message);
5713 		} else if (ttp == TAB_NIY) {
5714 			snprintf(g->Message, sizeof(g->Message), "Unsupported table type %s", topt->type);
5715 			rc= HA_ERR_INTERNAL_ERROR;
5716 			goto err;
5717 #if defined(REST_SUPPORT)
5718 		} else if (topt->http) {
5719       if (ttp == TAB_UNDEF) {
5720         ttr= TAB_JSON;
5721         strcpy(g->Message, "No table_type. Was set to JSON");
5722         push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, 0, g->Message);
5723       } else
5724         ttr= ttp;
5725 
5726       switch (ttr) {
5727 				case TAB_JSON:
5728 #if defined(BSON_SUPPORT)
5729         case TAB_BSON:
5730 #endif   // BSON_SUPPORT
5731         case TAB_XML:
5732 				case TAB_CSV:
5733           ttp = TAB_REST;
5734 					break;
5735 				default:
5736 					break;
5737 			}	// endswitch type
5738 #endif   // REST_SUPPORT
5739 		} // endif ttp
5740 
5741     if (fn && *fn)
5742       switch (ttp) {
5743         case TAB_FMT:
5744         case TAB_DBF:
5745         case TAB_XML:
5746         case TAB_INI:
5747         case TAB_VEC:
5748         case TAB_REST:
5749         case TAB_JSON:
5750 #if defined(BSON_SUPPORT)
5751         case TAB_BSON:
5752 #endif   // BSON_SUPPORT
5753           if (checkPrivileges(thd, ttp, topt, db)) {
5754             strcpy(g->Message, "This operation requires the FILE privilege");
5755             rc= HA_ERR_INTERNAL_ERROR;
5756             goto err;
5757           } // endif check_privileges
5758 
5759           break;
5760         default:
5761           break;
5762       } // endswitch ttp
5763 
5764 		if (!tab) {
5765 			if (ttp == TAB_TBL) {
5766 				// Make tab the first table of the list
5767 				char *p;
5768 
5769 				if (!tbl) {
5770 					strcpy(g->Message, "Missing table list");
5771 					rc= HA_ERR_INTERNAL_ERROR;
5772 					goto err;
5773 				} // endif tbl
5774 
5775 				tab= PlugDup(g, tbl);
5776 
5777 				if ((p= strchr(tab, ',')))
5778 					*p= 0;
5779 
5780 				if ((p= strchr(tab, '.'))) {
5781 					*p= 0;
5782 					db= tab;
5783 					tab= p + 1;
5784 				} // endif p
5785 
5786 			} else if (ttp != TAB_ODBC || !(fnc & (FNC_TABLE | FNC_COL)))
5787 			  tab= (char*)table_s->table_name.str;   // Default value
5788 
5789 		} // endif tab
5790 
5791 		switch (ttp) {
5792 #if defined(ODBC_SUPPORT)
5793 			case TAB_ODBC:
5794 				dsn= strz(g, create_info->connect_string);
5795 
5796 				if (fnc & (FNC_DSN | FNC_DRIVER)) {
5797 					ok= true;
5798 #if defined(PROMPT_OK)
5799 				} else if (!stricmp(thd->main_security_ctx.host, "localhost")
5800 					&& cop == 1) {
5801 					if ((dsn= ODBCCheckConnection(g, dsn, cop)) != NULL) {
5802 						thd->make_lex_string(&create_info->connect_string, dsn, strlen(dsn));
5803 						ok= true;
5804 					} // endif dsn
5805 #endif   // PROMPT_OK
5806 
5807 				} else if (!dsn) {
5808 					sprintf(g->Message, "Missing %s connection string", topt->type);
5809 				} else {
5810 					// Store ODBC additional parameters
5811 					sop= (POPARM)PlugSubAlloc(g, NULL, sizeof(ODBCPARM));
5812 					sop->User= (char*)user;
5813 					sop->Pwd= (char*)pwd;
5814 					sop->Cto= cto;
5815 					sop->Qto= qto;
5816 					sop->UseCnc= cnc;
5817 					ok= true;
5818 				} // endif's
5819 
5820 				supfnc |= (FNC_TABLE | FNC_DSN | FNC_DRIVER);
5821 				break;
5822 #endif   // ODBC_SUPPORT
5823 #if defined(JAVA_SUPPORT)
5824 			case TAB_JDBC:
5825 				if (fnc & FNC_DRIVER) {
5826 					ok= true;
5827 				} else if (!(url= strz(g, create_info->connect_string))) {
5828 					strcpy(g->Message, "Missing URL");
5829 				} else {
5830 					// Store JDBC additional parameters
5831 					int      rc;
5832 					PJDBCDEF jdef= new(g) JDBCDEF();
5833 
5834 					jdef->SetName(create_info->alias.str);
5835 					sjp = (PJPARM)PlugSubAlloc(g, NULL, sizeof(JDBCPARM));
5836 					sjp->Driver = driver;
5837 					//		sjp->Properties = prop;
5838 					sjp->Fsize = 0;
5839 					sjp->Scrollable = false;
5840 
5841 					if ((rc = jdef->ParseURL(g, url, false)) == RC_OK) {
5842 						sjp->Url = url;
5843 						sjp->User = (char*)user;
5844 						sjp->Pwd = (char*)pwd;
5845 						ok = true;
5846 					} else if (rc == RC_NF) {
5847 						if (jdef->GetTabname())
5848 							tab= (char*)jdef->GetTabname();
5849 
5850 						ok= jdef->SetParms(sjp);
5851 					} // endif rc
5852 
5853 				} // endif's
5854 
5855 				supfnc |= (FNC_DRIVER | FNC_TABLE);
5856 				break;
5857 #endif   // JAVA_SUPPORT
5858 			case TAB_DBF:
5859 				dbf= true;
5860 				// fall through
5861 			case TAB_CSV:
5862 				if (!fn && fnc != FNC_NO)
5863 					sprintf(g->Message, "Missing %s file name", topt->type);
5864 				else if (sep && strlen(sep) > 1)
5865 					sprintf(g->Message, "Invalid separator %s", sep);
5866 				else
5867 					ok= true;
5868 
5869 				break;
5870 			case TAB_MYSQL:
5871 				ok= true;
5872 
5873 				if (create_info->connect_string.str &&
5874 					create_info->connect_string.length) {
5875 					PMYDEF  mydef= new(g) MYSQLDEF();
5876 
5877 					dsn= strz(g, create_info->connect_string);
5878 					mydef->SetName(create_info->alias.str);
5879 
5880 					if (!mydef->ParseURL(g, dsn, false)) {
5881 						if (mydef->GetHostname())
5882 							host= mydef->GetHostname();
5883 
5884 						if (mydef->GetUsername())
5885 							user= mydef->GetUsername();
5886 
5887 						if (mydef->GetPassword())
5888 							pwd= mydef->GetPassword();
5889 
5890 						if (mydef->GetTabschema())
5891 							db= mydef->GetTabschema();
5892 
5893 						if (mydef->GetTabname())
5894 							tab= (char*)mydef->GetTabname();
5895 
5896 						if (mydef->GetPortnumber())
5897 							port= mydef->GetPortnumber();
5898 
5899 					} else
5900 						ok= false;
5901 
5902 				} else if (!user)
5903 					user= "root";
5904 
5905 				if (ok && CheckSelf(g, table_s, host, db, tab, src, port))
5906 					ok= false;
5907 
5908 				break;
5909 #if defined(_WIN32)
5910 			case TAB_WMI:
5911 				ok= true;
5912 				break;
5913 #endif   // _WIN32
5914 			case TAB_PIVOT:
5915 				supfnc= FNC_NO;
5916                                 // fall through
5917 			case TAB_PRX:
5918 			case TAB_TBL:
5919 			case TAB_XCL:
5920 			case TAB_OCCUR:
5921 				if (!src && !stricmp(tab, create_info->alias.str) &&
5922 					(!db || !stricmp(db, table_s->db.str)))
5923 					sprintf(g->Message, "A %s table cannot refer to itself", topt->type);
5924 				else
5925 					ok= true;
5926 
5927 				break;
5928 			case TAB_OEM:
5929 				if (topt->module && topt->subtype)
5930 					ok= true;
5931 				else
5932 					strcpy(g->Message, "Missing OEM module or subtype");
5933 
5934 				break;
5935 #if defined(LIBXML2_SUPPORT) || defined(DOMDOC_SUPPORT)
5936 			case TAB_XML:
5937 #endif   // LIBXML2_SUPPORT  ||         DOMDOC_SUPPORT
5938 			case TAB_JSON:
5939 #if defined(BSON_SUPPORT)
5940       case TAB_BSON:
5941 #endif   // BSON_SUPPORT
5942         dsn= strz(g, create_info->connect_string);
5943 
5944 				if (!fn && !zfn && !mul && !dsn)
5945 					sprintf(g->Message, "Missing %s file name", topt->type);
5946 				else if (dsn && !topt->tabname)
5947           topt->tabname= tab;
5948 
5949 				ok= true;
5950 				break;
5951 #if defined(JAVA_SUPPORT)
5952 			case TAB_MONGO:
5953 				if (!topt->tabname)
5954 					topt->tabname= tab;
5955 
5956 				ok= true;
5957 				break;
5958 #endif   // JAVA_SUPPORT
5959 #if defined(REST_SUPPORT)
5960 			case TAB_REST:
5961 				if (!topt->http)
5962 					strcpy(g->Message, "Missing REST HTTP option");
5963 				else
5964 					ok = true;
5965 
5966 				break;
5967 #endif   // REST_SUPPORT
5968 			case TAB_VIR:
5969 				ok= true;
5970 				break;
5971 			default:
5972 				sprintf(g->Message, "Cannot get column info for table type %s", topt->type);
5973 				break;
5974 		} // endif ttp
5975 
5976 	// Check for supported catalog function
5977 		if (ok && !(supfnc & fnc)) {
5978 			sprintf(g->Message, "Unsupported catalog function %s for table type %s",
5979 				fncn, topt->type);
5980 			ok= false;
5981 		} // endif supfnc
5982 
5983 		if (src && fnc != FNC_NO) {
5984 			strcpy(g->Message, "Cannot make catalog table from srcdef");
5985 			ok= false;
5986 		} // endif src
5987 
5988 		if (ok) {
5989 			const char *cnm, *rem;
5990 			char *dft, *xtra, *key, *fmt;
5991 			int   i, len, prec, dec, typ, flg;
5992 
5993 			if (!(dpath= SetPath(g, table_s->db.str))) {
5994 				rc= HA_ERR_INTERNAL_ERROR;
5995 				goto err;
5996 			}	// endif dpath
5997 
5998 			if (src && ttp != TAB_PIVOT && ttp != TAB_ODBC && ttp != TAB_JDBC) {
5999 				qrp= SrcColumns(g, host, db, user, pwd, src, port);
6000 
6001 				if (qrp && ttp == TAB_OCCUR)
6002 					if (OcrSrcCols(g, qrp, col, ocl, rnk)) {
6003 						rc= HA_ERR_INTERNAL_ERROR;
6004 						goto err;
6005 					} // endif OcrSrcCols
6006 
6007 			} else switch (ttp) {
6008 				case TAB_DBF:
6009 					qrp= DBFColumns(g, dpath, fn, topt, fnc == FNC_COL);
6010 					break;
6011 #if defined(ODBC_SUPPORT)
6012 				case TAB_ODBC:
6013 					switch (fnc) {
6014 						case FNC_NO:
6015 						case FNC_COL:
6016 							if (src) {
6017 								qrp= ODBCSrcCols(g, dsn, (char*)src, sop);
6018 								src= NULL;     // for next tests
6019 							} else
6020 								qrp= ODBCColumns(g, dsn, shm, tab, NULL,
6021 									mxr, fnc == FNC_COL, sop);
6022 
6023 							break;
6024 						case FNC_TABLE:
6025 							qrp= ODBCTables(g, dsn, shm, tab, NULL, mxr, true, sop);
6026 							break;
6027 						case FNC_DSN:
6028 							qrp= ODBCDataSources(g, mxr, true);
6029 							break;
6030 						case FNC_DRIVER:
6031 							qrp= ODBCDrivers(g, mxr, true);
6032 							break;
6033 						default:
6034 							sprintf(g->Message, "invalid catfunc %s", fncn);
6035 							break;
6036 					} // endswitch info
6037 
6038 					break;
6039 #endif   // ODBC_SUPPORT
6040 #if defined(JAVA_SUPPORT)
6041 				case TAB_JDBC:
6042 					switch (fnc) {
6043 						case FNC_NO:
6044 						case FNC_COL:
6045 							if (src) {
6046 								qrp= JDBCSrcCols(g, (char*)src, sjp);
6047 								src= NULL;     // for next tests
6048 							} else
6049 								qrp= JDBCColumns(g, shm, tab, NULL, mxr, fnc == FNC_COL, sjp);
6050 
6051 							break;
6052 						case FNC_TABLE:
6053 //						qrp= JDBCTables(g, shm, tab, tabtyp, mxr, true, sjp);
6054 							qrp= JDBCTables(g, shm, tab, NULL, mxr, true, sjp);
6055 							break;
6056 #if 0
6057 						case FNC_DSN:
6058 							qrp= JDBCDataSources(g, mxr, true);
6059 							break;
6060 #endif // 0
6061 						case FNC_DRIVER:
6062 							qrp= JDBCDrivers(g, mxr, true);
6063 							break;
6064 						default:
6065 							sprintf(g->Message, "invalid catfunc %s", fncn);
6066 							break;
6067 					} // endswitch info
6068 
6069 					break;
6070 #endif   // JAVA_SUPPORT
6071 				case TAB_MYSQL:
6072 					qrp= MyColumns(g, thd, host, db, user, pwd, tab,
6073 						NULL, port, fnc == FNC_COL);
6074 					break;
6075 				case TAB_CSV:
6076 					qrp= CSVColumns(g, dpath, topt, fnc == FNC_COL);
6077 					break;
6078 #if defined(_WIN32)
6079 				case TAB_WMI:
6080 					qrp= WMIColumns(g, nsp, cls, fnc == FNC_COL);
6081 					break;
6082 #endif   // _WIN32
6083 				case TAB_PRX:
6084 				case TAB_TBL:
6085 				case TAB_XCL:
6086 				case TAB_OCCUR:
6087 					bif= fnc == FNC_COL;
6088 					qrp= TabColumns(g, thd, db, tab, bif);
6089 
6090 					if (!qrp && bif && fnc != FNC_COL)         // tab is a view
6091 						qrp= MyColumns(g, thd, host, db, user, pwd, tab, NULL, port, false);
6092 
6093 					if (qrp && ttp == TAB_OCCUR && fnc != FNC_COL)
6094 						if (OcrColumns(g, qrp, col, ocl, rnk)) {
6095 							rc= HA_ERR_INTERNAL_ERROR;
6096 							goto err;
6097 						} // endif OcrColumns
6098 
6099 					break;
6100 				case TAB_PIVOT:
6101 					qrp= PivotColumns(g, tab, src, pic, fcl, skc, host, db, user, pwd, port);
6102 					break;
6103 				case TAB_VIR:
6104 					qrp= VirColumns(g, fnc == FNC_COL);
6105 					break;
6106 				case TAB_JSON:
6107 #if !defined(FORCE_BSON)
6108 					qrp= JSONColumns(g, db, dsn, topt, fnc == FNC_COL);
6109 					break;
6110 #endif   // !FORCE_BSON
6111 #if defined(BSON_SUPPORT)
6112         case TAB_BSON:
6113           qrp= BSONColumns(g, db, dsn, topt, fnc == FNC_COL);
6114           break;
6115 #endif   // BSON_SUPPORT
6116 #if defined(JAVA_SUPPORT)
6117 				case TAB_MONGO:
6118 					url= strz(g, create_info->connect_string);
6119 					qrp= MGOColumns(g, db, url, topt, fnc == FNC_COL);
6120 					break;
6121 #endif   // JAVA_SUPPORT
6122 #if defined(LIBXML2_SUPPORT) || defined(DOMDOC_SUPPORT)
6123 				case TAB_XML:
6124 					qrp= XMLColumns(g, (char*)db, tab, topt, fnc == FNC_COL);
6125 					break;
6126 #endif   // LIBXML2_SUPPORT  ||         DOMDOC_SUPPORT
6127 #if defined(REST_SUPPORT)
6128 				case TAB_REST:
6129 					qrp = RESTColumns(g, topt, tab, (char *)db, fnc == FNC_COL);
6130 					break;
6131 #endif   // REST_SUPPORT
6132 				case TAB_OEM:
6133 					qrp= OEMColumns(g, topt, tab, (char*)db, fnc == FNC_COL);
6134 					break;
6135 				default:
6136 					strcpy(g->Message, "System error during assisted discovery");
6137 					break;
6138 			} // endswitch ttp
6139 
6140 			if (!qrp) {
6141 				rc= HA_ERR_INTERNAL_ERROR;
6142 				goto err;
6143 			} // endif !qrp
6144 
6145 			if (fnc != FNC_NO || src || ttp == TAB_PIVOT) {
6146 				// Catalog like table
6147 				for (crp= qrp->Colresp; !rc && crp; crp= crp->Next) {
6148 					cnm= (ttp == TAB_PIVOT) ? crp->Name : encode(g, crp->Name);
6149 					typ= crp->Type;
6150 					len= crp->Length;
6151 					dec= crp->Prec;
6152 					flg= crp->Flag;
6153 					v= (crp->Kdata->IsUnsigned()) ? 'U' : crp->Var;
6154 					tm= (crp->Kdata->IsNullable()) ? 0 : NOT_NULL_FLAG;
6155 
6156 					if (!len && typ == TYPE_STRING)
6157 						len= 256;      // STRBLK's have 0 length
6158 
6159 					// Now add the field
6160 					if (add_field(&sql, ttp, cnm, typ, len, dec, NULL, tm,
6161 						NULL, NULL, NULL, NULL, flg, dbf, v))
6162 						rc= HA_ERR_OUT_OF_MEM;
6163 				} // endfor crp
6164 
6165 			} else {
6166                                 char *schem __attribute__((unused)) = NULL;
6167 				char *tn= NULL;
6168 
6169 				// Not a catalog table
6170 				if (!qrp->Nblin) {
6171 					if (tab)
6172 						sprintf(g->Message, "Cannot get columns from %s", tab);
6173 					else
6174 						strcpy(g->Message, "Fail to retrieve columns");
6175 
6176 					rc= HA_ERR_INTERNAL_ERROR;
6177 					goto err;
6178 				} // endif !nblin
6179 
6180 				// Restore language type
6181 				if (ttp == TAB_REST)
6182           ttp = ttr;
6183 
6184 				for (i= 0; !rc && i < qrp->Nblin; i++) {
6185 					typ= len= prec= dec= flg= 0;
6186 					tm= NOT_NULL_FLAG;
6187 					cnm= (char*)"noname";
6188 					dft= xtra= key= fmt= tn= NULL;
6189 					v= ' ';
6190 					rem= NULL;
6191 
6192 					for (crp= qrp->Colresp; crp; crp= crp->Next)
6193 						switch (crp->Fld) {
6194 							case FLD_NAME:
6195 								if (ttp == TAB_PRX ||
6196 									(ttp == TAB_CSV && topt->data_charset &&
6197 									(!stricmp(topt->data_charset, "UTF8") ||
6198 										!stricmp(topt->data_charset, "UTF-8"))))
6199 									cnm= crp->Kdata->GetCharValue(i);
6200 								else
6201 									cnm= encode(g, crp->Kdata->GetCharValue(i));
6202 
6203 								break;
6204 							case FLD_TYPE:
6205 								typ= crp->Kdata->GetIntValue(i);
6206 								v= (crp->Nulls) ? crp->Nulls[i] : 0;
6207 								break;
6208 							case FLD_TYPENAME:
6209 								tn= crp->Kdata->GetCharValue(i);
6210 								break;
6211 							case FLD_PREC:
6212 								// PREC must be always before LENGTH
6213 								len= prec= crp->Kdata->GetIntValue(i);
6214 								break;
6215 							case FLD_LENGTH:
6216 								len= crp->Kdata->GetIntValue(i);
6217 								break;
6218 							case FLD_SCALE:
6219 								dec= (!crp->Kdata->IsNull(i)) ? crp->Kdata->GetIntValue(i) : -1;
6220 								break;
6221 							case FLD_NULL:
6222 								if (crp->Kdata->GetIntValue(i))
6223 									tm= 0;               // Nullable
6224 
6225 								break;
6226 							case FLD_FLAG:
6227 								flg = crp->Kdata->GetIntValue(i);
6228 								break;
6229 							case FLD_FORMAT:
6230 								fmt= (crp->Kdata) ? crp->Kdata->GetCharValue(i) : NULL;
6231 								break;
6232 							case FLD_REM:
6233 								rem= crp->Kdata->GetCharValue(i);
6234 								break;
6235 								//          case FLD_CHARSET:
6236 															// No good because remote table is already translated
6237 								//            if (*(csn= crp->Kdata->GetCharValue(i)))
6238 								//              cs= get_charset_by_name(csn, 0);
6239 
6240 								//            break;
6241 							case FLD_DEFAULT:
6242 								dft= crp->Kdata->GetCharValue(i);
6243 								break;
6244 							case FLD_EXTRA:
6245 								xtra= crp->Kdata->GetCharValue(i);
6246 
6247 								// Auto_increment is not supported yet
6248 								if (!stricmp(xtra, "AUTO_INCREMENT"))
6249 									xtra= NULL;
6250 
6251 								break;
6252 							case FLD_KEY:
6253 								if (ttp == TAB_VIR)
6254 									key= crp->Kdata->GetCharValue(i);
6255 
6256 								break;
6257 							case FLD_SCHEM:
6258 #if defined(ODBC_SUPPORT) || defined(JAVA_SUPPORT)
6259 								if ((ttp == TAB_ODBC || ttp == TAB_JDBC) && crp->Kdata) {
6260 									if (schem && stricmp(schem, crp->Kdata->GetCharValue(i))) {
6261 										sprintf(g->Message,
6262 											"Several %s tables found, specify DBNAME", tab);
6263 										rc= HA_ERR_INTERNAL_ERROR;
6264 										goto err;
6265 									} else if (!schem)
6266 										schem= crp->Kdata->GetCharValue(i);
6267 
6268 								} // endif ttp
6269 #endif   // ODBC_SUPPORT	||				 JAVA_SUPPORT
6270 							default:
6271 								break;                 // Ignore
6272 						} // endswitch Fld
6273 
6274 #if defined(ODBC_SUPPORT)
6275 					if (ttp == TAB_ODBC) {
6276 						int  plgtyp;
6277 						bool w= false;            // Wide character type
6278 
6279 						// typ must be PLG type, not SQL type
6280 						if (!(plgtyp= TranslateSQLType(typ, dec, prec, v, w))) {
6281 							if (GetTypeConv() == TPC_SKIP) {
6282 								// Skip this column
6283 								sprintf(g->Message, "Column %s skipped (unsupported type %d)",
6284 									cnm, typ);
6285 								push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
6286 								continue;
6287 							} else {
6288 								sprintf(g->Message, "Unsupported SQL type %d", typ);
6289 								rc= HA_ERR_INTERNAL_ERROR;
6290 								goto err;
6291 							} // endif type_conv
6292 
6293 						} else
6294 							typ= plgtyp;
6295 
6296 						switch (typ) {
6297 							case TYPE_STRING:
6298 								if (w) {
6299 									sprintf(g->Message, "Column %s is wide characters", cnm);
6300 									push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, 0, g->Message);
6301 								} // endif w
6302 
6303 								break;
6304 							case TYPE_DOUBLE:
6305 								// Some data sources do not count dec in length (prec)
6306 								prec += (dec + 2);        // To be safe
6307 								break;
6308 							case TYPE_DECIM:
6309 								prec= len;
6310 								break;
6311 							default:
6312 								dec= 0;
6313 						} // endswitch typ
6314 
6315 					} else
6316 #endif   // ODBC_SUPPORT
6317 #if defined(JAVA_SUPPORT)
6318 						if (ttp == TAB_JDBC) {
6319 							int  plgtyp;
6320 
6321 							// typ must be PLG type, not SQL type
6322 							if (!(plgtyp= TranslateJDBCType(typ, tn, dec, prec, v))) {
6323 								if (GetTypeConv() == TPC_SKIP) {
6324 									// Skip this column
6325 									sprintf(g->Message, "Column %s skipped (unsupported type %d)",
6326 										cnm, typ);
6327 									push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
6328 									continue;
6329 								} else {
6330 									sprintf(g->Message, "Unsupported SQL type %d", typ);
6331 									rc= HA_ERR_INTERNAL_ERROR;
6332 									goto err;
6333 								} // endif type_conv
6334 
6335 							} else
6336 								typ= plgtyp;
6337 
6338 							switch (typ) {
6339 								case TYPE_DOUBLE:
6340 								case TYPE_DECIM:
6341 									// Some data sources do not count dec in length (prec)
6342 									prec += (dec + 2);        // To be safe
6343 									break;
6344 								default:
6345 									dec= 0;
6346 							} // endswitch typ
6347 
6348 						} else
6349 #endif   // ODBC_SUPPORT
6350 							// Make the arguments as required by add_fields
6351 							if (typ == TYPE_DOUBLE)
6352 								prec= len;
6353 
6354 						if (typ == TYPE_DATE)
6355 							prec= 0;
6356 
6357 						// Now add the field
6358 						if (add_field(&sql, ttp, cnm, typ, prec, dec, key, tm, rem, dft, xtra,
6359 								fmt, flg, dbf, v))
6360 							rc= HA_ERR_OUT_OF_MEM;
6361 				} // endfor i
6362 
6363 			} // endif fnc
6364 
6365 			if (!rc)
6366 				rc= init_table_share(thd, table_s, create_info, &sql);
6367 
6368 			//g->jump_level--;
6369 			//PopUser(xp);
6370 			//return rc;
6371 		} else {
6372 			rc= HA_ERR_UNSUPPORTED;
6373 		} // endif ok
6374 
6375 	} catch (int n) {
6376 		if (trace(1))
6377 			htrc("Exception %d: %s\n", n, g->Message);
6378 		rc= HA_ERR_INTERNAL_ERROR;
6379 	} catch (const char *msg) {
6380 		strcpy(g->Message, msg);
6381 		rc= HA_ERR_INTERNAL_ERROR;
6382 	} // end catch
6383 
6384  err:
6385   if (rc)
6386     my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6387 
6388 	PopUser(xp);
6389 	return rc;
6390 } // end of connect_assisted_discovery
6391 
6392 /**
6393   Get the database name from a qualified table name.
6394 */
6395 char *ha_connect::GetDBfromName(const char *name)
6396 {
6397   char *db, dbname[128], tbname[128];
6398 
6399   if (filename_to_dbname_and_tablename(name, dbname, sizeof(dbname),
6400                                              tbname, sizeof(tbname)))
6401     *dbname= 0;
6402 
6403   if (*dbname) {
6404     assert(xp && xp->g);
6405     db= (char*)PlugSubAlloc(xp->g, NULL, strlen(dbname + 1));
6406     strcpy(db, dbname);
6407   } else
6408     db= NULL;
6409 
6410   return db;
6411 } // end of GetDBfromName
6412 
6413 
6414 /**
6415   @brief
6416   create() is called to create a database. The variable name will have the name
6417   of the table.
6418 
6419   @details
6420   When create() is called you do not need to worry about
6421   opening the table. Also, the .frm file will have already been
6422   created so adjusting create_info is not necessary. You can overwrite
6423   the .frm file at this point if you wish to change the table
6424   definition, but there are no methods currently provided for doing
6425   so.
6426 
6427   Called from handle.cc by ha_create_table().
6428 
6429   @note
6430   Currently we do some checking on the create definitions and stop
6431   creating if an error is found. We wish we could change the table
6432   definition such as providing a default table type. However, as said
6433   above, there are no method to do so.
6434 
6435   @see
6436   ha_create_table() in handle.cc
6437 */
6438 
6439 int ha_connect::create(const char *name, TABLE *table_arg,
6440                        HA_CREATE_INFO *create_info)
6441 {
6442   int     rc= RC_OK;
6443   bool    dbf, inward;
6444   Field* *field;
6445   Field  *fp;
6446   TABTYPE type;
6447   TABLE  *st= table;                       // Probably unuseful
6448   THD    *thd= ha_thd();
6449   LEX_CSTRING cnc = table_arg->s->connect_string;
6450 #if defined(WITH_PARTITION_STORAGE_ENGINE)
6451   partition_info *part_info= table_arg->part_info;
6452 #else		// !WITH_PARTITION_STORAGE_ENGINE
6453 #define part_info 0
6454 #endif  // !WITH_PARTITION_STORAGE_ENGINE
6455   xp= GetUser(thd, xp);
6456   PGLOBAL g= xp->g;
6457 
6458   DBUG_ENTER("ha_connect::create");
6459   /*
6460     This assignment fixes test failures if some
6461     "ALTER TABLE t1 ADD KEY(a)" query exits on ER_ACCESS_DENIED_ERROR
6462     (e.g. on missing FILE_ACL). All following "CREATE TABLE" failed with
6463     "ERROR 1105: CONNECT index modification should be in-place"
6464     TODO: check with Olivier.
6465   */
6466   g->Xchk= NULL;
6467   int  sqlcom= thd_sql_command(table_arg->in_use);
6468   PTOS options= GetTableOptionStruct(table_arg->s);
6469 
6470   table= table_arg;         // Used by called functions
6471 
6472   if (trace(1))
6473     htrc("create: this=%p thd=%p xp=%p g=%p sqlcom=%d name=%s\n",
6474            this, thd, xp, g, sqlcom, GetTableName());
6475 
6476   // CONNECT engine specific table options:
6477   DBUG_ASSERT(options);
6478   type= GetTypeID(options->type);
6479 
6480   // Check table type
6481   if (type == TAB_UNDEF) {
6482     options->type= (options->srcdef)  ? "MYSQL" :
6483 #if defined(REST_SUPPORT)
6484                    (options->http) ? "JSON" :
6485 #endif   // REST_SUPPORT
6486                    (options->tabname) ? "PROXY" : "DOS";
6487     type= GetTypeID(options->type);
6488     sprintf(g->Message, "No table_type. Will be set to %s", options->type);
6489 
6490     if (sqlcom == SQLCOM_CREATE_TABLE)
6491       push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
6492 
6493   } else if (type == TAB_NIY) {
6494     sprintf(g->Message, "Unsupported table type %s", options->type);
6495     my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6496     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6497   } // endif ttp
6498 
6499   if (check_privileges(thd, options, GetDBfromName(name)))
6500     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6501 
6502   inward= IsFileType(type) && !options->filename &&
6503 		     ((type != TAB_JSON && type != TAB_BSON) || !cnc.length);
6504 
6505   if (options->data_charset) {
6506     const CHARSET_INFO *data_charset;
6507 
6508     if (!(data_charset= get_charset_by_csname(options->data_charset,
6509                                               MY_CS_PRIMARY, MYF(0)))) {
6510       my_error(ER_UNKNOWN_CHARACTER_SET, MYF(0), options->data_charset);
6511       DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6512       } // endif charset
6513 
6514     if (type == TAB_XML && data_charset != &my_charset_utf8_general_ci) {
6515       my_printf_error(ER_UNKNOWN_ERROR,
6516                       "DATA_CHARSET='%s' is not supported for TABLE_TYPE=XML",
6517                         MYF(0), options->data_charset);
6518       DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6519       } // endif utf8
6520 
6521     } // endif charset
6522 
6523   if (!g) {
6524     rc= HA_ERR_INTERNAL_ERROR;
6525     DBUG_RETURN(rc);
6526   } else
6527     dbf= (GetTypeID(options->type) == TAB_DBF && !options->catfunc);
6528 
6529   // Can be null in ALTER TABLE
6530   if (create_info->alias.str)
6531     // Check whether a table is defined on itself
6532     switch (type) {
6533       case TAB_PRX:
6534       case TAB_XCL:
6535       case TAB_PIVOT:
6536       case TAB_OCCUR:
6537         if (options->srcdef) {
6538           strcpy(g->Message, "Cannot check looping reference");
6539           push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
6540         } else if (options->tabname) {
6541           if (!stricmp(options->tabname, create_info->alias.str) &&
6542              (!options->dbname ||
6543               !stricmp(options->dbname, table_arg->s->db.str))) {
6544             sprintf(g->Message, "A %s table cannot refer to itself",
6545                                 options->type);
6546             my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6547             DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6548             } // endif tab
6549 
6550         } else {
6551           strcpy(g->Message, "Missing object table name or definition");
6552           my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6553           DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6554         } // endif tabname
6555 
6556 				// fall through
6557       case TAB_MYSQL:
6558         if (!part_info)
6559        {const char *src= options->srcdef;
6560 				PCSZ host, db, tab= options->tabname;
6561         int  port;
6562 
6563         host= GetListOption(g, "host", options->oplist, NULL);
6564         db= GetStringOption("database", NULL);
6565         port= atoi(GetListOption(g, "port", options->oplist, "0"));
6566 
6567         if (create_info->connect_string.str &&
6568             create_info->connect_string.length) {
6569           char   *dsn= strz(g, create_info->connect_string);
6570           PMYDEF  mydef= new(g) MYSQLDEF();
6571 
6572           mydef->SetName(create_info->alias.str);
6573 
6574           if (!mydef->ParseURL(g, dsn, false)) {
6575             if (mydef->GetHostname())
6576               host= mydef->GetHostname();
6577 
6578 						if (mydef->GetTabschema())
6579 							db= mydef->GetTabschema();
6580 
6581             if (mydef->GetTabname())
6582               tab= mydef->GetTabname();
6583 
6584             if (mydef->GetPortnumber())
6585               port= mydef->GetPortnumber();
6586 
6587           } else {
6588             my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6589             DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6590           } // endif ParseURL
6591 
6592           } // endif connect_string
6593 
6594         if (CheckSelf(g, table_arg->s, host, db, tab, src, port)) {
6595           my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6596           DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6597           } // endif CheckSelf
6598 
6599        } break;
6600       default: /* do nothing */;
6601         break;
6602      } // endswitch ttp
6603 
6604   if (type == TAB_XML) {
6605     bool dom;                  // True: MS-DOM, False libxml2
6606 		PCSZ xsup= GetListOption(g, "Xmlsup", options->oplist, "*");
6607 
6608     // Note that if no support is specified, the default is MS-DOM
6609     // on Windows and libxml2 otherwise
6610     switch (toupper(*xsup)) {
6611       case '*':
6612 #if defined(_WIN32)
6613         dom= true;
6614 #else   // !_WIN32
6615         dom= false;
6616 #endif  // !_WIN32
6617         break;
6618       case 'M':
6619       case 'D':
6620         dom= true;
6621         break;
6622       default:
6623         dom= false;
6624         break;
6625       } // endswitch xsup
6626 
6627 #if !defined(DOMDOC_SUPPORT)
6628     if (dom) {
6629       strcpy(g->Message, "MS-DOM not supported by this version");
6630       xsup= NULL;
6631       } // endif DomDoc
6632 #endif   // !DOMDOC_SUPPORT
6633 
6634 #if !defined(LIBXML2_SUPPORT)
6635     if (!dom) {
6636       strcpy(g->Message, "libxml2 not supported by this version");
6637       xsup= NULL;
6638       } // endif Libxml2
6639 #endif   // !LIBXML2_SUPPORT
6640 
6641     if (!xsup) {
6642       my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6643       rc= HA_ERR_INTERNAL_ERROR;
6644       DBUG_RETURN(rc);
6645       } // endif xsup
6646 
6647     } // endif type
6648 
6649   if (type == TAB_JSON) {
6650     int pretty= atoi(GetListOption(g, "Pretty", options->oplist, "2"));
6651 
6652     if (!options->lrecl && pretty != 2) {
6653       sprintf(g->Message, "LRECL must be specified for pretty=%d", pretty);
6654       my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6655       rc= HA_ERR_INTERNAL_ERROR;
6656       DBUG_RETURN(rc);
6657       } // endif lrecl
6658 
6659     } // endif type	JSON
6660 
6661 	if (type == TAB_CSV) {
6662 		const char *sep= options->separator;
6663 
6664 		if (sep && strlen(sep) > 1) {
6665 			sprintf(g->Message, "Invalid separator %s", sep);
6666 			my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6667 			rc= HA_ERR_INTERNAL_ERROR;
6668 			DBUG_RETURN(rc);
6669 		} // endif sep
6670 
6671 	} // endif type	CSV
6672 
6673   // Check column types
6674   for (field= table_arg->field; *field; field++) {
6675     fp= *field;
6676 
6677     if (fp->vcol_info && !fp->stored_in_db)
6678       continue;            // This is a virtual column
6679 
6680     if (fp->flags & AUTO_INCREMENT_FLAG) {
6681       strcpy(g->Message, "Auto_increment is not supported yet");
6682       my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6683       rc= HA_ERR_INTERNAL_ERROR;
6684       DBUG_RETURN(rc);
6685       } // endif flags
6686 
6687     if (fp->flags & (BLOB_FLAG | ENUM_FLAG | SET_FLAG)) {
6688       sprintf(g->Message, "Unsupported type for column %s",
6689                           fp->field_name.str);
6690       my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6691       rc= HA_ERR_INTERNAL_ERROR;
6692       DBUG_RETURN(rc);
6693       } // endif flags
6694 
6695     if (type == TAB_VIR)
6696       if (!fp->option_struct || !fp->option_struct->special) {
6697         strcpy(g->Message, "Virtual tables accept only special or virtual columns");
6698         my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6699         rc= HA_ERR_INTERNAL_ERROR;
6700         DBUG_RETURN(rc);
6701         } // endif special
6702 
6703     switch (fp->type()) {
6704       case MYSQL_TYPE_SHORT:
6705       case MYSQL_TYPE_LONG:
6706       case MYSQL_TYPE_FLOAT:
6707       case MYSQL_TYPE_DOUBLE:
6708       case MYSQL_TYPE_TIMESTAMP:
6709       case MYSQL_TYPE_DATE:
6710       case MYSQL_TYPE_TIME:
6711       case MYSQL_TYPE_DATETIME:
6712       case MYSQL_TYPE_YEAR:
6713       case MYSQL_TYPE_NEWDATE:
6714       case MYSQL_TYPE_LONGLONG:
6715       case MYSQL_TYPE_TINY:
6716       case MYSQL_TYPE_DECIMAL:
6717       case MYSQL_TYPE_NEWDECIMAL:
6718       case MYSQL_TYPE_INT24:
6719         break;                     // Ok
6720       case MYSQL_TYPE_VARCHAR:
6721       case MYSQL_TYPE_VAR_STRING:
6722       case MYSQL_TYPE_STRING:
6723 #if 0
6724         if (!fp->field_length) {
6725           sprintf(g->Message, "Unsupported 0 length for column %s",
6726                               fp->field_name.str);
6727           rc= HA_ERR_INTERNAL_ERROR;
6728           my_printf_error(ER_UNKNOWN_ERROR,
6729                           "Unsupported 0 length for column %s",
6730                           MYF(0), fp->field_name.str);
6731           DBUG_RETURN(rc);
6732           } // endif fp
6733 #endif // 0
6734         break;                     // To be checked
6735       case MYSQL_TYPE_BIT:
6736       case MYSQL_TYPE_NULL:
6737       case MYSQL_TYPE_ENUM:
6738       case MYSQL_TYPE_SET:
6739       case MYSQL_TYPE_TINY_BLOB:
6740       case MYSQL_TYPE_MEDIUM_BLOB:
6741       case MYSQL_TYPE_LONG_BLOB:
6742       case MYSQL_TYPE_BLOB:
6743       case MYSQL_TYPE_GEOMETRY:
6744       default:
6745 //      fprintf(stderr, "Unsupported type column %s\n", fp->field_name.str);
6746         sprintf(g->Message, "Unsupported type for column %s",
6747                             fp->field_name.str);
6748         rc= HA_ERR_INTERNAL_ERROR;
6749         my_printf_error(ER_UNKNOWN_ERROR, "Unsupported type for column %s",
6750                         MYF(0), fp->field_name.str);
6751         DBUG_RETURN(rc);
6752         break;
6753       } // endswitch type
6754 
6755     if ((fp)->real_maybe_null() && !IsTypeNullable(type)) {
6756       my_printf_error(ER_UNKNOWN_ERROR,
6757                       "Table type %s does not support nullable columns",
6758                       MYF(0), options->type);
6759       DBUG_RETURN(HA_ERR_UNSUPPORTED);
6760       } // endif !nullable
6761 
6762     if (dbf) {
6763       bool b= false;
6764 
6765       if ((b= fp->field_name.length > 10))
6766         sprintf(g->Message, "DBF: Column name '%s' is too long (max=10)",
6767                             fp->field_name.str);
6768       else if ((b= fp->field_length > 255))
6769         sprintf(g->Message, "DBF: Column length too big for '%s' (max=255)",
6770                             fp->field_name.str);
6771 
6772       if (b) {
6773         my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6774         rc= HA_ERR_INTERNAL_ERROR;
6775         DBUG_RETURN(rc);
6776         } // endif b
6777 
6778       } // endif dbf
6779 
6780     } // endfor field
6781 
6782   if ((sqlcom == SQLCOM_CREATE_TABLE || *GetTableName() == '#') && inward) {
6783     // The file name is not specified, create a default file in
6784     // the database directory named table_name.table_type.
6785     // (temporarily not done for XML because a void file causes
6786     // the XML parsers to report an error on the first Insert)
6787     char buf[_MAX_PATH], fn[_MAX_PATH], dbpath[_MAX_PATH], lwt[12];
6788     int  h;
6789 
6790     // Check for incompatible options
6791     if (options->sepindex) {
6792       my_message(ER_UNKNOWN_ERROR,
6793             "SEPINDEX is incompatible with unspecified file name", MYF(0));
6794       DBUG_RETURN(HA_ERR_UNSUPPORTED);
6795 		} else if (GetTypeID(options->type) == TAB_VEC) {
6796 			if (!table->s->max_rows || options->split) {
6797 				my_printf_error(ER_UNKNOWN_ERROR,
6798 					"%s tables whose file name is unspecified cannot be split",
6799 					MYF(0), options->type);
6800 				DBUG_RETURN(HA_ERR_UNSUPPORTED);
6801 			} else if (options->header == 2) {
6802 				my_printf_error(ER_UNKNOWN_ERROR,
6803 					"header=2 is not allowed for %s tables whose file name is unspecified",
6804 					MYF(0), options->type);
6805 				DBUG_RETURN(HA_ERR_UNSUPPORTED);
6806 			} // endif's
6807 
6808 		} else if (options->zipped) {
6809 			my_message(ER_UNKNOWN_ERROR,
6810 				"ZIPPED is incompatible with unspecified file name", MYF(0));
6811 			DBUG_RETURN(HA_ERR_UNSUPPORTED);
6812 		}	// endif's options
6813 
6814     // Fold type to lower case
6815     for (int i= 0; i < 12; i++)
6816       if (!options->type[i]) {
6817         lwt[i]= 0;
6818         break;
6819       } else
6820         lwt[i]= tolower(options->type[i]);
6821 
6822     if (part_info) {
6823       char *p;
6824 
6825       strcpy(dbpath, name);
6826       p= strrchr(dbpath, slash);
6827       strncpy(partname, ++p, sizeof(partname) - 1);
6828       strcat(strcat(strcpy(buf, p), "."), lwt);
6829       *p= 0;
6830     } else {
6831       strcat(strcat(strcpy(buf, GetTableName()), "."), lwt);
6832       sprintf(g->Message, "No file name. Table will use %s", buf);
6833 
6834       if (sqlcom == SQLCOM_CREATE_TABLE)
6835         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
6836 
6837       strcat(strcat(strcpy(dbpath, "./"), table->s->db.str), "/");
6838     } // endif part_info
6839 
6840     PlugSetPath(fn, buf, dbpath);
6841 
6842     if ((h= ::open(fn, O_CREAT | O_EXCL, 0666)) == -1) {
6843       if (errno == EEXIST)
6844         sprintf(g->Message, "Default file %s already exists", fn);
6845       else
6846         sprintf(g->Message, "Error %d creating file %s", errno, fn);
6847 
6848       push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
6849     } else
6850       ::close(h);
6851 
6852     if ((type == TAB_FMT || options->readonly) && sqlcom == SQLCOM_CREATE_TABLE)
6853       push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0,
6854         "Congratulation, you just created a read-only void table!");
6855 
6856     } // endif sqlcom
6857 
6858   if (trace(1))
6859     htrc("xchk=%p createas=%d\n", g->Xchk, g->Createas);
6860 
6861 	if (options->zipped) {
6862 #if defined(ZIP_SUPPORT)
6863 		// Check whether the zip entry must be made from a file
6864 		PCSZ fn= GetListOption(g, "Load", options->oplist, NULL);
6865 
6866 		if (fn) {
6867 			char zbuf[_MAX_PATH], buf[_MAX_PATH], dbpath[_MAX_PATH];
6868 			PCSZ entry= GetListOption(g, "Entry", options->oplist, NULL);
6869 			PCSZ a= GetListOption(g, "Append", options->oplist, "NO");
6870 			bool append= *a == '1' || *a == 'Y' || *a == 'y' || !stricmp(a, "ON");
6871 			PCSZ m= GetListOption(g, "Mulentries", options->oplist, "NO");
6872 			bool mul= *m == '1' || *m == 'Y' || *m == 'y' || !stricmp(m, "ON");
6873 
6874 			strcat(strcat(strcpy(dbpath, "./"), table->s->db.str), "/");
6875 			PlugSetPath(zbuf, options->filename, dbpath);
6876 			PlugSetPath(buf, fn, dbpath);
6877 
6878 			if (ZipLoadFile(g, zbuf, buf, entry, append, mul)) {
6879 				my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6880 				DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6881 			}	// endif LoadFile
6882 
6883 		}	// endif fn
6884 #else   // !ZIP_SUPPORT
6885 		my_message(ER_UNKNOWN_ERROR, "Option ZIP not supported", MYF(0));
6886 		DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6887 #endif  // !ZIP_SUPPORT
6888 	}	// endif zipped
6889 
6890   // To check whether indexes have to be made or remade
6891   if (!g->Xchk) {
6892     PIXDEF xdp;
6893 
6894     // We should be in CREATE TABLE, ALTER_TABLE or CREATE INDEX
6895     if (!(sqlcom == SQLCOM_CREATE_TABLE || sqlcom == SQLCOM_ALTER_TABLE ||
6896           sqlcom == SQLCOM_CREATE_INDEX || sqlcom == SQLCOM_DROP_INDEX))
6897 //         (sqlcom == SQLCOM_CREATE_INDEX && part_info) ||
6898 //         (sqlcom == SQLCOM_DROP_INDEX && part_info)))
6899       push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0,
6900         "Unexpected command in create, please contact CONNECT team");
6901 
6902     if (part_info && !inward)
6903       strncpy(partname, decode(g, strrchr(name, '#') + 1), sizeof(partname) - 1);
6904 //    strcpy(partname, part_info->curr_part_elem->partition_name);
6905 
6906     if (g->Alchecked == 0 &&
6907         (!IsFileType(type) || FileExists(options->filename, false))) {
6908       if (part_info) {
6909         sprintf(g->Message, "Data repartition in %s is unchecked", partname);
6910         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, g->Message);
6911       } else if (sqlcom == SQLCOM_ALTER_TABLE) {
6912         // This is an ALTER to CONNECT from another engine.
6913         // It cannot be accepted because the table data would be modified
6914         // except when the target file does not exist.
6915         strcpy(g->Message, "Operation denied. Table data would be modified.");
6916         my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6917         DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
6918       } // endif part_info
6919 
6920       } // endif outward
6921 
6922     // Get the index definitions
6923     if ((xdp= GetIndexInfo()) || sqlcom == SQLCOM_DROP_INDEX) {
6924       if (options->multiple) {
6925         strcpy(g->Message, "Multiple tables are not indexable");
6926         my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6927         rc= HA_ERR_UNSUPPORTED;
6928       } else if (options->compressed) {
6929         strcpy(g->Message, "Compressed tables are not indexable");
6930         my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6931         rc= HA_ERR_UNSUPPORTED;
6932       } else if (GetIndexType(type) == 1) {
6933         PDBUSER dup= PlgGetUser(g);
6934         PCATLG  cat= (dup) ? dup->Catalog : NULL;
6935 
6936 				if (SetDataPath(g, table_arg->s->db.str)) {
6937 					my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6938 					rc= HA_ERR_INTERNAL_ERROR;
6939 				} else if (cat) {
6940           if (part_info)
6941             strncpy(partname,
6942                     decode(g, strrchr(name, (inward ? slash : '#')) + 1),
6943 										sizeof(partname) - 1);
6944 
6945           if ((rc= optimize(table->in_use, NULL))) {
6946             htrc("Create rc=%d %s\n", rc, g->Message);
6947             my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6948             rc= HA_ERR_INTERNAL_ERROR;
6949           } else
6950             CloseTable(g);
6951 
6952           } // endif cat
6953 
6954       } else if (GetIndexType(type) == 3) {
6955         if (CheckVirtualIndex(table_arg->s)) {
6956           my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6957           rc= HA_ERR_UNSUPPORTED;
6958           } // endif Check
6959 
6960       } else if (!GetIndexType(type)) {
6961         sprintf(g->Message, "Table type %s is not indexable", options->type);
6962         my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
6963         rc= HA_ERR_UNSUPPORTED;
6964       } // endif index type
6965 
6966       } // endif xdp
6967 
6968   } else {
6969     // This should not happen anymore with indexing new way
6970     my_message(ER_UNKNOWN_ERROR,
6971                "CONNECT index modification should be in-place", MYF(0));
6972     DBUG_RETURN(HA_ERR_UNSUPPORTED);
6973   } // endif Xchk
6974 
6975   table= st;
6976   DBUG_RETURN(rc);
6977 } // end of create
6978 
6979 /**
6980   Used to check whether a file based outward table can be populated by
6981   an ALTER TABLE command. The conditions are:
6982   - file does not exist or is void
6983   - user has file privilege
6984 */
6985 bool ha_connect::FileExists(const char *fn, bool bf)
6986 {
6987   if (!fn || !*fn)
6988     return false;
6989   else if (IsPartitioned() && bf)
6990     return true;
6991 
6992   if (table) {
6993     const char *s;
6994 		char  tfn[_MAX_PATH], filename[_MAX_PATH], path[_MAX_PATH];
6995 		bool  b= false;
6996     int   n;
6997     struct stat info;
6998 
6999 #if defined(_WIN32)
7000     s= "\\";
7001 #else   // !_WIN32
7002     s= "/";
7003 #endif  // !_WIN32
7004     if (IsPartitioned()) {
7005       sprintf(tfn, fn, GetPartName());
7006 
7007       // This is to avoid an initialization error raised by the
7008       // test on check_table_flags made in ha_partition::open
7009       // that can fail if some partition files are empty.
7010       b= true;
7011     } else
7012       strcpy(tfn, fn);
7013 
7014     strcat(strcat(strcat(strcpy(path, "."), s), table->s->db.str), s);
7015     PlugSetPath(filename, tfn, path);
7016     n= stat(filename, &info);
7017 
7018     if (n < 0) {
7019       if (errno != ENOENT) {
7020         char buf[_MAX_PATH + 20];
7021 
7022         sprintf(buf, "Error %d for file %s", errno, filename);
7023         push_warning(table->in_use, Sql_condition::WARN_LEVEL_WARN, 0, buf);
7024         return true;
7025       } else
7026         return false;
7027 
7028     } else
7029       return (info.st_size || b) ? true : false;
7030 
7031     } // endif table
7032 
7033   return true;
7034 } // end of FileExists
7035 
7036 // Called by SameString and NoFieldOptionChange
7037 bool ha_connect::CheckString(PCSZ str1, PCSZ str2)
7038 {
7039   bool  b1= (!str1 || !*str1), b2= (!str2 || !*str2);
7040 
7041   if (b1 && b2)
7042     return true;
7043   else if ((b1 && !b2) || (!b1 && b2) || stricmp(str1, str2))
7044     return false;
7045 
7046   return true;
7047 } // end of CheckString
7048 
7049 /**
7050   check whether a string option have changed
7051   */
7052 bool ha_connect::SameString(TABLE *tab, PCSZ opn)
7053 {
7054   PCSZ str1, str2;
7055 
7056   tshp= tab->s;                 // The altered table
7057   str1= GetStringOption(opn);
7058   tshp= NULL;
7059   str2= GetStringOption(opn);
7060   return CheckString(str1, str2);
7061 } // end of SameString
7062 
7063 /**
7064   check whether a Boolean option have changed
7065   */
7066 bool ha_connect::SameBool(TABLE *tab, PCSZ opn)
7067 {
7068   bool b1, b2;
7069 
7070   tshp= tab->s;                 // The altered table
7071   b1= GetBooleanOption(opn, false);
7072   tshp= NULL;
7073   b2= GetBooleanOption(opn, false);
7074   return (b1 == b2);
7075 } // end of SameBool
7076 
7077 /**
7078   check whether an integer option have changed
7079   */
7080 bool ha_connect::SameInt(TABLE *tab, PCSZ opn)
7081 {
7082   int i1, i2;
7083 
7084   tshp= tab->s;                 // The altered table
7085   i1= GetIntegerOption(opn);
7086   tshp= NULL;
7087   i2= GetIntegerOption(opn);
7088 
7089   if (!stricmp(opn, "lrecl"))
7090     return (i1 == i2 || !i1 || !i2);
7091   else if (!stricmp(opn, "ending"))
7092     return (i1 == i2 || i1 <= 0 || i2 <= 0);
7093   else
7094     return (i1 == i2);
7095 
7096 } // end of SameInt
7097 
7098 /**
7099   check whether a field option have changed
7100   */
7101 bool ha_connect::NoFieldOptionChange(TABLE *tab)
7102 {
7103   bool rc= true;
7104   ha_field_option_struct *fop1, *fop2;
7105   Field* *fld1= table->s->field;
7106   Field* *fld2= tab->s->field;
7107 
7108   for (; rc && *fld1 && *fld2; fld1++, fld2++) {
7109     fop1= (*fld1)->option_struct;
7110     fop2= (*fld2)->option_struct;
7111 
7112     rc= (fop1->offset == fop2->offset &&
7113          fop1->fldlen == fop2->fldlen &&
7114          CheckString(fop1->dateformat, fop2->dateformat) &&
7115          CheckString(fop1->fieldformat, fop2->fieldformat) &&
7116 			   CheckString(fop1->special, fop2->special));
7117     } // endfor fld
7118 
7119   return rc;
7120 } // end of NoFieldOptionChange
7121 
7122  /**
7123     Check if a storage engine supports a particular alter table in-place
7124 
7125     @param    altered_table     TABLE object for new version of table.
7126     @param    ha_alter_info     Structure describing changes to be done
7127                                 by ALTER TABLE and holding data used
7128                                 during in-place alter.
7129 
7130     @retval   HA_ALTER_ERROR                  Unexpected error.
7131     @retval   HA_ALTER_INPLACE_NOT_SUPPORTED  Not supported, must use copy.
7132     @retval   HA_ALTER_INPLACE_EXCLUSIVE_LOCK Supported, but requires X lock.
7133     @retval   HA_ALTER_INPLACE_COPY_LOCK
7134                                               Supported, but requires SNW lock
7135                                               during main phase. Prepare phase
7136                                               requires X lock.
7137     @retval   HA_ALTER_INPLACE_SHARED_LOCK    Supported, but requires SNW lock.
7138     @retval   HA_ALTER_INPLACE_COPY_NO_LOCK
7139                                               Supported, concurrent reads/writes
7140                                               allowed. However, prepare phase
7141                                               requires X lock.
7142     @retval   HA_ALTER_INPLACE_NO_LOCK        Supported, concurrent
7143                                               reads/writes allowed.
7144 
7145     @note The default implementation uses the old in-place ALTER API
7146     to determine if the storage engine supports in-place ALTER or not.
7147 
7148     @note Called without holding thr_lock.c lock.
7149  */
7150 enum_alter_inplace_result
7151 ha_connect::check_if_supported_inplace_alter(TABLE *altered_table,
7152                                 Alter_inplace_info *ha_alter_info)
7153 {
7154   DBUG_ENTER("check_if_supported_alter");
7155 
7156   bool            idx= false, outward= false;
7157   THD            *thd= ha_thd();
7158   int             sqlcom= thd_sql_command(thd);
7159   TABTYPE         newtyp, type= TAB_UNDEF;
7160   HA_CREATE_INFO *create_info= ha_alter_info->create_info;
7161   PTOS            newopt, oldopt;
7162   xp= GetUser(thd, xp);
7163   PGLOBAL         g= xp->g;
7164 
7165   if (!g || !table) {
7166     my_message(ER_UNKNOWN_ERROR, "Cannot check ALTER operations", MYF(0));
7167     DBUG_RETURN(HA_ALTER_ERROR);
7168     } // endif Xchk
7169 
7170   newopt= altered_table->s->option_struct;
7171   oldopt= table->s->option_struct;
7172 
7173   // If this is the start of a new query, cleanup the previous one
7174   if (xp->CheckCleanup()) {
7175     tdbp= NULL;
7176     valid_info= false;
7177     } // endif CheckCleanup
7178 
7179   g->Alchecked= 1;       // Tested in create
7180   g->Xchk= NULL;
7181   type= GetRealType(oldopt);
7182   newtyp= GetRealType(newopt);
7183 
7184   // No copy algorithm for outward tables
7185   outward= (!IsFileType(type) || (oldopt->filename && *oldopt->filename));
7186 
7187   // Index operations
7188   alter_table_operations index_operations=
7189     ALTER_ADD_INDEX |
7190     ALTER_DROP_INDEX |
7191     ALTER_ADD_NON_UNIQUE_NON_PRIM_INDEX |
7192     ALTER_DROP_NON_UNIQUE_NON_PRIM_INDEX |
7193     ALTER_ADD_UNIQUE_INDEX |
7194     ALTER_DROP_UNIQUE_INDEX |
7195     ALTER_ADD_PK_INDEX |
7196     ALTER_DROP_PK_INDEX;
7197 
7198   alter_table_operations inplace_offline_operations=
7199     ALTER_COLUMN_TYPE_CHANGE_BY_ENGINE |
7200     ALTER_COLUMN_NAME |
7201     ALTER_COLUMN_DEFAULT |
7202     ALTER_CHANGE_CREATE_OPTION |
7203     ALTER_RENAME |
7204     ALTER_PARTITIONED | index_operations;
7205 
7206   if (ha_alter_info->handler_flags & index_operations ||
7207       !SameString(altered_table, "optname") ||
7208       !SameBool(altered_table, "sepindex")) {
7209     if (newopt->multiple) {
7210       strcpy(g->Message, "Multiple tables are not indexable");
7211       my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
7212       DBUG_RETURN(HA_ALTER_ERROR);
7213     } else if (newopt->compressed) {
7214       strcpy(g->Message, "Compressed tables are not indexable");
7215       my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
7216       DBUG_RETURN(HA_ALTER_ERROR);
7217     } else if (GetIndexType(type) == 1) {
7218       g->Xchk= new(g) XCHK;
7219       PCHK xcp= (PCHK)g->Xchk;
7220 
7221       xcp->oldpix= GetIndexInfo(table->s);
7222       xcp->newpix= GetIndexInfo(altered_table->s);
7223       xcp->oldsep= GetBooleanOption("sepindex", false);
7224       xcp->oldsep= xcp->SetName(g, GetStringOption("optname"));
7225       tshp= altered_table->s;
7226       xcp->newsep= GetBooleanOption("sepindex", false);
7227       xcp->newsep= xcp->SetName(g, GetStringOption("optname"));
7228       tshp= NULL;
7229 
7230       if (trace(1) && g->Xchk)
7231         htrc(
7232           "oldsep=%d newsep=%d oldopn=%s newopn=%s oldpix=%p newpix=%p\n",
7233                 xcp->oldsep, xcp->newsep,
7234                 SVP(xcp->oldopn), SVP(xcp->newopn),
7235                 xcp->oldpix, xcp->newpix);
7236 
7237       if (sqlcom == SQLCOM_ALTER_TABLE)
7238         idx= true;
7239       else
7240         DBUG_RETURN(HA_ALTER_INPLACE_EXCLUSIVE_LOCK);
7241 
7242     } else if (GetIndexType(type) == 3) {
7243       if (CheckVirtualIndex(altered_table->s)) {
7244         my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
7245         DBUG_RETURN(HA_ALTER_ERROR);
7246         } // endif Check
7247 
7248     } else if (!GetIndexType(type)) {
7249       sprintf(g->Message, "Table type %s is not indexable", oldopt->type);
7250       my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
7251       DBUG_RETURN(HA_ALTER_ERROR);
7252     } // endif index type
7253 
7254     } // endif index operation
7255 
7256   if (!SameString(altered_table, "filename")) {
7257     if (!outward) {
7258       // Conversion to outward table is only allowed for file based
7259       // tables whose file does not exist.
7260       tshp= altered_table->s;
7261 			PCSZ fn= GetStringOption("filename");
7262       tshp= NULL;
7263 
7264       if (FileExists(fn, false)) {
7265         strcpy(g->Message, "Operation denied. Table data would be lost.");
7266         my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
7267         DBUG_RETURN(HA_ALTER_ERROR);
7268       } else
7269         goto fin;
7270 
7271     } else
7272       goto fin;
7273 
7274     } // endif filename
7275 
7276   /* Is there at least one operation that requires copy algorithm? */
7277   if (ha_alter_info->handler_flags & ~inplace_offline_operations)
7278     goto fin;
7279 
7280   /*
7281     ALTER TABLE tbl_name CONVERT TO CHARACTER SET .. and
7282     ALTER TABLE table_name DEFAULT CHARSET= .. most likely
7283     change column charsets and so not supported in-place through
7284     old API.
7285 
7286     Changing of PACK_KEYS, MAX_ROWS and ROW_FORMAT options were
7287     not supported as in-place operations in old API either.
7288   */
7289   if (create_info->used_fields & (HA_CREATE_USED_CHARSET |
7290                                   HA_CREATE_USED_DEFAULT_CHARSET |
7291                                   HA_CREATE_USED_PACK_KEYS |
7292                                   HA_CREATE_USED_MAX_ROWS) ||
7293       (table->s->row_type != create_info->row_type))
7294     goto fin;
7295 
7296 #if 0
7297   uint table_changes= (ha_alter_info->handler_flags &
7298                        ALTER_COLUMN_TYPE_CHANGE_BY_ENGINE) ?
7299     IS_EQUAL_PACK_LENGTH : IS_EQUAL_YES;
7300 
7301   if (table->file->check_if_incompatible_data(create_info, table_changes)
7302       == COMPATIBLE_DATA_YES)
7303     DBUG_RETURN(HA_ALTER_INPLACE_EXCLUSIVE_LOCK);
7304 #endif // 0
7305 
7306   // This was in check_if_incompatible_data
7307   if (NoFieldOptionChange(altered_table) &&
7308       type == newtyp &&
7309       SameInt(altered_table, "lrecl") &&
7310       SameInt(altered_table, "elements") &&
7311       SameInt(altered_table, "header") &&
7312       SameInt(altered_table, "quoted") &&
7313       SameInt(altered_table, "ending") &&
7314       SameInt(altered_table, "compressed"))
7315     DBUG_RETURN(HA_ALTER_INPLACE_EXCLUSIVE_LOCK);
7316 
7317 fin:
7318   if (idx) {
7319     // Indexing is only supported inplace
7320     my_message(ER_ALTER_OPERATION_NOT_SUPPORTED,
7321       "Alter operations not supported together by CONNECT", MYF(0));
7322     DBUG_RETURN(HA_ALTER_ERROR);
7323   } else if (outward) {
7324     if (IsFileType(type))
7325       push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0,
7326         "This is an outward table, table data were not modified.");
7327 
7328     DBUG_RETURN(HA_ALTER_INPLACE_EXCLUSIVE_LOCK);
7329   } else
7330     DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
7331 
7332 } // end of check_if_supported_inplace_alter
7333 
7334 
7335 /**
7336   check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
7337   if new and old definition are compatible
7338 
7339   @details If there are no other explicit signs like changed number of
7340   fields this function will be called by compare_tables()
7341   (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm
7342   file.
7343 
7344   @note: This function is no more called by check_if_supported_inplace_alter
7345 */
7346 
7347 bool ha_connect::check_if_incompatible_data(HA_CREATE_INFO *, uint)
7348 {
7349   DBUG_ENTER("ha_connect::check_if_incompatible_data");
7350   // TO DO: really implement and check it.
7351   push_warning(ha_thd(), Sql_condition::WARN_LEVEL_WARN, 0,
7352       "Unexpected call to check_if_incompatible_data.");
7353   DBUG_RETURN(COMPATIBLE_DATA_NO);
7354 } // end of check_if_incompatible_data
7355 
7356 /****************************************************************************
7357  * CONNECT MRR implementation: use DS-MRR
7358    This is just copied from myisam
7359  ***************************************************************************/
7360 
7361 int ha_connect::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
7362                                      uint n_ranges, uint mode,
7363                                      HANDLER_BUFFER *buf)
7364 {
7365   return ds_mrr.dsmrr_init(this, seq, seq_init_param, n_ranges, mode, buf);
7366 } // end of multi_range_read_init
7367 
7368 int ha_connect::multi_range_read_next(range_id_t *range_info)
7369 {
7370   return ds_mrr.dsmrr_next(range_info);
7371 } // end of multi_range_read_next
7372 
7373 ha_rows ha_connect::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
7374                                                void *seq_init_param,
7375                                                uint n_ranges, uint *bufsz,
7376                                                uint *flags, Cost_estimate *cost)
7377 {
7378   /*
7379     This call is here because there is no location where this->table would
7380     already be known.
7381     TODO: consider moving it into some per-query initialization call.
7382   */
7383   ds_mrr.init(this, table);
7384 
7385   // MMR is implemented for "local" file based tables only
7386   if (!IsFileType(GetRealType(GetTableOptionStruct())))
7387     *flags|= HA_MRR_USE_DEFAULT_IMPL;
7388 
7389   ha_rows rows= ds_mrr.dsmrr_info_const(keyno, seq, seq_init_param, n_ranges,
7390                                         bufsz, flags, cost);
7391   xp->g->Mrr= !(*flags & HA_MRR_USE_DEFAULT_IMPL);
7392   return rows;
7393 } // end of multi_range_read_info_const
7394 
7395 ha_rows ha_connect::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
7396                                          uint key_parts, uint *bufsz,
7397                                          uint *flags, Cost_estimate *cost)
7398 {
7399   ds_mrr.init(this, table);
7400 
7401   // MMR is implemented for "local" file based tables only
7402   if (!IsFileType(GetRealType(GetTableOptionStruct())))
7403     *flags|= HA_MRR_USE_DEFAULT_IMPL;
7404 
7405   ha_rows rows= ds_mrr.dsmrr_info(keyno, n_ranges, keys, key_parts, bufsz,
7406                                   flags, cost);
7407   xp->g->Mrr= !(*flags & HA_MRR_USE_DEFAULT_IMPL);
7408   return rows;
7409 } // end of multi_range_read_info
7410 
7411 
7412 int ha_connect::multi_range_read_explain_info(uint mrr_mode, char *str,
7413                                              size_t size)
7414 {
7415   return ds_mrr.dsmrr_explain_info(mrr_mode, str, size);
7416 } // end of multi_range_read_explain_info
7417 
7418 /* CONNECT MRR implementation ends */
7419 
7420 #if 0
7421 // Does this make sens for CONNECT?
7422 Item *ha_connect::idx_cond_push(uint keyno_arg, Item* idx_cond_arg)
7423 {
7424   pushed_idx_cond_keyno= keyno_arg;
7425   pushed_idx_cond= idx_cond_arg;
7426   in_range_check_pushed_down= TRUE;
7427   if (active_index == pushed_idx_cond_keyno)
7428     mi_set_index_cond_func(file, handler_index_cond_check, this);
7429   return NULL;
7430 }
7431 #endif // 0
7432 
7433 
7434 struct st_mysql_storage_engine connect_storage_engine=
7435 { MYSQL_HANDLERTON_INTERFACE_VERSION };
7436 
7437 /***********************************************************************/
7438 /*  CONNECT global variables definitions.                              */
7439 /***********************************************************************/
7440 #if defined(XMAP)
7441 // Using file mapping for indexes if true
7442 static MYSQL_SYSVAR_BOOL(indx_map, xmap, PLUGIN_VAR_RQCMDARG,
7443        "Using file mapping for indexes", NULL, NULL, 0);
7444 #endif   // XMAP
7445 
7446 #if defined(XMSG)
7447 static MYSQL_SYSVAR_STR(errmsg_dir_path, msg_path,
7448 //     PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
7449        PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
7450        "Path to the directory where are the message files",
7451 //     check_msg_path, update_msg_path,
7452        NULL, NULL,
7453        "../../../../storage/connect/");     // for testing
7454 #endif   // XMSG
7455 
7456 #if defined(JAVA_SUPPORT)
7457 static MYSQL_SYSVAR_STR(jvm_path, JvmPath,
7458 	PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
7459 	"Path to the directory where is the JVM lib",
7460 	//     check_jvm_path, update_jvm_path,
7461 	NULL, NULL,	NULL);
7462 
7463 static MYSQL_SYSVAR_STR(class_path, ClassPath,
7464 	PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
7465 	"Java class path",
7466 	//     check_class_path, update_class_path,
7467 	NULL, NULL, NULL);
7468 #endif   // JAVA_SUPPORT
7469 
7470 
7471 static struct st_mysql_sys_var* connect_system_variables[]= {
7472   MYSQL_SYSVAR(xtrace),
7473   MYSQL_SYSVAR(conv_size),
7474   MYSQL_SYSVAR(type_conv),
7475 #if defined(XMAP)
7476   MYSQL_SYSVAR(indx_map),
7477 #endif   // XMAP
7478   MYSQL_SYSVAR(work_size),
7479   MYSQL_SYSVAR(use_tempfile),
7480   MYSQL_SYSVAR(exact_info),
7481 #if defined(XMSG) || defined(NEWMSG)
7482   MYSQL_SYSVAR(msg_lang),
7483 #endif   // XMSG || NEWMSG
7484 #if defined(XMSG)
7485   MYSQL_SYSVAR(errmsg_dir_path),
7486 #endif   // XMSG
7487 	MYSQL_SYSVAR(json_null),
7488 	MYSQL_SYSVAR(json_all_path),
7489 	MYSQL_SYSVAR(default_depth),
7490   MYSQL_SYSVAR(default_prec),
7491   MYSQL_SYSVAR(json_grp_size),
7492 #if defined(JAVA_SUPPORT)
7493 	MYSQL_SYSVAR(jvm_path),
7494 	MYSQL_SYSVAR(class_path),
7495 	MYSQL_SYSVAR(java_wrapper),
7496 #endif   // JAVA_SUPPORT
7497 #if defined(JAVA_SUPPORT) || defined(CMGO_SUPPORT)
7498 	MYSQL_SYSVAR(enable_mongo),
7499 #endif   // JAVA_SUPPORT || CMGO_SUPPORT
7500 	MYSQL_SYSVAR(cond_push),
7501 #if defined(BSON_SUPPORT)
7502   MYSQL_SYSVAR(force_bson),
7503 #endif   // BSON_SUPPORT
7504   NULL
7505 };
7506 
7507 maria_declare_plugin(connect)
7508 {
7509   MYSQL_STORAGE_ENGINE_PLUGIN,
7510   &connect_storage_engine,
7511   "CONNECT",
7512   "Olivier Bertrand",
7513   "Management of External Data (SQL/NOSQL/MED), including Rest query results",
7514   PLUGIN_LICENSE_GPL,
7515   connect_init_func,                            /* Plugin Init */
7516   connect_done_func,                            /* Plugin Deinit */
7517   0x0107,                                       /* version number (1.07) */
7518   NULL,                                         /* status variables */
7519   connect_system_variables,                     /* system variables */
7520   "1.07.0003",                                  /* string version */
7521 	MariaDB_PLUGIN_MATURITY_STABLE                /* maturity */
7522 }
7523 maria_declare_plugin_end;
7524