1 /*-------------------------------------------------------------------------
2  *
3  * mysql_fdw.c
4  * 		Foreign-data wrapper for remote MySQL servers
5  *
6  * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
7  * Portions Copyright (c) 2004-2021, EnterpriseDB Corporation.
8  *
9  * IDENTIFICATION
10  * 		mysql_fdw.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15 
16 /*
17  * Must be included before mysql.h as it has some conflicting definitions like
18  * list_length, etc.
19  */
20 #include "mysql_fdw.h"
21 
22 #include <dlfcn.h>
23 #include <errmsg.h>
24 #include <mysql.h>
25 #include <stdio.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28 
29 #include "access/htup_details.h"
30 #include "access/sysattr.h"
31 #include "access/reloptions.h"
32 #if PG_VERSION_NUM >= 120000
33 #include "access/table.h"
34 #endif
35 #include "commands/defrem.h"
36 #include "commands/explain.h"
37 #include "catalog/heap.h"
38 #include "foreign/fdwapi.h"
39 #include "miscadmin.h"
40 #include "mysql_query.h"
41 #include "nodes/makefuncs.h"
42 #include "nodes/nodeFuncs.h"
43 #include "nodes/nodes.h"
44 #include "optimizer/pathnode.h"
45 #include "optimizer/planmain.h"
46 #if PG_VERSION_NUM < 120000
47 #include "optimizer/var.h"
48 #else
49 #include "optimizer/optimizer.h"
50 #endif
51 #include "optimizer/restrictinfo.h"
52 #include "optimizer/tlist.h"
53 #include "parser/parsetree.h"
54 #include "storage/ipc.h"
55 #include "utils/builtins.h"
56 #include "utils/datum.h"
57 #include "utils/guc.h"
58 #include "utils/lsyscache.h"
59 #include "utils/memutils.h"
60 #include "utils/syscache.h"
61 
62 /* Declarations for dynamic loading */
63 PG_MODULE_MAGIC;
64 
65 int ((mysql_options) (MYSQL *mysql, enum mysql_option option,
66 					  const void *arg));
67 int ((mysql_stmt_prepare) (MYSQL_STMT *stmt, const char *query,
68 						   unsigned long length));
69 int ((mysql_stmt_execute) (MYSQL_STMT *stmt));
70 int ((mysql_stmt_fetch) (MYSQL_STMT *stmt));
71 int ((mysql_query) (MYSQL *mysql, const char *q));
72 bool ((mysql_stmt_attr_set) (MYSQL_STMT *stmt,
73 							 enum enum_stmt_attr_type attr_type,
74 							 const void *attr));
75 bool ((mysql_stmt_close) (MYSQL_STMT *stmt));
76 bool ((mysql_stmt_reset) (MYSQL_STMT *stmt));
77 bool ((mysql_free_result) (MYSQL_RES *result));
78 bool ((mysql_stmt_bind_param) (MYSQL_STMT *stmt, MYSQL_BIND *bnd));
79 bool ((mysql_stmt_bind_result) (MYSQL_STMT *stmt, MYSQL_BIND *bnd));
80 
81 MYSQL_STMT *((mysql_stmt_init) (MYSQL *mysql));
82 MYSQL_RES *((mysql_stmt_result_metadata) (MYSQL_STMT *stmt));
83 int ((mysql_stmt_store_result) (MYSQL *mysql));
84 MYSQL_ROW((mysql_fetch_row) (MYSQL_RES *result));
85 MYSQL_FIELD *((mysql_fetch_field) (MYSQL_RES *result));
86 MYSQL_FIELD *((mysql_fetch_fields) (MYSQL_RES *result));
87 const char *((mysql_error) (MYSQL *mysql));
88 void ((mysql_close) (MYSQL *sock));
89 MYSQL_RES *((mysql_store_result) (MYSQL *mysql));
90 MYSQL *((mysql_init) (MYSQL *mysql));
91 bool ((mysql_ssl_set) (MYSQL *mysql, const char *key, const char *cert,
92 					   const char *ca, const char *capath,
93 					   const char *cipher));
94 MYSQL *((mysql_real_connect) (MYSQL *mysql, const char *host, const char *user,
95 							  const char *passwd, const char *db,
96 							  unsigned int port, const char *unix_socket,
97 							  unsigned long clientflag));
98 
99 const char *((mysql_get_host_info) (MYSQL *mysql));
100 const char *((mysql_get_server_info) (MYSQL *mysql));
101 int ((mysql_get_proto_info) (MYSQL *mysql));
102 
103 unsigned int ((mysql_stmt_errno) (MYSQL_STMT *stmt));
104 unsigned int ((mysql_errno) (MYSQL *mysql));
105 unsigned int ((mysql_num_fields) (MYSQL_RES *result));
106 unsigned int ((mysql_num_rows) (MYSQL_RES *result));
107 
108 #define DEFAULTE_NUM_ROWS    1000
109 
110 /*
111  * In PG 9.5.1 the number will be 90501,
112  * our version is 2.6.0 so number will be 20600
113  */
114 #define CODE_VERSION   20600
115 
/*
 * Positions of the items stored in a scan's fdw_private list.
 *
 * Each enumerator is the list_nth() index of one item.  For example, the
 * SELECT statement is fetched with:
 *		sql = strVal(list_nth(fdw_private, mysqlFdwScanPrivateSelectSql));
 *
 * The values are spelled out because they are list positions: an item at
 * index N only exists when the list holds more than N elements.
 */
enum mysqlFdwScanPrivateIndex
{
	/* String node: SQL statement to execute remotely */
	mysqlFdwScanPrivateSelectSql = 0,

	/* Integer list: attribute numbers retrieved by the SELECT */
	mysqlFdwScanPrivateRetrievedAttrs = 1,

	/*
	 * String describing the join, i.e. the names of the relations being
	 * joined and the join types; added when the scan is a join
	 */
	mysqlFdwScanPrivateRelations = 2,

	/*
	 * List of Var node lists used to construct the whole-row references of
	 * the base relations involved in a pushed-down join
	 */
	mysqlFdwPrivateWholeRowLists = 3,

	/*
	 * Target list representing the result fetched from the foreign server
	 * when whole-row references are involved
	 */
	mysqlFdwPrivateScanTList = 4
};
149 
150 extern PGDLLEXPORT void _PG_init(void);
151 extern Datum mysql_fdw_handler(PG_FUNCTION_ARGS);
152 
153 PG_FUNCTION_INFO_V1(mysql_fdw_handler);
154 PG_FUNCTION_INFO_V1(mysql_fdw_version);
155 
156 /*
157  * FDW callback routines
158  */
159 static void mysqlExplainForeignScan(ForeignScanState *node, ExplainState *es);
160 static void mysqlBeginForeignScan(ForeignScanState *node, int eflags);
161 static TupleTableSlot *mysqlIterateForeignScan(ForeignScanState *node);
162 static void mysqlReScanForeignScan(ForeignScanState *node);
163 static void mysqlEndForeignScan(ForeignScanState *node);
164 
165 static List *mysqlPlanForeignModify(PlannerInfo *root, ModifyTable *plan,
166 									Index resultRelation, int subplan_index);
167 static void mysqlBeginForeignModify(ModifyTableState *mtstate,
168 									ResultRelInfo *resultRelInfo,
169 									List *fdw_private, int subplan_index,
170 									int eflags);
171 static TupleTableSlot *mysqlExecForeignInsert(EState *estate,
172 											  ResultRelInfo *resultRelInfo,
173 											  TupleTableSlot *slot,
174 											  TupleTableSlot *planSlot);
175 static void mysqlAddForeignUpdateTargets(Query *parsetree,
176 										 RangeTblEntry *target_rte,
177 										 Relation target_relation);
178 static TupleTableSlot *mysqlExecForeignUpdate(EState *estate,
179 											  ResultRelInfo *resultRelInfo,
180 											  TupleTableSlot *slot,
181 											  TupleTableSlot *planSlot);
182 static TupleTableSlot *mysqlExecForeignDelete(EState *estate,
183 											  ResultRelInfo *resultRelInfo,
184 											  TupleTableSlot *slot,
185 											  TupleTableSlot *planSlot);
186 static void mysqlEndForeignModify(EState *estate,
187 								  ResultRelInfo *resultRelInfo);
188 
189 static void mysqlGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
190 								   Oid foreigntableid);
191 static void mysqlGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
192 								 Oid foreigntableid);
193 static bool mysqlAnalyzeForeignTable(Relation relation,
194 									 AcquireSampleRowsFunc *func,
195 									 BlockNumber *totalpages);
196 #if PG_VERSION_NUM >= 90500
197 static ForeignScan *mysqlGetForeignPlan(PlannerInfo *root,
198 										RelOptInfo *foreignrel,
199 										Oid foreigntableid,
200 										ForeignPath *best_path, List *tlist,
201 										List *scan_clauses, Plan *outer_plan);
202 #else
203 static ForeignScan *mysqlGetForeignPlan(PlannerInfo *root,
204 										RelOptInfo *foreignrel,
205 										Oid foreigntableid,
206 										ForeignPath *best_path, List *tlist,
207 										List *scan_clauses);
208 #endif
209 static void mysqlEstimateCosts(PlannerInfo *root, RelOptInfo *baserel,
210 							   Cost *startup_cost, Cost *total_cost,
211 							   Oid foreigntableid);
212 #if PG_VERSION_NUM >= 90600
213 static void mysqlGetForeignJoinPaths(PlannerInfo *root,
214 									 RelOptInfo *joinrel,
215 									 RelOptInfo *outerrel,
216 									 RelOptInfo *innerrel,
217 									 JoinType jointype,
218 									 JoinPathExtraData *extra);
219 static bool mysqlRecheckForeignScan(ForeignScanState *node,
220 									TupleTableSlot *slot);
221 #endif
222 
223 #if PG_VERSION_NUM >= 90500
224 static List *mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt,
225 									  Oid serverOid);
226 #endif
227 
228 #if PG_VERSION_NUM >= 110000
229 static void mysqlBeginForeignInsert(ModifyTableState *mtstate,
230 									ResultRelInfo *resultRelInfo);
231 static void mysqlEndForeignInsert(EState *estate,
232 								  ResultRelInfo *resultRelInfo);
233 #endif
234 
235 /*
236  * Helper functions
237  */
238 bool mysql_load_library(void);
239 static void mysql_fdw_exit(int code, Datum arg);
240 static bool mysql_is_column_unique(Oid foreigntableid);
241 
242 static void prepare_query_params(PlanState *node,
243 								 List *fdw_exprs,
244 								 int numParams,
245 								 FmgrInfo **param_flinfo,
246 								 List **param_exprs,
247 								 const char ***param_values,
248 								 Oid **param_types);
249 
250 static void process_query_params(ExprContext *econtext,
251 								 FmgrInfo *param_flinfo,
252 								 List *param_exprs,
253 								 const char **param_values,
254 								 MYSQL_BIND **mysql_bind_buf,
255 								 Oid *param_types);
256 
257 static void bind_stmt_params_and_exec(ForeignScanState *node);
258 #if PG_VERSION_NUM >= 90600
259 static bool mysql_foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
260 								  JoinType jointype, RelOptInfo *outerrel,
261 								  RelOptInfo *innerrel,
262 								  JoinPathExtraData *extra);
263 static List *mysql_adjust_whole_row_ref(PlannerInfo *root,
264 										List *scan_var_list,
265 										List **whole_row_lists,
266 										Bitmapset *relids);
267 static List *mysql_build_scan_list_for_baserel(Oid relid, Index varno,
268 											   Bitmapset *attrs_used,
269 											   List **retrieved_attrs);
270 #endif
271 static void mysql_build_whole_row_constr_info(MySQLFdwExecState *festate,
272 											  TupleDesc tupdesc,
273 											  Bitmapset *relids,
274 											  int max_relid,
275 											  List *whole_row_lists,
276 											  List *scan_tlist,
277 											  List *fdw_scan_tlist);
278 static HeapTuple mysql_get_tuple_with_whole_row(MySQLFdwExecState *festate,
279 												Datum *values,bool *nulls);
280 static HeapTuple mysql_form_whole_row(MySQLWRState *wr_state, Datum *values,
281 									  bool *nulls);
282 
283 void *mysql_dll_handle = NULL;
284 static int wait_timeout = WAIT_TIMEOUT;
285 static int interactive_timeout = INTERACTIVE_TIMEOUT;
286 static void mysql_error_print(MYSQL *conn);
287 static void mysql_stmt_error_print(MySQLFdwExecState *festate,
288 								   const char *msg);
289 static List *getUpdateTargetAttrs(RangeTblEntry *rte);
290 
291 /*
292  * mysql_load_library function dynamically load the mysql's library
293  * libmysqlclient.so. The only reason to load the library using dlopen
294  * is that, mysql and postgres both have function with same name like
295  * "list_delete", "list_delete" and "list_free" which cause compiler
296  * error "duplicate function name" and erroneously linking with a function.
297  * This port of the code is used to avoid the compiler error.
298  *
299  * #define list_delete mysql_list_delete
300  * #include <mysql.h>
301  * #undef list_delete
302  *
303  * But system crashed on function mysql_stmt_close function because
304  * mysql_stmt_close internally calling "list_delete" function which
305  * wrongly binds to postgres' "list_delete" function.
306  *
307  * The dlopen function provides a parameter "RTLD_DEEPBIND" which
308  * solved the binding issue.
309  *
310  * RTLD_DEEPBIND:
311  * Place the lookup scope of the symbols in this library ahead of the
312  * global scope. This means that a self-contained library will use its
313  * own symbols in preference to global symbols with the same name contained
314  * in libraries that have already been loaded.
315  */
316 bool
mysql_load_library(void)317 mysql_load_library(void)
318 {
319 #if defined(__APPLE__) || defined(__DragonFly__) || defined(__FreeBSD__)
320 	/*
321 	 * Mac OS/BSD does not support RTLD_DEEPBIND, but it still works without
322 	 * the RTLD_DEEPBIND
323 	 */
324 	mysql_dll_handle = dlopen(_MYSQL_LIBNAME, RTLD_LAZY);
325 #else
326 	mysql_dll_handle = dlopen(_MYSQL_LIBNAME, RTLD_LAZY | RTLD_DEEPBIND);
327 #endif
328 	if (mysql_dll_handle == NULL)
329 		return false;
330 
331 	_mysql_stmt_bind_param = dlsym(mysql_dll_handle, "mysql_stmt_bind_param");
332 	_mysql_stmt_bind_result = dlsym(mysql_dll_handle, "mysql_stmt_bind_result");
333 	_mysql_stmt_init = dlsym(mysql_dll_handle, "mysql_stmt_init");
334 	_mysql_stmt_prepare = dlsym(mysql_dll_handle, "mysql_stmt_prepare");
335 	_mysql_stmt_execute = dlsym(mysql_dll_handle, "mysql_stmt_execute");
336 	_mysql_stmt_fetch = dlsym(mysql_dll_handle, "mysql_stmt_fetch");
337 	_mysql_query = dlsym(mysql_dll_handle, "mysql_query");
338 	_mysql_stmt_result_metadata = dlsym(mysql_dll_handle, "mysql_stmt_result_metadata");
339 	_mysql_stmt_store_result = dlsym(mysql_dll_handle, "mysql_stmt_store_result");
340 	_mysql_fetch_row = dlsym(mysql_dll_handle, "mysql_fetch_row");
341 	_mysql_fetch_field = dlsym(mysql_dll_handle, "mysql_fetch_field");
342 	_mysql_fetch_fields = dlsym(mysql_dll_handle, "mysql_fetch_fields");
343 	_mysql_stmt_close = dlsym(mysql_dll_handle, "mysql_stmt_close");
344 	_mysql_stmt_reset = dlsym(mysql_dll_handle, "mysql_stmt_reset");
345 	_mysql_free_result = dlsym(mysql_dll_handle, "mysql_free_result");
346 	_mysql_error = dlsym(mysql_dll_handle, "mysql_error");
347 	_mysql_options = dlsym(mysql_dll_handle, "mysql_options");
348 	_mysql_ssl_set = dlsym(mysql_dll_handle, "mysql_ssl_set");
349 	_mysql_real_connect = dlsym(mysql_dll_handle, "mysql_real_connect");
350 	_mysql_close = dlsym(mysql_dll_handle, "mysql_close");
351 	_mysql_init = dlsym(mysql_dll_handle, "mysql_init");
352 	_mysql_stmt_attr_set = dlsym(mysql_dll_handle, "mysql_stmt_attr_set");
353 	_mysql_store_result = dlsym(mysql_dll_handle, "mysql_store_result");
354 	_mysql_stmt_errno = dlsym(mysql_dll_handle, "mysql_stmt_errno");
355 	_mysql_errno = dlsym(mysql_dll_handle, "mysql_errno");
356 	_mysql_num_fields = dlsym(mysql_dll_handle, "mysql_num_fields");
357 	_mysql_num_rows = dlsym(mysql_dll_handle, "mysql_num_rows");
358 	_mysql_get_host_info = dlsym(mysql_dll_handle, "mysql_get_host_info");
359 	_mysql_get_server_info = dlsym(mysql_dll_handle, "mysql_get_server_info");
360 	_mysql_get_proto_info = dlsym(mysql_dll_handle, "mysql_get_proto_info");
361 
362 	if (_mysql_stmt_bind_param == NULL ||
363 		_mysql_stmt_bind_result == NULL ||
364 		_mysql_stmt_init == NULL ||
365 		_mysql_stmt_prepare == NULL ||
366 		_mysql_stmt_execute == NULL ||
367 		_mysql_stmt_fetch == NULL ||
368 		_mysql_query == NULL ||
369 		_mysql_stmt_result_metadata == NULL ||
370 		_mysql_stmt_store_result == NULL ||
371 		_mysql_fetch_row == NULL ||
372 		_mysql_fetch_field == NULL ||
373 		_mysql_fetch_fields == NULL ||
374 		_mysql_stmt_close == NULL ||
375 		_mysql_stmt_reset == NULL ||
376 		_mysql_free_result == NULL ||
377 		_mysql_error == NULL ||
378 		_mysql_options == NULL ||
379 		_mysql_ssl_set == NULL ||
380 		_mysql_real_connect == NULL ||
381 		_mysql_close == NULL ||
382 		_mysql_init == NULL ||
383 		_mysql_stmt_attr_set == NULL ||
384 		_mysql_store_result == NULL ||
385 		_mysql_stmt_errno == NULL ||
386 		_mysql_errno == NULL ||
387 		_mysql_num_fields == NULL ||
388 		_mysql_num_rows == NULL ||
389 		_mysql_get_host_info == NULL ||
390 		_mysql_get_server_info == NULL ||
391 		_mysql_get_proto_info == NULL)
392 		return false;
393 
394 	return true;
395 }
396 
397 /*
398  * Library load-time initialization, sets on_proc_exit() callback for
399  * backend shutdown.
400  */
401 void
_PG_init(void)402 _PG_init(void)
403 {
404 	if (!mysql_load_library())
405 		ereport(ERROR,
406 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
407 				 errmsg("failed to load the mysql query: \n%s", dlerror()),
408 				 errhint("Export LD_LIBRARY_PATH to locate the library.")));
409 
410 	DefineCustomIntVariable("mysql_fdw.wait_timeout",
411 							"Server-side wait_timeout",
412 							"Set the maximum wait_timeout"
413 							"use to set the MySQL session timeout",
414 							&wait_timeout,
415 							WAIT_TIMEOUT,
416 							0,
417 							INT_MAX,
418 							PGC_USERSET,
419 							0,
420 							NULL,
421 							NULL,
422 							NULL);
423 
424 	DefineCustomIntVariable("mysql_fdw.interactive_timeout",
425 							"Server-side interactive timeout",
426 							"Set the maximum interactive timeout"
427 							"use to set the MySQL session timeout",
428 							&interactive_timeout,
429 							INTERACTIVE_TIMEOUT,
430 							0,
431 							INT_MAX,
432 							PGC_USERSET,
433 							0,
434 							NULL,
435 							NULL,
436 							NULL);
437 
438 	on_proc_exit(&mysql_fdw_exit, PointerGetDatum(NULL));
439 }
440 
441 /*
442  * mysql_fdw_exit
443  * 		Exit callback function.
444  */
445 static void
mysql_fdw_exit(int code,Datum arg)446 mysql_fdw_exit(int code, Datum arg)
447 {
448 	mysql_cleanup_connection();
449 }
450 
451 /*
452  * Foreign-data wrapper handler function: return
453  * a struct with pointers to my callback routines.
454  */
455 Datum
mysql_fdw_handler(PG_FUNCTION_ARGS)456 mysql_fdw_handler(PG_FUNCTION_ARGS)
457 {
458 	FdwRoutine *fdwroutine = makeNode(FdwRoutine);
459 
460 	/* Functions for scanning foreign tables */
461 	fdwroutine->GetForeignRelSize = mysqlGetForeignRelSize;
462 	fdwroutine->GetForeignPaths = mysqlGetForeignPaths;
463 	fdwroutine->GetForeignPlan = mysqlGetForeignPlan;
464 	fdwroutine->BeginForeignScan = mysqlBeginForeignScan;
465 	fdwroutine->IterateForeignScan = mysqlIterateForeignScan;
466 	fdwroutine->ReScanForeignScan = mysqlReScanForeignScan;
467 	fdwroutine->EndForeignScan = mysqlEndForeignScan;
468 
469 	/* Functions for updating foreign tables */
470 	fdwroutine->AddForeignUpdateTargets = mysqlAddForeignUpdateTargets;
471 	fdwroutine->PlanForeignModify = mysqlPlanForeignModify;
472 	fdwroutine->BeginForeignModify = mysqlBeginForeignModify;
473 	fdwroutine->ExecForeignInsert = mysqlExecForeignInsert;
474 	fdwroutine->ExecForeignUpdate = mysqlExecForeignUpdate;
475 	fdwroutine->ExecForeignDelete = mysqlExecForeignDelete;
476 	fdwroutine->EndForeignModify = mysqlEndForeignModify;
477 
478 	/* Function for EvalPlanQual rechecks */
479 #if PG_VERSION_NUM >= 90600
480 	fdwroutine->RecheckForeignScan = mysqlRecheckForeignScan;
481 #endif
482 
483 	/* Support functions for EXPLAIN */
484 	fdwroutine->ExplainForeignScan = mysqlExplainForeignScan;
485 
486 	/* Support functions for ANALYZE */
487 	fdwroutine->AnalyzeForeignTable = mysqlAnalyzeForeignTable;
488 
489 	/* Support functions for IMPORT FOREIGN SCHEMA */
490 #if PG_VERSION_NUM >= 90500
491 	fdwroutine->ImportForeignSchema = mysqlImportForeignSchema;
492 #endif
493 
494 #if PG_VERSION_NUM >= 110000
495 	/* Partition routing and/or COPY from */
496 	fdwroutine->BeginForeignInsert = mysqlBeginForeignInsert;
497 	fdwroutine->EndForeignInsert = mysqlEndForeignInsert;
498 #endif
499 
500 #if PG_VERSION_NUM >= 90600
501 	/* Support functions for join push-down */
502 	fdwroutine->GetForeignJoinPaths = mysqlGetForeignJoinPaths;
503 #endif
504 
505 	PG_RETURN_POINTER(fdwroutine);
506 }
507 
508 /*
509  * mysqlBeginForeignScan
510  * 		Initiate access to the database
511  */
512 static void
mysqlBeginForeignScan(ForeignScanState * node,int eflags)513 mysqlBeginForeignScan(ForeignScanState *node, int eflags)
514 {
515 	TupleTableSlot *tupleSlot = node->ss.ss_ScanTupleSlot;
516 	TupleDesc	tupleDescriptor = tupleSlot->tts_tupleDescriptor;
517 	MYSQL	   *conn;
518 	RangeTblEntry *rte;
519 	MySQLFdwExecState *festate;
520 	EState	   *estate = node->ss.ps.state;
521 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
522 	mysql_opt  *options;
523 	ListCell   *lc;
524 	int			atindex = 0;
525 	unsigned long prefetch_rows = MYSQL_PREFETCH_ROWS;
526 	unsigned long type = (unsigned long) CURSOR_TYPE_READ_ONLY;
527 	Oid			userid;
528 	ForeignServer *server;
529 	UserMapping *user;
530 	ForeignTable *table;
531 	char		timeout[255];
532 	int			numParams;
533 	int			rtindex;
534 	List	   *fdw_private = fsplan->fdw_private;
535 
536 	/*
537 	 * We'll save private state in node->fdw_state.
538 	 */
539 	festate = (MySQLFdwExecState *) palloc(sizeof(MySQLFdwExecState));
540 	node->fdw_state = (void *) festate;
541 
542 	/*
543 	 * If whole-row references are involved in pushed down join extract the
544 	 * information required to construct those.
545 	 */
546 	if (list_length(fdw_private) >= mysqlFdwPrivateScanTList)
547 	{
548 		List	   *whole_row_lists = list_nth(fdw_private,
549 											   mysqlFdwPrivateWholeRowLists);
550 		List	   *scan_tlist = list_nth(fdw_private,
551 										  mysqlFdwPrivateScanTList);
552 #if PG_VERSION_NUM >= 120000
553 		TupleDesc	scan_tupdesc = ExecTypeFromTL(scan_tlist);
554 #else
555 		TupleDesc	scan_tupdesc = ExecTypeFromTL(scan_tlist, false);
556 #endif
557 
558 		mysql_build_whole_row_constr_info(festate, tupleDescriptor,
559 										  fsplan->fs_relids,
560 										  list_length(node->ss.ps.state->es_range_table),
561 										  whole_row_lists, scan_tlist,
562 										  fsplan->fdw_scan_tlist);
563 
564 		/* Change tuple descriptor to match the result from foreign server. */
565 		tupleDescriptor = scan_tupdesc;
566 	}
567 
568 	/*
569 	 * Identify which user to do the remote access as.  This should match what
570 	 * ExecCheckRTEPerms() does.  In case of a join use the lowest-numbered
571 	 * member RTE as a representative; we would get the same result from any.
572 	 */
573 	if (fsplan->scan.scanrelid > 0)
574 		rtindex = fsplan->scan.scanrelid;
575 	else
576 		rtindex = bms_next_member(fsplan->fs_relids, -1);
577 	rte = rt_fetch(rtindex, estate->es_range_table);
578 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
579 
580 	/* Get info about foreign table. */
581 	table = GetForeignTable(rte->relid);
582 	server = GetForeignServer(table->serverid);
583 	user = GetUserMapping(userid, server->serverid);
584 
585 	/* Fetch the options */
586 	options = mysql_get_options(rte->relid, true);
587 
588 	/*
589 	 * Get the already connected connection, otherwise connect and get the
590 	 * connection handle.
591 	 */
592 	conn = mysql_get_connection(server, user, options);
593 
594 	/* Stash away the state info we have already */
595 	festate->query = strVal(list_nth(fsplan->fdw_private,
596 									 mysqlFdwScanPrivateSelectSql));
597 	festate->retrieved_attrs = list_nth(fsplan->fdw_private,
598 										mysqlFdwScanPrivateRetrievedAttrs);
599 	festate->conn = conn;
600 	festate->query_executed = false;
601 	festate->attinmeta = TupleDescGetAttInMetadata(tupleDescriptor);
602 
603 #if PG_VERSION_NUM >= 110000
604 	festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
605 											  "mysql_fdw temporary data",
606 											  ALLOCSET_DEFAULT_SIZES);
607 #else
608 	festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
609 											  "mysql_fdw temporary data",
610 											  ALLOCSET_SMALL_MINSIZE,
611 											  ALLOCSET_SMALL_INITSIZE,
612 											  ALLOCSET_SMALL_MAXSIZE);
613 #endif
614 
615 	if (wait_timeout > 0)
616 	{
617 		/* Set the session timeout in seconds */
618 		sprintf(timeout, "SET wait_timeout = %d", wait_timeout);
619 		mysql_query(festate->conn, timeout);
620 	}
621 
622 	if (interactive_timeout > 0)
623 	{
624 		/* Set the session timeout in seconds */
625 		sprintf(timeout, "SET interactive_timeout = %d", interactive_timeout);
626 		mysql_query(festate->conn, timeout);
627 	}
628 
629 	mysql_query(festate->conn, "SET sql_mode='ANSI_QUOTES'");
630 
631 	/* Initialize the MySQL statement */
632 	festate->stmt = mysql_stmt_init(festate->conn);
633 	if (festate->stmt == NULL)
634 		ereport(ERROR,
635 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
636 				 errmsg("failed to initialize the mysql query: \n%s",
637 						mysql_error(festate->conn))));
638 
639 	/* Prepare MySQL statement */
640 	if (mysql_stmt_prepare(festate->stmt, festate->query,
641 						   strlen(festate->query)) != 0)
642 		mysql_stmt_error_print(festate, "failed to prepare the MySQL query");
643 
644 	/* Prepare for output conversion of parameters used in remote query. */
645 	numParams = list_length(fsplan->fdw_exprs);
646 	festate->numParams = numParams;
647 	if (numParams > 0)
648 		prepare_query_params((PlanState *) node,
649 							 fsplan->fdw_exprs,
650 							 numParams,
651 							 &festate->param_flinfo,
652 							 &festate->param_exprs,
653 							 &festate->param_values,
654 							 &festate->param_types);
655 
656 	/* int column_count = mysql_num_fields(festate->meta); */
657 
658 	/* Set the statement as cursor type */
659 	mysql_stmt_attr_set(festate->stmt, STMT_ATTR_CURSOR_TYPE, (void *) &type);
660 
661 	/* Set the pre-fetch rows */
662 	mysql_stmt_attr_set(festate->stmt, STMT_ATTR_PREFETCH_ROWS,
663 						(void *) &prefetch_rows);
664 
665 	festate->table = (mysql_table *) palloc0(sizeof(mysql_table));
666 	festate->table->column = (mysql_column *) palloc0(sizeof(mysql_column) * tupleDescriptor->natts);
667 	festate->table->mysql_bind = (MYSQL_BIND *) palloc0(sizeof(MYSQL_BIND) * tupleDescriptor->natts);
668 
669 	festate->table->mysql_res = mysql_stmt_result_metadata(festate->stmt);
670 	if (NULL == festate->table->mysql_res)
671 		ereport(ERROR,
672 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
673 				 errmsg("failed to retrieve query result set metadata: \n%s",
674 						mysql_error(festate->conn))));
675 
676 	festate->table->mysql_fields = mysql_fetch_fields(festate->table->mysql_res);
677 
678 	foreach(lc, festate->retrieved_attrs)
679 	{
680 		int			attnum = lfirst_int(lc) - 1;
681 		Oid			pgtype = TupleDescAttr(tupleDescriptor, attnum)->atttypid;
682 		int32		pgtypmod = TupleDescAttr(tupleDescriptor, attnum)->atttypmod;
683 
684 		if (TupleDescAttr(tupleDescriptor, attnum)->attisdropped)
685 			continue;
686 
687 		festate->table->column[atindex].mysql_bind = &festate->table->mysql_bind[atindex];
688 
689 		mysql_bind_result(pgtype, pgtypmod,
690 						  &festate->table->mysql_fields[atindex],
691 						  &festate->table->column[atindex]);
692 		atindex++;
693 	}
694 
695 	/* Bind the results pointers for the prepare statements */
696 	if (mysql_stmt_bind_result(festate->stmt, festate->table->mysql_bind) != 0)
697 		mysql_stmt_error_print(festate, "failed to bind the MySQL query");
698 }
699 
700 /*
701  * mysqlIterateForeignScan
702  * 		Iterate and get the rows one by one from  MySQL and placed in tuple
703  * 		slot
704  */
705 static TupleTableSlot *
mysqlIterateForeignScan(ForeignScanState * node)706 mysqlIterateForeignScan(ForeignScanState *node)
707 {
708 	MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
709 	TupleTableSlot *tupleSlot = node->ss.ss_ScanTupleSlot;
710 	int			attid;
711 	ListCell   *lc;
712 	int			rc = 0;
713 	Datum	   *dvalues;
714 	bool	   *nulls;
715 	int			natts;
716 	AttInMetadata *attinmeta = festate->attinmeta;
717 	HeapTuple	tup;
718 	int			i;
719 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
720 	List	   *fdw_private = fsplan->fdw_private;
721 
722 	natts = attinmeta->tupdesc->natts;
723 
724 	dvalues = palloc0(natts * sizeof(Datum));
725 	nulls = palloc(natts * sizeof(bool));
726 	/* Initialize to nulls for any columns not present in result */
727 	memset(nulls, true, natts * sizeof(bool));
728 
729 	ExecClearTuple(tupleSlot);
730 
731 	/*
732 	 * If this is the first call after Begin or ReScan, we need to bind the
733 	 * params and execute the query.
734 	 */
735 	if (!festate->query_executed)
736 		bind_stmt_params_and_exec(node);
737 
738 	attid = 0;
739 	rc = mysql_stmt_fetch(festate->stmt);
740 	if (rc == 0)
741 	{
742 		foreach(lc, festate->retrieved_attrs)
743 		{
744 			int			attnum = lfirst_int(lc) - 1;
745 			Oid			pgtype = TupleDescAttr(attinmeta->tupdesc, attnum)->atttypid;
746 			int32		pgtypmod = TupleDescAttr(attinmeta->tupdesc, attnum)->atttypmod;
747 
748 			nulls[attnum] = festate->table->column[attid].is_null;
749 			if (!festate->table->column[attid].is_null)
750 				dvalues[attnum] = mysql_convert_to_pg(pgtype, pgtypmod,
751 													  &festate->table->column[attid]);
752 
753 			attid++;
754 		}
755 
756 		ExecClearTuple(tupleSlot);
757 
758 		if (list_length(fdw_private) >= mysqlFdwPrivateScanTList)
759 		{
760 			/* Construct tuple with whole-row references. */
761 			tup = mysql_get_tuple_with_whole_row(festate, dvalues, nulls);
762 		}
763 		else
764 		{
765 			/* Form the Tuple using Datums */
766 			tup = heap_form_tuple(attinmeta->tupdesc, dvalues, nulls);
767 		}
768 
769 		if (tup)
770 #if PG_VERSION_NUM >= 120000
771 			ExecStoreHeapTuple(tup, tupleSlot, false);
772 #else
773 			ExecStoreTuple(tup, tupleSlot, InvalidBuffer, false);
774 #endif
775 		else
776 			mysql_stmt_close(festate->stmt);
777 
778 		/*
779 		 * Release locally palloc'd space and values of pass-by-reference
780 		 * datums, as well.
781 		 */
782 		for (i = 0; i < natts; i++)
783 		{
784 			if (dvalues[i] && !TupleDescAttr(attinmeta->tupdesc, i)->attbyval)
785 				pfree(DatumGetPointer(dvalues[i]));
786 		}
787 		pfree(dvalues);
788 		pfree(nulls);
789 	}
790 	else if (rc == 1)
791 	{
792 		/*
793 		 * Error occurred. Error code and message can be obtained by calling
794 		 * mysql_stmt_errno() and mysql_stmt_error().
795 		 */
796 	}
797 	else if (rc == MYSQL_NO_DATA)
798 	{
799 		/*
800 		 * No more rows/data exists
801 		 */
802 	}
803 	else if (rc == MYSQL_DATA_TRUNCATED)
804 	{
805 		/* Data truncation occurred */
806 		/*
807 		 * MYSQL_DATA_TRUNCATED is returned when truncation reporting is
808 		 * enabled. To determine which column values were truncated when this
809 		 * value is returned, check the error members of the MYSQL_BIND
810 		 * structures used for fetching values. Truncation reporting is
811 		 * enabled by default, but can be controlled by calling
812 		 * mysql_options() with the MYSQL_REPORT_DATA_TRUNCATION option.
813 		 */
814 	}
815 
816 	return tupleSlot;
817 }
818 
819 
820 /*
821  * mysqlExplainForeignScan
822  * 		Produce extra output for EXPLAIN
823  */
824 static void
mysqlExplainForeignScan(ForeignScanState * node,ExplainState * es)825 mysqlExplainForeignScan(ForeignScanState *node, ExplainState *es)
826 {
827 	MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
828 	RangeTblEntry *rte;
829 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
830 	int			rtindex;
831 	EState	   *estate = node->ss.ps.state;
832 	List	   *fdw_private = fsplan->fdw_private;
833 
834 	if (fsplan->scan.scanrelid > 0)
835 		rtindex = fsplan->scan.scanrelid;
836 	else
837 		rtindex = bms_next_member(fsplan->fs_relids, -1);
838 	rte = rt_fetch(rtindex, estate->es_range_table);
839 
840 	if (list_length(fdw_private) > mysqlFdwScanPrivateRelations)
841 	{
842 		char	   *relations = strVal(list_nth(fdw_private,
843 												mysqlFdwScanPrivateRelations));
844 
845 		ExplainPropertyText("Relations", relations, es);
846 	}
847 
848 	/* Give some possibly useful info about startup costs */
849 	if (es->costs)
850 	{
851 		mysql_opt  *options = mysql_get_options(rte->relid, true);
852 
853 		if (strcmp(options->svr_address, "127.0.0.1") == 0 ||
854 			strcmp(options->svr_address, "localhost") == 0)
855 #if PG_VERSION_NUM >= 110000
856 			ExplainPropertyInteger("Local server startup cost", NULL, 10, es);
857 #else
858 			ExplainPropertyLong("Local server startup cost", 10, es);
859 #endif
860 		else
861 #if PG_VERSION_NUM >= 110000
862 			ExplainPropertyInteger("Remote server startup cost", NULL, 25, es);
863 #else
864 			ExplainPropertyLong("Remote server startup cost", 25, es);
865 #endif
866 	}
867 
868 	/* Show the remote query in verbose mode */
869 	if (es->verbose)
870 		ExplainPropertyText("Remote query", festate->query, es);
871 }
872 
873 /*
874  * mysqlEndForeignScan
875  * 		Finish scanning foreign table and dispose objects used for this scan
876  */
877 static void
mysqlEndForeignScan(ForeignScanState * node)878 mysqlEndForeignScan(ForeignScanState *node)
879 {
880 	MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
881 
882 	if (festate->table && festate->table->mysql_res)
883 	{
884 		mysql_free_result(festate->table->mysql_res);
885 		festate->table->mysql_res = NULL;
886 	}
887 
888 	if (festate->stmt)
889 	{
890 		mysql_stmt_close(festate->stmt);
891 		festate->stmt = NULL;
892 	}
893 }
894 
895 /*
896  * mysqlReScanForeignScan
897  * 		Rescan table, possibly with new parameters
898  */
899 static void
mysqlReScanForeignScan(ForeignScanState * node)900 mysqlReScanForeignScan(ForeignScanState *node)
901 {
902 	MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
903 
904 	/*
905 	 * Set the query_executed flag to false so that the query will be executed
906 	 * in mysqlIterateForeignScan().
907 	 */
908 	festate->query_executed = false;
909 
910 }
911 
912 /*
913  * mysqlGetForeignRelSize
914  * 		Create a FdwPlan for a scan on the foreign table
915  */
916 static void
mysqlGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)917 mysqlGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
918 					   Oid foreigntableid)
919 {
920 	double		rows = 0;
921 	double		filtered = 0;
922 	MYSQL	   *conn;
923 	MYSQL_ROW	row;
924 	Bitmapset  *attrs_used = NULL;
925 	mysql_opt  *options;
926 	Oid			userid = GetUserId();
927 	ForeignServer *server;
928 	UserMapping *user;
929 	ForeignTable *table;
930 	MySQLFdwRelationInfo *fpinfo;
931 	ListCell   *lc;
932 	RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
933 	const char *database;
934 	const char *relname;
935 	const char *refname;
936 
937 	fpinfo = (MySQLFdwRelationInfo *) palloc0(sizeof(MySQLFdwRelationInfo));
938 	baserel->fdw_private = (void *) fpinfo;
939 
940 	/* Base foreign tables need to be push down always. */
941 	fpinfo->pushdown_safe = true;
942 
943 	table = GetForeignTable(foreigntableid);
944 	server = GetForeignServer(table->serverid);
945 	user = GetUserMapping(userid, server->serverid);
946 
947 	/* Fetch options */
948 	options = mysql_get_options(foreigntableid, true);
949 
950 	/* Connect to the server */
951 	conn = mysql_get_connection(server, user, options);
952 
953 	mysql_query(conn, "SET sql_mode='ANSI_QUOTES'");
954 
955 #if PG_VERSION_NUM >= 90600
956 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
957 				   &attrs_used);
958 #else
959 	pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
960 				   &attrs_used);
961 #endif
962 
963 	foreach(lc, baserel->baserestrictinfo)
964 	{
965 		RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
966 
967 		if (mysql_is_foreign_expr(root, baserel, ri->clause, false))
968 			fpinfo->remote_conds = lappend(fpinfo->remote_conds, ri);
969 		else
970 			fpinfo->local_conds = lappend(fpinfo->local_conds, ri);
971 	}
972 
973 #if PG_VERSION_NUM >= 90600
974 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
975 				   &fpinfo->attrs_used);
976 #else
977 	pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
978 				   &fpinfo->attrs_used);
979 #endif
980 
981 	foreach(lc, fpinfo->local_conds)
982 	{
983 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
984 
985 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
986 					   &fpinfo->attrs_used);
987 	}
988 
989 	if (options->use_remote_estimate)
990 	{
991 		StringInfoData sql;
992 		MYSQL_RES  *result = NULL;
993 		List	   *retrieved_attrs = NULL;
994 		List	   *params_list = NULL;
995 
996 		initStringInfo(&sql);
997 		appendStringInfo(&sql, "EXPLAIN ");
998 
999 		mysql_deparse_select_stmt_for_rel(&sql, root, baserel, NULL,
1000 										  fpinfo->remote_conds,
1001 										  &retrieved_attrs, &params_list);
1002 
1003 		if (mysql_query(conn, sql.data) != 0)
1004 			mysql_error_print(conn);
1005 
1006 		result = mysql_store_result(conn);
1007 		if (result)
1008 		{
1009 			int			num_fields;
1010 
1011 			/*
1012 			 * MySQL provide numbers of rows per table invole in the statement,
1013 			 * but we don't have problem with it because we are sending
1014 			 * separate query per table in FDW.
1015 			 */
1016 			row = mysql_fetch_row(result);
1017 			num_fields = mysql_num_fields(result);
1018 			if (row)
1019 			{
1020 				MYSQL_FIELD *field;
1021 				int			i;
1022 
1023 				for (i = 0; i < num_fields; i++)
1024 				{
1025 					field = mysql_fetch_field(result);
1026 					if (!row[i])
1027 						continue;
1028 					else if (strcmp(field->name, "rows") == 0)
1029 						rows = atof(row[i]);
1030 					else if (strcmp(field->name, "filtered") == 0)
1031 						filtered = atof(row[i]);
1032 				}
1033 			}
1034 			mysql_free_result(result);
1035 		}
1036 	}
1037 	if (rows > 0)
1038 		rows = ((rows + 1) * filtered) / 100;
1039 	else
1040 		rows = DEFAULTE_NUM_ROWS;
1041 
1042 	baserel->rows = rows;
1043 	baserel->tuples = rows;
1044 
1045 	/*
1046 	 * Set the name of relation in fpinfo, while we are constructing it here.
1047 	 * It will be used to build the string describing the join relation in
1048 	 * EXPLAIN output.  We can't know whether VERBOSE option is specified or
1049 	 * not, so always schema-qualify the foreign table name.
1050 	 */
1051 	fpinfo->relation_name = makeStringInfo();
1052 	database = options->svr_database;
1053 	relname = get_rel_name(foreigntableid);
1054 	refname = rte->eref->aliasname;
1055 	appendStringInfo(fpinfo->relation_name, "%s.%s",
1056 					 quote_identifier(database), quote_identifier(relname));
1057 	if (*refname && strcmp(refname, relname) != 0)
1058 		appendStringInfo(fpinfo->relation_name, " %s",
1059 						 quote_identifier(rte->eref->aliasname));
1060 }
1061 
1062 static bool
mysql_is_column_unique(Oid foreigntableid)1063 mysql_is_column_unique(Oid foreigntableid)
1064 {
1065 	StringInfoData sql;
1066 	MYSQL	   *conn;
1067 	MYSQL_RES  *result;
1068 	mysql_opt  *options;
1069 	Oid			userid = GetUserId();
1070 	ForeignServer *server;
1071 	UserMapping *user;
1072 	ForeignTable *table;
1073 
1074 	table = GetForeignTable(foreigntableid);
1075 	server = GetForeignServer(table->serverid);
1076 	user = GetUserMapping(userid, server->serverid);
1077 
1078 	/* Fetch the options */
1079 	options = mysql_get_options(foreigntableid, true);
1080 
1081 	/* Connect to the server */
1082 	conn = mysql_get_connection(server, user, options);
1083 
1084 	/* Build the query */
1085 	initStringInfo(&sql);
1086 
1087 	/*
1088 	 * Construct the query by prefixing the database name so that it can lookup
1089 	 * in correct database.
1090 	 */
1091 	appendStringInfo(&sql, "EXPLAIN %s.%s", options->svr_database,
1092 					 options->svr_table);
1093 	if (mysql_query(conn, sql.data) != 0)
1094 		mysql_error_print(conn);
1095 
1096 	result = mysql_store_result(conn);
1097 	if (result)
1098 	{
1099 		int			num_fields = mysql_num_fields(result);
1100 		MYSQL_ROW	row;
1101 
1102 		row = mysql_fetch_row(result);
1103 		if (row && num_fields > 3)
1104 		{
1105 			if ((strcmp(row[3], "PRI") == 0) || (strcmp(row[3], "UNI")) == 0)
1106 			{
1107 				mysql_free_result(result);
1108 				return true;
1109 			}
1110 		}
1111 		mysql_free_result(result);
1112 	}
1113 
1114 	return false;
1115 }
1116 
1117 /*
1118  * mysqlEstimateCosts
1119  * 		Estimate the remote query cost
1120  */
1121 static void
mysqlEstimateCosts(PlannerInfo * root,RelOptInfo * baserel,Cost * startup_cost,Cost * total_cost,Oid foreigntableid)1122 mysqlEstimateCosts(PlannerInfo *root, RelOptInfo *baserel, Cost *startup_cost,
1123 				   Cost *total_cost, Oid foreigntableid)
1124 {
1125 	mysql_opt  *options;
1126 
1127 	/* Fetch options */
1128 	options = mysql_get_options(foreigntableid, true);
1129 
1130 	/* Local databases are probably faster */
1131 	if (strcmp(options->svr_address, "127.0.0.1") == 0 ||
1132 		strcmp(options->svr_address, "localhost") == 0)
1133 		*startup_cost = 10;
1134 	else
1135 		*startup_cost = 25;
1136 
1137 	*total_cost = baserel->rows + *startup_cost;
1138 }
1139 
1140 /*
1141  * mysqlGetForeignPaths
1142  * 		Get the foreign paths
1143  */
1144 static void
mysqlGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)1145 mysqlGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
1146 					 Oid foreigntableid)
1147 {
1148 	Cost		startup_cost;
1149 	Cost		total_cost;
1150 
1151 	/* Estimate costs */
1152 	mysqlEstimateCosts(root, baserel, &startup_cost, &total_cost,
1153 					   foreigntableid);
1154 
1155 	/* Create a ForeignPath node and add it as only possible path */
1156 	add_path(baserel, (Path *)
1157 			 create_foreignscan_path(root, baserel,
1158 #if PG_VERSION_NUM >= 90600
1159 									 NULL,	/* default pathtarget */
1160 #endif
1161 									 baserel->rows,
1162 									 startup_cost,
1163 									 total_cost,
1164 									 NIL,	/* no pathkeys */
1165 									 baserel->lateral_relids,
1166 #if PG_VERSION_NUM >= 90500
1167 									 NULL,	/* no extra plan */
1168 #endif
1169 									 NULL));	/* no fdw_private data */
1170 }
1171 
1172 
1173 /*
1174  * mysqlGetForeignPlan
1175  * 		Get a foreign scan plan node
1176  */
1177 #if PG_VERSION_NUM >= 90500
1178 static ForeignScan *
mysqlGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1179 mysqlGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel,
1180 					Oid foreigntableid, ForeignPath *best_path,
1181 					List *tlist, List *scan_clauses, Plan *outer_plan)
1182 #else
1183 static ForeignScan *
1184 mysqlGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel,
1185 					Oid foreigntableid, ForeignPath *best_path,
1186 					List *tlist, List *scan_clauses)
1187 #endif
1188 {
1189 	MySQLFdwRelationInfo *fpinfo = (MySQLFdwRelationInfo *) foreignrel->fdw_private;
1190 	Index		scan_relid;
1191 	List	   *fdw_private;
1192 	List	   *local_exprs = NIL;
1193 	List	   *remote_exprs = NIL;
1194 	List	   *params_list = NIL;
1195 	List	   *remote_conds = NIL;
1196 	StringInfoData sql;
1197 	List	   *retrieved_attrs;
1198 	ListCell   *lc;
1199 	List	   *scan_var_list;
1200 	List	   *fdw_scan_tlist = NIL;
1201 	List	   *whole_row_lists = NIL;
1202 
1203 	if (foreignrel->reloptkind == RELOPT_BASEREL ||
1204 		foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL)
1205 		scan_relid = foreignrel->relid;
1206 	else
1207 	{
1208 		scan_relid = 0;
1209 		Assert(!scan_clauses);
1210 
1211 		remote_conds = fpinfo->remote_conds;
1212 		local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1213 	}
1214 
1215 	/*
1216 	 * Separate the scan_clauses into those that can be executed remotely and
1217 	 * those that can't.  baserestrictinfo clauses that were previously
1218 	 * determined to be safe or unsafe are shown in fpinfo->remote_conds and
1219 	 * fpinfo->local_conds.  Anything else in the scan_clauses list will be
1220 	 * a join clause, which we have to check for remote-safety.
1221 	 *
1222 	 * This code must match "extract_actual_clauses(scan_clauses, false)"
1223 	 * except for the additional decision about remote versus local execution.
1224 	 * Note however that we only strip the RestrictInfo nodes from the
1225 	 * local_exprs list, since appendWhereClause expects a list of
1226 	 * RestrictInfos.
1227 	 */
1228 	foreach(lc, scan_clauses)
1229 	{
1230 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1231 
1232 		Assert(IsA(rinfo, RestrictInfo));
1233 
1234 		/* Ignore any pseudoconstants, they're dealt with elsewhere */
1235 		if (rinfo->pseudoconstant)
1236 			continue;
1237 
1238 		if (list_member_ptr(fpinfo->remote_conds, rinfo))
1239 		{
1240 			remote_conds = lappend(remote_conds, rinfo);
1241 			remote_exprs = lappend(remote_exprs, rinfo->clause);
1242 		}
1243 		else if (list_member_ptr(fpinfo->local_conds, rinfo))
1244 			local_exprs = lappend(local_exprs, rinfo->clause);
1245 		else if (mysql_is_foreign_expr(root, foreignrel, rinfo->clause, false))
1246 		{
1247 			remote_conds = lappend(remote_conds, rinfo);
1248 			remote_exprs = lappend(remote_exprs, rinfo->clause);
1249 		}
1250 		else
1251 			local_exprs = lappend(local_exprs, rinfo->clause);
1252 	}
1253 
1254 #if PG_VERSION_NUM >= 90600
1255 	scan_var_list = pull_var_clause((Node *) foreignrel->reltarget->exprs,
1256 									PVC_RECURSE_PLACEHOLDERS);
1257 #else
1258 	scan_var_list = pull_var_clause((Node *) foreignrel->reltargetlist,
1259 									PVC_RECURSE_AGGREGATES,
1260 									PVC_RECURSE_PLACEHOLDERS);
1261 #endif
1262 
1263 	/* System attributes are not allowed. */
1264 	foreach(lc, scan_var_list)
1265 	{
1266 		Var		   *var = lfirst(lc);
1267 		const FormData_pg_attribute *attr;
1268 
1269 		Assert(IsA(var, Var));
1270 
1271 		if (var->varattno >= 0)
1272 			continue;
1273 
1274 #if PG_VERSION_NUM >= 120000
1275 		attr = SystemAttributeDefinition(var->varattno);
1276 #else
1277 		attr = SystemAttributeDefinition(var->varattno, false);
1278 #endif
1279 		ereport(ERROR,
1280 				(errcode(ERRCODE_FDW_COLUMN_NAME_NOT_FOUND),
1281 				 errmsg("system attribute \"%s\" can't be fetched from remote relation",
1282 						attr->attname.data)));
1283 	}
1284 
1285 #if PG_VERSION_NUM >= 90600
1286 #if PG_VERSION_NUM >= 100000
1287 	if (IS_JOIN_REL(foreignrel))
1288 #else
1289 	if (foreignrel->reloptkind == RELOPT_JOINREL)
1290 #endif
1291 	{
1292 		scan_var_list = list_concat_unique(NIL, scan_var_list);
1293 
1294 		scan_var_list = list_concat_unique(scan_var_list,
1295 										   pull_var_clause((Node *) local_exprs,
1296 														   PVC_RECURSE_PLACEHOLDERS));
1297 
1298 		/*
1299 		 * For join relations, planner needs targetlist, which represents the
1300 		 * output of ForeignScan node. Prepare this before we modify
1301 		 * scan_var_list to include Vars required by whole row references, if
1302 		 * any.  Note that base foreign scan constructs the whole-row reference
1303 		 * at the time of projection.  Joins are required to get them from the
1304 		 * underlying base relations.  For a pushed down join the underlying
1305 		 * relations do not exist, hence the whole-row references need to be
1306 		 * constructed separately.
1307 		 */
1308 		fdw_scan_tlist = add_to_flat_tlist(NIL, scan_var_list);
1309 
1310 		/*
1311 		 * MySQL does not allow row value constructors to be part of SELECT
1312 		 * list.  Hence, whole row reference in join relations need to be
1313 		 * constructed by combining all the attributes of required base
1314 		 * relations into a tuple after fetching the result from the foreign
1315 		 * server.  So adjust the targetlist to include all attributes for
1316 		 * required base relations.  The function also returns list of Var node
1317 		 * lists required to construct the whole-row references of the
1318 		 * involved relations.
1319 		 */
1320 		scan_var_list = mysql_adjust_whole_row_ref(root, scan_var_list,
1321 												   &whole_row_lists,
1322 												   foreignrel->relids);
1323 
1324 		if (outer_plan)
1325 		{
1326 			ListCell   *lc;
1327 
1328 			foreach(lc, local_exprs)
1329 			{
1330 				Node	   *qual = lfirst(lc);
1331 
1332 				outer_plan->qual = list_delete(outer_plan->qual, qual);
1333 
1334 				/*
1335 				 * For an inner join the local conditions of foreign scan plan
1336 				 * can be part of the joinquals as well.  (They might also be
1337 				 * in the mergequals or hashquals, but we can't touch those
1338 				 * without breaking the plan.)
1339 				 */
1340 				if (IsA(outer_plan, NestLoop) ||
1341 					IsA(outer_plan, MergeJoin) ||
1342 					IsA(outer_plan, HashJoin))
1343 				{
1344 					Join	   *join_plan = (Join *) outer_plan;
1345 
1346 					if (join_plan->jointype == JOIN_INNER)
1347 						join_plan->joinqual = list_delete(join_plan->joinqual,
1348 														  qual);
1349 				}
1350 			}
1351 		}
1352 	}
1353 #endif
1354 
1355 	/*
1356 	 * Build the query string to be sent for execution, and identify
1357 	 * expressions to be sent as parameters.
1358 	 */
1359 	initStringInfo(&sql);
1360 	mysql_deparse_select_stmt_for_rel(&sql, root, foreignrel, scan_var_list,
1361 									  remote_conds, &retrieved_attrs,
1362 									  &params_list);
1363 
1364 	if (foreignrel->relid == root->parse->resultRelation &&
1365 		(root->parse->commandType == CMD_UPDATE ||
1366 		 root->parse->commandType == CMD_DELETE))
1367 	{
1368 		/* Relation is UPDATE/DELETE target, so use FOR UPDATE */
1369 		appendStringInfoString(&sql, " FOR UPDATE");
1370 	}
1371 
1372 	/*
1373 	 * Build the fdw_private list that will be available to the executor.
1374 	 * Items in the list must match enum FdwScanPrivateIndex, above.
1375 	 */
1376 
1377 	fdw_private = list_make2(makeString(sql.data), retrieved_attrs);
1378 #if PG_VERSION_NUM >= 100000
1379 	if (IS_JOIN_REL(foreignrel))
1380 #else
1381 	if (foreignrel->reloptkind == RELOPT_JOINREL)
1382 #endif
1383 	{
1384 		fdw_private = lappend(fdw_private,
1385 							  makeString(fpinfo->relation_name->data));
1386 
1387 		/*
1388 		 * To construct whole row references we need:
1389 		 *
1390 		 * 1. The lists of Var nodes required for whole-row references of
1391 		 *    joining relations
1392 		 * 2. targetlist corresponding the result expected from the foreign
1393 		 *    server.
1394 		 */
1395 		if (whole_row_lists)
1396 		{
1397 			fdw_private = lappend(fdw_private, whole_row_lists);
1398 			fdw_private = lappend(fdw_private,
1399 								  add_to_flat_tlist(NIL, scan_var_list));
1400 		}
1401 	}
1402 
1403 	/*
1404 	 * Create the ForeignScan node from target list, local filtering
1405 	 * expressions, remote parameter expressions, and FDW private information.
1406 	 *
1407 	 * Note that the remote parameter expressions are stored in the fdw_exprs
1408 	 * field of the finished plan node; we can't keep them in private state
1409 	 * because then they wouldn't be subject to later planner processing.
1410 	 */
1411 #if PG_VERSION_NUM >= 90500
1412 	return make_foreignscan(tlist, local_exprs, scan_relid, params_list,
1413 							fdw_private, fdw_scan_tlist, NIL, outer_plan);
1414 #else
1415 	return make_foreignscan(tlist, local_exprs, scan_relid, params_list,
1416 							fdw_private);
1417 #endif
1418 }
1419 
1420 /*
1421  * mysqlAnalyzeForeignTable
1422  * 		Implement stats collection
1423  */
1424 static bool
mysqlAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)1425 mysqlAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func,
1426 						 BlockNumber *totalpages)
1427 {
1428 	StringInfoData sql;
1429 	double		table_size = 0;
1430 	MYSQL	   *conn;
1431 	MYSQL_RES  *result;
1432 	Oid			foreignTableId = RelationGetRelid(relation);
1433 	mysql_opt  *options;
1434 	ForeignServer *server;
1435 	UserMapping *user;
1436 	ForeignTable *table;
1437 
1438 	table = GetForeignTable(foreignTableId);
1439 	server = GetForeignServer(table->serverid);
1440 	user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
1441 
1442 	/* Fetch options */
1443 	options = mysql_get_options(foreignTableId, true);
1444 	Assert(options->svr_database != NULL && options->svr_table != NULL);
1445 
1446 	/* Connect to the server */
1447 	conn = mysql_get_connection(server, user, options);
1448 
1449 	/* Build the query */
1450 	initStringInfo(&sql);
1451 	mysql_deparse_analyze(&sql, options->svr_database, options->svr_table);
1452 
1453 	if (mysql_query(conn, sql.data) != 0)
1454 		mysql_error_print(conn);
1455 
1456 	result = mysql_store_result(conn);
1457 
1458 	/*
1459 	 * To get the table size in ANALYZE operation, we run a SELECT query by
1460 	 * passing the database name and table name.  So if the remote table is not
1461 	 * present, then we end up getting zero rows.  Throw an error in that case.
1462 	 */
1463 	if (mysql_num_rows(result) == 0)
1464 		ereport(ERROR,
1465 				(errcode(ERRCODE_FDW_TABLE_NOT_FOUND),
1466 				 errmsg("relation %s.%s does not exist", options->svr_database,
1467 						options->svr_table)));
1468 
1469 	if (result)
1470 	{
1471 		MYSQL_ROW	row;
1472 
1473 		row = mysql_fetch_row(result);
1474 		table_size = atof(row[0]);
1475 		mysql_free_result(result);
1476 	}
1477 
1478 	*totalpages = table_size / MYSQL_BLKSIZ;
1479 
1480 	return false;
1481 }
1482 
1483 static List *
mysqlPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1484 mysqlPlanForeignModify(PlannerInfo *root,
1485 					   ModifyTable *plan,
1486 					   Index resultRelation,
1487 					   int subplan_index)
1488 {
1489 
1490 	CmdType		operation = plan->operation;
1491 	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1492 	Relation	rel;
1493 	List	   *targetAttrs = NIL;
1494 	StringInfoData sql;
1495 	char	   *attname;
1496 	Oid			foreignTableId;
1497 
1498 	initStringInfo(&sql);
1499 
1500 	/*
1501 	 * Core code already has some lock on each rel being planned, so we can
1502 	 * use NoLock here.
1503 	 */
1504 #if PG_VERSION_NUM < 130000
1505 	rel = heap_open(rte->relid, NoLock);
1506 #else
1507 	rel = table_open(rte->relid, NoLock);
1508 #endif
1509 
1510 	foreignTableId = RelationGetRelid(rel);
1511 
1512 	if (!mysql_is_column_unique(foreignTableId))
1513 		ereport(ERROR,
1514 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1515 				 errmsg("first column of remote table must be unique for INSERT/UPDATE/DELETE operation")));
1516 
1517 	/*
1518 	 * In an INSERT, we transmit all columns that are defined in the foreign
1519 	 * table.  In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1520 	 * foreign table, we transmit all columns like INSERT; else we transmit
1521 	 * only columns that were explicitly targets of the UPDATE, so as to avoid
1522 	 * unnecessary data transmission.  (We can't do that for INSERT since we
1523 	 * would miss sending default values for columns not listed in the source
1524 	 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1525 	 * those triggers might change values for non-target columns, in which
1526 	 * case we would miss sending changed values for those columns.)
1527 	 */
1528 	if (operation == CMD_INSERT ||
1529 		(operation == CMD_UPDATE &&
1530 		 rel->trigdesc &&
1531 		 rel->trigdesc->trig_update_before_row))
1532 	{
1533 		TupleDesc	tupdesc = RelationGetDescr(rel);
1534 		int			attnum;
1535 
1536 		/*
1537 		 * If it is an UPDATE operation, check for row identifier column in
1538 		 * target attribute list by calling getUpdateTargetAttrs().
1539 		 */
1540 		if (operation == CMD_UPDATE)
1541 			getUpdateTargetAttrs(rte);
1542 
1543 		for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1544 		{
1545 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1546 
1547 			if (!attr->attisdropped)
1548 				targetAttrs = lappend_int(targetAttrs, attnum);
1549 		}
1550 	}
1551 	else if (operation == CMD_UPDATE)
1552 	{
1553 		targetAttrs = getUpdateTargetAttrs(rte);
1554 		/* We also want the rowid column to be available for the update */
1555 		targetAttrs = lcons_int(1, targetAttrs);
1556 	}
1557 	else
1558 		targetAttrs = lcons_int(1, targetAttrs);
1559 
1560 #if PG_VERSION_NUM >= 110000
1561 	attname = get_attname(foreignTableId, 1, false);
1562 #else
1563 	attname = get_relid_attribute_name(foreignTableId, 1);
1564 #endif
1565 
1566 	/*
1567 	 * Construct the SQL command string.
1568 	 */
1569 	switch (operation)
1570 	{
1571 		case CMD_INSERT:
1572 			mysql_deparse_insert(&sql, root, resultRelation, rel, targetAttrs);
1573 			break;
1574 		case CMD_UPDATE:
1575 			mysql_deparse_update(&sql, root, resultRelation, rel, targetAttrs,
1576 								 attname);
1577 			break;
1578 		case CMD_DELETE:
1579 			mysql_deparse_delete(&sql, root, resultRelation, rel, attname);
1580 			break;
1581 		default:
1582 			elog(ERROR, "unexpected operation: %d", (int) operation);
1583 			break;
1584 	}
1585 
1586 	if (plan->returningLists)
1587 		ereport(ERROR,
1588 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1589 				 errmsg("RETURNING is not supported by this FDW")));
1590 
1591 #if PG_VERSION_NUM < 130000
1592 	heap_close(rel, NoLock);
1593 #else
1594 	table_close(rel, NoLock);
1595 #endif
1596 
1597 	return list_make2(makeString(sql.data), targetAttrs);
1598 }
1599 
1600 /*
1601  * mysqlBeginForeignModify
1602  * 		Begin an insert/update/delete operation on a foreign table
1603  */
1604 static void
mysqlBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1605 mysqlBeginForeignModify(ModifyTableState *mtstate,
1606 						ResultRelInfo *resultRelInfo,
1607 						List *fdw_private,
1608 						int subplan_index,
1609 						int eflags)
1610 {
1611 	MySQLFdwExecState *fmstate;
1612 	EState	   *estate = mtstate->ps.state;
1613 	Relation	rel = resultRelInfo->ri_RelationDesc;
1614 	AttrNumber	n_params;
1615 	Oid			typefnoid = InvalidOid;
1616 	bool		isvarlena = false;
1617 	ListCell   *lc;
1618 	Oid			foreignTableId = InvalidOid;
1619 	RangeTblEntry *rte;
1620 	Oid			userid;
1621 	ForeignServer *server;
1622 	UserMapping *user;
1623 	ForeignTable *table;
1624 
1625 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1626 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1627 
1628 	foreignTableId = RelationGetRelid(rel);
1629 
1630 	table = GetForeignTable(foreignTableId);
1631 	server = GetForeignServer(table->serverid);
1632 	user = GetUserMapping(userid, server->serverid);
1633 
1634 	/*
1635 	 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1636 	 * stays NULL.
1637 	 */
1638 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1639 		return;
1640 
1641 	/* Begin constructing MySQLFdwExecState. */
1642 	fmstate = (MySQLFdwExecState *) palloc0(sizeof(MySQLFdwExecState));
1643 
1644 	fmstate->mysqlFdwOptions = mysql_get_options(foreignTableId, true);
1645 	fmstate->conn = mysql_get_connection(server, user,
1646 										 fmstate->mysqlFdwOptions);
1647 
1648 	fmstate->query = strVal(list_nth(fdw_private, 0));
1649 	fmstate->retrieved_attrs = (List *) list_nth(fdw_private, 1);
1650 
1651 	n_params = list_length(fmstate->retrieved_attrs) + 1;
1652 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1653 	fmstate->p_nums = 0;
1654 #if PG_VERSION_NUM >= 110000
1655 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1656 											  "mysql_fdw temporary data",
1657 											  ALLOCSET_DEFAULT_SIZES);
1658 #else
1659 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1660 											  "mysql_fdw temporary data",
1661 											  ALLOCSET_SMALL_MINSIZE,
1662 											  ALLOCSET_SMALL_INITSIZE,
1663 											  ALLOCSET_SMALL_MAXSIZE);
1664 #endif
1665 
1666 	/* Set up for remaining transmittable parameters */
1667 	foreach(lc, fmstate->retrieved_attrs)
1668 	{
1669 		int			attnum = lfirst_int(lc);
1670 		Form_pg_attribute attr = TupleDescAttr(RelationGetDescr(rel),
1671 											   attnum - 1);
1672 
1673 		Assert(!attr->attisdropped);
1674 
1675 		getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1676 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1677 		fmstate->p_nums++;
1678 	}
1679 	Assert(fmstate->p_nums <= n_params);
1680 
1681 	n_params = list_length(fmstate->retrieved_attrs);
1682 
1683 	/* Initialize mysql statment */
1684 	fmstate->stmt = mysql_stmt_init(fmstate->conn);
1685 	if (!fmstate->stmt)
1686 		ereport(ERROR,
1687 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1688 				 errmsg("failed to initialize the MySQL query: \n%s",
1689 						mysql_error(fmstate->conn))));
1690 
1691 	/* Prepare mysql statment */
1692 	if (mysql_stmt_prepare(fmstate->stmt, fmstate->query,
1693 						   strlen(fmstate->query)) != 0)
1694 		mysql_stmt_error_print(fmstate, "failed to prepare the MySQL query");
1695 
1696 	resultRelInfo->ri_FdwState = fmstate;
1697 }
1698 
1699 /*
1700  * mysqlExecForeignInsert
1701  * 		Insert one row into a foreign table
1702  */
1703 static TupleTableSlot *
mysqlExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1704 mysqlExecForeignInsert(EState *estate,
1705 					   ResultRelInfo *resultRelInfo,
1706 					   TupleTableSlot *slot,
1707 					   TupleTableSlot *planSlot)
1708 {
1709 	MySQLFdwExecState *fmstate;
1710 	MYSQL_BIND *mysql_bind_buffer;
1711 	ListCell   *lc;
1712 	int			n_params;
1713 	MemoryContext oldcontext;
1714 	bool	   *isnull;
1715 
1716 	fmstate = (MySQLFdwExecState *) resultRelInfo->ri_FdwState;
1717 	n_params = list_length(fmstate->retrieved_attrs);
1718 
1719 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
1720 
1721 	mysql_bind_buffer = (MYSQL_BIND *) palloc0(sizeof(MYSQL_BIND) * n_params);
1722 	isnull = (bool *) palloc0(sizeof(bool) * n_params);
1723 
1724 	mysql_query(fmstate->conn, "SET sql_mode='ANSI_QUOTES'");
1725 
1726 	foreach(lc, fmstate->retrieved_attrs)
1727 	{
1728 		int			attnum = lfirst_int(lc) - 1;
1729 		Oid			type = TupleDescAttr(slot->tts_tupleDescriptor, attnum)->atttypid;
1730 		Datum		value;
1731 
1732 		value = slot_getattr(slot, attnum + 1, &isnull[attnum]);
1733 
1734 		mysql_bind_sql_var(type, attnum, value, mysql_bind_buffer,
1735 						   &isnull[attnum]);
1736 	}
1737 
1738 	/* Bind values */
1739 	if (mysql_stmt_bind_param(fmstate->stmt, mysql_bind_buffer) != 0)
1740 		mysql_stmt_error_print(fmstate, "failed to bind the MySQL query");
1741 
1742 	/* Execute the query */
1743 	if (mysql_stmt_execute(fmstate->stmt) != 0)
1744 		mysql_stmt_error_print(fmstate, "failed to execute the MySQL query");
1745 
1746 	MemoryContextSwitchTo(oldcontext);
1747 	MemoryContextReset(fmstate->temp_cxt);
1748 	return slot;
1749 }
1750 
1751 static TupleTableSlot *
mysqlExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1752 mysqlExecForeignUpdate(EState *estate,
1753 					   ResultRelInfo *resultRelInfo,
1754 					   TupleTableSlot *slot,
1755 					   TupleTableSlot *planSlot)
1756 {
1757 	MySQLFdwExecState *fmstate = (MySQLFdwExecState *) resultRelInfo->ri_FdwState;
1758 	Relation	rel = resultRelInfo->ri_RelationDesc;
1759 	MYSQL_BIND *mysql_bind_buffer;
1760 	Oid			foreignTableId = RelationGetRelid(rel);
1761 	bool		is_null = false;
1762 	ListCell   *lc;
1763 	int			bindnum = 0;
1764 	Oid			typeoid;
1765 	Datum		value;
1766 	int			n_params;
1767 	bool	   *isnull;
1768 	Datum		new_value;
1769 	HeapTuple	tuple;
1770 	Form_pg_attribute attr;
1771 	bool		found_row_id_col = false;
1772 
1773 	n_params = list_length(fmstate->retrieved_attrs);
1774 
1775 	mysql_bind_buffer = (MYSQL_BIND *) palloc0(sizeof(MYSQL_BIND) * n_params);
1776 	isnull = (bool *) palloc0(sizeof(bool) * n_params);
1777 
1778 	/* Bind the values */
1779 	foreach(lc, fmstate->retrieved_attrs)
1780 	{
1781 		int			attnum = lfirst_int(lc);
1782 		Oid			type;
1783 
1784 		/*
1785 		 * The first attribute cannot be in the target list attribute.  Set the
1786 		 * found_row_id_col to true once we find it so that we can fetch the
1787 		 * value later.
1788 		 */
1789 		if (attnum == 1)
1790 		{
1791 			found_row_id_col = true;
1792 			continue;
1793 		}
1794 
1795 		type = TupleDescAttr(slot->tts_tupleDescriptor, attnum - 1)->atttypid;
1796 		value = slot_getattr(slot, attnum, (bool *) (&isnull[bindnum]));
1797 
1798 		mysql_bind_sql_var(type, bindnum, value, mysql_bind_buffer,
1799 						   &isnull[bindnum]);
1800 		bindnum++;
1801 	}
1802 
1803 	/*
1804 	 * Since we add a row identifier column in the target list always, so
1805 	 * found_row_id_col flag should be true.
1806 	 */
1807 	if (!found_row_id_col)
1808 		elog(ERROR, "missing row identifier column value in UPDATE");
1809 
1810 	new_value = slot_getattr(slot, 1, &is_null);
1811 
1812 	/*
1813 	 * Get the row identifier column value that was passed up as a resjunk
1814 	 * column and compare that value with the new value to identify if that
1815 	 * value is changed.
1816 	 */
1817 	value = ExecGetJunkAttribute(planSlot, 1, &is_null);
1818 
1819 	tuple = SearchSysCache2(ATTNUM,
1820 							ObjectIdGetDatum(foreignTableId),
1821 							Int16GetDatum(1));
1822 	if (!HeapTupleIsValid(tuple))
1823 		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1824 			 1, foreignTableId);
1825 
1826 	attr = (Form_pg_attribute) GETSTRUCT(tuple);
1827 	typeoid = attr->atttypid;
1828 
1829 	if (DatumGetPointer(new_value) != NULL && DatumGetPointer(value) != NULL)
1830 	{
1831 		Datum		n_value = new_value;
1832 		Datum 		o_value = value;
1833 
1834 		/* If the attribute type is varlena then need to detoast the datums. */
1835 		if (attr->attlen == -1)
1836 		{
1837 			n_value = PointerGetDatum(PG_DETOAST_DATUM(new_value));
1838 			o_value = PointerGetDatum(PG_DETOAST_DATUM(value));
1839 		}
1840 
1841 		if (!datumIsEqual(o_value, n_value, attr->attbyval, attr->attlen))
1842 			ereport(ERROR,
1843 					(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1844 					 errmsg("row identifier column update is not supported")));
1845 
1846 		/* Free memory if it's a copy made above */
1847 		if (DatumGetPointer(n_value) != DatumGetPointer(new_value))
1848 			pfree(DatumGetPointer(n_value));
1849 		if (DatumGetPointer(o_value) != DatumGetPointer(value))
1850 			pfree(DatumGetPointer(o_value));
1851 	}
1852 	else if (!(DatumGetPointer(new_value) == NULL &&
1853 			   DatumGetPointer(value) == NULL))
1854 		ereport(ERROR,
1855 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1856 				 errmsg("row identifier column update is not supported")));
1857 
1858 	ReleaseSysCache(tuple);
1859 
1860 	/* Bind qual */
1861 	mysql_bind_sql_var(typeoid, bindnum, value, mysql_bind_buffer, &is_null);
1862 
1863 	if (mysql_stmt_bind_param(fmstate->stmt, mysql_bind_buffer) != 0)
1864 		ereport(ERROR,
1865 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1866 				 errmsg("failed to bind the MySQL query: %s",
1867 						mysql_error(fmstate->conn))));
1868 
1869 	/* Execute the query */
1870 	if (mysql_stmt_execute(fmstate->stmt) != 0)
1871 		mysql_stmt_error_print(fmstate, "failed to execute the MySQL query");
1872 
1873 	/* Return NULL if nothing was updated on the remote end */
1874 	return slot;
1875 }
1876 
1877 /*
1878  * mysqlAddForeignUpdateTargets
1879  * 		Add column(s) needed for update/delete on a foreign table, we are
1880  * 		using first column as row identification column, so we are adding
1881  * 		that into target list.
1882  */
1883 static void
mysqlAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1884 mysqlAddForeignUpdateTargets(Query *parsetree,
1885 							 RangeTblEntry *target_rte,
1886 							 Relation target_relation)
1887 {
1888 	Var		   *var;
1889 	const char *attrname;
1890 	TargetEntry *tle;
1891 
1892 	/*
1893 	 * What we need is the rowid which is the first column
1894 	 */
1895 	Form_pg_attribute attr =
1896 		TupleDescAttr(RelationGetDescr(target_relation), 0);
1897 
1898 	/* Make a Var representing the desired value */
1899 	var = makeVar(parsetree->resultRelation,
1900 				  1,
1901 				  attr->atttypid,
1902 				  attr->atttypmod,
1903 				  InvalidOid,
1904 				  0);
1905 
1906 	/* Wrap it in a TLE with the right name ... */
1907 	attrname = NameStr(attr->attname);
1908 
1909 	tle = makeTargetEntry((Expr *) var,
1910 						  list_length(parsetree->targetList) + 1,
1911 						  pstrdup(attrname), true);
1912 
1913 	/* ... and add it to the query's targetlist */
1914 	parsetree->targetList = lappend(parsetree->targetList, tle);
1915 }
1916 
1917 /*
1918  * mysqlExecForeignDelete
1919  * 		Delete one row from a foreign table
1920  */
1921 static TupleTableSlot *
mysqlExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1922 mysqlExecForeignDelete(EState *estate,
1923 					   ResultRelInfo *resultRelInfo,
1924 					   TupleTableSlot *slot,
1925 					   TupleTableSlot *planSlot)
1926 {
1927 	MySQLFdwExecState *fmstate = (MySQLFdwExecState *) resultRelInfo->ri_FdwState;
1928 	Relation	rel = resultRelInfo->ri_RelationDesc;
1929 	MYSQL_BIND *mysql_bind_buffer;
1930 	Oid			foreignTableId = RelationGetRelid(rel);
1931 	bool		is_null = false;
1932 	Oid			typeoid;
1933 	Datum		value;
1934 
1935 	mysql_bind_buffer = (MYSQL_BIND *) palloc(sizeof(MYSQL_BIND));
1936 
1937 	/* Get the id that was passed up as a resjunk column */
1938 	value = ExecGetJunkAttribute(planSlot, 1, &is_null);
1939 	typeoid = get_atttype(foreignTableId, 1);
1940 
1941 	/* Bind qual */
1942 	mysql_bind_sql_var(typeoid, 0, value, mysql_bind_buffer, &is_null);
1943 
1944 	if (mysql_stmt_bind_param(fmstate->stmt, mysql_bind_buffer) != 0)
1945 		ereport(ERROR,
1946 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1947 				 errmsg("failed to execute the MySQL query: %s",
1948 						mysql_error(fmstate->conn))));
1949 
1950 	/* Execute the query */
1951 	if (mysql_stmt_execute(fmstate->stmt) != 0)
1952 		mysql_stmt_error_print(fmstate, "failed to execute the MySQL query");
1953 
1954 	/* Return NULL if nothing was updated on the remote end */
1955 	return slot;
1956 }
1957 
1958 /*
1959  * mysqlEndForeignModify
1960  *		Finish an insert/update/delete operation on a foreign table
1961  */
1962 static void
mysqlEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)1963 mysqlEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo)
1964 {
1965 	MySQLFdwExecState *festate = resultRelInfo->ri_FdwState;
1966 
1967 	if (festate && festate->stmt)
1968 	{
1969 		mysql_stmt_close(festate->stmt);
1970 		festate->stmt = NULL;
1971 	}
1972 }
1973 
/*
 * mysqlImportForeignSchema
 * 		Import a foreign schema (9.5+)
 *
 * Queries information_schema on the remote server and builds one
 * CREATE FOREIGN TABLE command per remote table of stmt->remote_schema,
 * honouring the LIMIT TO / EXCEPT table list.
 *
 * Statement options:
 *		import_default	- add DEFAULT clauses (default false)
 *		import_not_null	- add NOT NULL constraints (default true)
 * Any other option raises an ERROR.
 *
 * Returns a list of palloc'd command strings.
 *
 * NOTE(review): stmt->remote_schema and the table names of the LIMIT TO /
 * EXCEPT list are interpolated into the remote queries without escaping.
 */
#if PG_VERSION_NUM >= 90500
static List *
mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
{
	List	   *commands = NIL;
	bool		import_default = false;
	bool		import_not_null = true;
	bool		schema_found;
	ForeignServer *server;
	UserMapping *user;
	mysql_opt  *options;
	MYSQL	   *conn;
	StringInfoData buf;
	MYSQL_RES  *volatile res = NULL;
	MYSQL_ROW	row;
	ListCell   *lc;

	/* Parse statement options */
	foreach(lc, stmt->options)
	{
		DefElem    *def = (DefElem *) lfirst(lc);

		if (strcmp(def->defname, "import_default") == 0)
			import_default = defGetBoolean(def);
		else if (strcmp(def->defname, "import_not_null") == 0)
			import_not_null = defGetBoolean(def);
		else
			ereport(ERROR,
					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
					 errmsg("invalid option \"%s\"", def->defname)));
	}

	/*
	 * Get connection to the foreign server.  Connection manager will
	 * establish new connection if necessary.
	 */
	server = GetForeignServer(serverOid);
	user = GetUserMapping(GetUserId(), server->serverid);
	options = mysql_get_options(serverOid, false);
	conn = mysql_get_connection(server, user, options);

	/* Create workspace for strings */
	initStringInfo(&buf);

	/* Check that the schema really exists */
	appendStringInfo(&buf,
					 "SELECT 1 FROM information_schema.TABLES WHERE TABLE_SCHEMA = '%s'",
					 stmt->remote_schema);

	if (mysql_query(conn, buf.data) != 0)
		mysql_error_print(conn);

	/*
	 * Evaluate the outcome and free the result set before possibly raising
	 * an error, so that the client-side result is not leaked.
	 */
	res = mysql_store_result(conn);
	schema_found = (res != NULL && mysql_num_rows(res) >= 1);
	if (res != NULL)
		mysql_free_result(res);
	res = NULL;

	if (!schema_found)
		ereport(ERROR,
				(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
				 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
						stmt->remote_schema, server->servername)));

	resetStringInfo(&buf);

	/*
	 * Fetch all table data from this schema, possibly restricted by EXCEPT or
	 * LIMIT TO.
	 */
	appendStringInfo(&buf,
					 " SELECT"
					 "  t.TABLE_NAME,"
					 "  c.COLUMN_NAME,"
					 "  CASE"
					 "    WHEN c.DATA_TYPE = 'enum' THEN LOWER(CONCAT(t.TABLE_NAME, '_', c.COLUMN_NAME, '_t'))"
					 "    WHEN c.DATA_TYPE = 'tinyint' THEN 'smallint'"
					 "    WHEN c.DATA_TYPE = 'mediumint' THEN 'integer'"
					 "    WHEN c.DATA_TYPE = 'tinyint unsigned' THEN 'smallint'"
					 "    WHEN c.DATA_TYPE = 'smallint unsigned' THEN 'integer'"
					 "    WHEN c.DATA_TYPE = 'mediumint unsigned' THEN 'integer'"
					 "    WHEN c.DATA_TYPE = 'int unsigned' THEN 'bigint'"
					 "    WHEN c.DATA_TYPE = 'bigint unsigned' THEN 'numeric(20)'"
					 "    WHEN c.DATA_TYPE = 'double' THEN 'double precision'"
					 "    WHEN c.DATA_TYPE = 'float' THEN 'real'"
					 "    WHEN c.DATA_TYPE = 'datetime' THEN 'timestamp'"
					 "    WHEN c.DATA_TYPE = 'longtext' THEN 'text'"
					 "    WHEN c.DATA_TYPE = 'mediumtext' THEN 'text'"
					 "    WHEN c.DATA_TYPE = 'tinytext' THEN 'text'"
					 "    WHEN c.DATA_TYPE = 'blob' THEN 'bytea'"
					 "    WHEN c.DATA_TYPE = 'mediumblob' THEN 'bytea'"
					 "    WHEN c.DATA_TYPE = 'longblob' THEN 'bytea'"
					 "    ELSE c.DATA_TYPE"
					 "  END,"
					 "  c.COLUMN_TYPE,"
					 "  IF(c.IS_NULLABLE = 'NO', 't', 'f'),"
					 "  c.COLUMN_DEFAULT"
					 " FROM"
					 "  information_schema.TABLES AS t"
					 " JOIN"
					 "  information_schema.COLUMNS AS c"
					 " ON"
					 "  t.TABLE_CATALOG <=> c.TABLE_CATALOG AND t.TABLE_SCHEMA <=> c.TABLE_SCHEMA AND t.TABLE_NAME <=> c.TABLE_NAME"
					 " WHERE"
					 "  t.TABLE_SCHEMA = '%s'",
					 stmt->remote_schema);

	/* Apply restrictions for LIMIT TO and EXCEPT */
	if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
		stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
	{
		bool		first_item = true;

		appendStringInfoString(&buf, " AND t.TABLE_NAME ");
		if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
			appendStringInfoString(&buf, "NOT ");
		appendStringInfoString(&buf, "IN (");

		/* Append list of table names within IN clause */
		foreach(lc, stmt->table_list)
		{
			RangeVar   *rv = (RangeVar *) lfirst(lc);

			if (first_item)
				first_item = false;
			else
				appendStringInfoString(&buf, ", ");

			appendStringInfo(&buf, "'%s'", rv->relname);
		}
		appendStringInfoChar(&buf, ')');
	}

	/* Append ORDER BY at the end of query to ensure output ordering */
	appendStringInfoString(&buf, " ORDER BY t.TABLE_NAME, c.ORDINAL_POSITION");

	/* Fetch the data */
	if (mysql_query(conn, buf.data) != 0)
		mysql_error_print(conn);

	/*
	 * A NULL result for a SELECT means the fetch failed; report it instead
	 * of handing a NULL result set to mysql_fetch_row().  Every branch of
	 * mysql_error_print() raises an ERROR, so control does not continue.
	 */
	res = mysql_store_result(conn);
	if (res == NULL)
		mysql_error_print(conn);

	row = mysql_fetch_row(res);
	while (row)
	{
		char	   *tablename = row[0];
		bool		first_item = true;

		resetStringInfo(&buf);
		appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
						 quote_identifier(tablename));

		/* Scan all rows for this table */
		do
		{
			char	   *attname;
			char	   *typename;
			char	   *typedfn;
			char	   *attnotnull;
			char	   *attdefault;

			/* If table has no columns, we'll see nulls here */
			if (row[1] == NULL)
				continue;

			attname = row[1];
			typename = row[2];

			/* char/varchar need the full column type to keep the length */
			if (strcmp(typename, "char") == 0 || strcmp(typename, "varchar") == 0)
				typename = row[3];

			typedfn = row[3];
			attnotnull = row[4];
			attdefault = row[5];	/* NULL when there is no default */

			if (strncmp(typedfn, "enum(", 5) == 0)
				ereport(NOTICE,
						(errmsg("error while generating the table definition"),
						 errhint("If you encounter an error, you may need to execute the following first:\nDO $$BEGIN IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_type WHERE typname = '%s') THEN CREATE TYPE %s AS %s; END IF; END$$;\n",
								 typename, typename, typedfn)));

			if (first_item)
				first_item = false;
			else
				appendStringInfoString(&buf, ",\n");

			/* Print column name and type */
			appendStringInfo(&buf, "  %s %s", quote_identifier(attname),
							 typename);

			/* Add DEFAULT if needed */
			if (import_default && attdefault != NULL)
				appendStringInfo(&buf, " DEFAULT %s", attdefault);

			/* Add NOT NULL if needed */
			if (import_not_null && attnotnull[0] == 't')
				appendStringInfoString(&buf, " NOT NULL");
		}
		while ((row = mysql_fetch_row(res)) &&
			   (strcmp(row[0], tablename) == 0));

		/*
		 * Add server name and table-level options.  We specify remote
		 * database and table name as options (the latter to ensure that
		 * renaming the foreign table doesn't break the association).  The
		 * option values are emitted as properly quoted literals so that
		 * names containing quotes or backslashes yield a valid command.
		 */
		appendStringInfo(&buf,
						 "\n) SERVER %s OPTIONS (dbname %s, table_name %s);\n",
						 quote_identifier(server->servername),
						 quote_literal_cstr(stmt->remote_schema),
						 quote_literal_cstr(tablename));

		commands = lappend(commands, pstrdup(buf.data));
	}

	/* Clean up */
	mysql_free_result(res);
	res = NULL;
	resetStringInfo(&buf);

	mysql_release_connection(conn);

	return commands;
}
#endif
2198 
2199 #if PG_VERSION_NUM >= 110000
2200 /*
2201  * mysqlBeginForeignInsert
2202  * 		Prepare for an insert operation triggered by partition routing
2203  * 		or COPY FROM.
2204  *
2205  * This is not yet supported, so raise an error.
2206  */
2207 static void
mysqlBeginForeignInsert(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo)2208 mysqlBeginForeignInsert(ModifyTableState *mtstate,
2209 						ResultRelInfo *resultRelInfo)
2210 {
2211 	ereport(ERROR,
2212 			(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2213 			 errmsg("COPY and foreign partition routing not supported in mysql_fdw")));
2214 }
2215 
2216 /*
2217  * mysqlEndForeignInsert
2218  * 		BeginForeignInsert() is not yet implemented, hence we do not
2219  * 		have anything to cleanup as of now. We throw an error here just
2220  * 		to make sure when we do that we do not forget to cleanup
2221  * 		resources.
2222  */
2223 static void
mysqlEndForeignInsert(EState * estate,ResultRelInfo * resultRelInfo)2224 mysqlEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo)
2225 {
2226 	ereport(ERROR,
2227 			(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2228 			 errmsg("COPY and foreign partition routing not supported in mysql_fdw")));
2229 }
2230 #endif
2231 
2232 /*
2233  * Prepare for processing of parameters used in remote query.
2234  */
2235 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values,Oid ** param_types)2236 prepare_query_params(PlanState *node,
2237 					 List *fdw_exprs,
2238 					 int numParams,
2239 					 FmgrInfo **param_flinfo,
2240 					 List **param_exprs,
2241 					 const char ***param_values,
2242 					 Oid **param_types)
2243 {
2244 	int			i;
2245 	ListCell   *lc;
2246 
2247 	Assert(numParams > 0);
2248 
2249 	/* Prepare for output conversion of parameters used in remote query. */
2250 	*param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
2251 
2252 	*param_types = (Oid *) palloc0(sizeof(Oid) * numParams);
2253 
2254 	i = 0;
2255 	foreach(lc, fdw_exprs)
2256 	{
2257 		Node	   *param_expr = (Node *) lfirst(lc);
2258 		Oid			typefnoid;
2259 		bool		isvarlena;
2260 
2261 		(*param_types)[i] = exprType(param_expr);
2262 
2263 		getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
2264 		fmgr_info(typefnoid, &(*param_flinfo)[i]);
2265 		i++;
2266 	}
2267 
2268 	/*
2269 	 * Prepare remote-parameter expressions for evaluation.  (Note: in
2270 	 * practice, we expect that all these expressions will be just Params, so
2271 	 * we could possibly do something more efficient than using the full
2272 	 * expression-eval machinery for this.  But probably there would be little
2273 	 * benefit, and it'd require postgres_fdw to know more than is desirable
2274 	 * about Param evaluation.)
2275 	 */
2276 #if PG_VERSION_NUM >= 100000
2277 	*param_exprs = ExecInitExprList(fdw_exprs, node);
2278 #else
2279 	*param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);
2280 #endif
2281 
2282 	/* Allocate buffer for text form of query parameters. */
2283 	*param_values = (const char **) palloc0(numParams * sizeof(char *));
2284 }
2285 
2286 /*
2287  * Construct array of query parameter values in text format.
2288  */
2289 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values,MYSQL_BIND ** mysql_bind_buf,Oid * param_types)2290 process_query_params(ExprContext *econtext,
2291 					 FmgrInfo *param_flinfo,
2292 					 List *param_exprs,
2293 					 const char **param_values,
2294 					 MYSQL_BIND **mysql_bind_buf,
2295 					 Oid *param_types)
2296 {
2297 	int			i;
2298 	ListCell   *lc;
2299 
2300 	i = 0;
2301 	foreach(lc, param_exprs)
2302 	{
2303 		ExprState  *expr_state = (ExprState *) lfirst(lc);
2304 		Datum		expr_value;
2305 		bool		isNull;
2306 
2307 		/* Evaluate the parameter expression */
2308 #if PG_VERSION_NUM >= 100000
2309 		expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
2310 #else
2311 		expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
2312 #endif
2313 		mysql_bind_sql_var(param_types[i], i, expr_value, *mysql_bind_buf,
2314 						   &isNull);
2315 
2316 		/*
2317 		 * Get string representation of each parameter value by invoking
2318 		 * type-specific output function, unless the value is null.
2319 		 */
2320 		if (isNull)
2321 			param_values[i] = NULL;
2322 		else
2323 			param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
2324 		i++;
2325 	}
2326 }
2327 
2328 /*
2329  * Process the query params and bind the same with the statement, if any.
2330  * Also, execute the statement.
2331  */
2332 static void
bind_stmt_params_and_exec(ForeignScanState * node)2333 bind_stmt_params_and_exec(ForeignScanState *node)
2334 {
2335 	MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
2336 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
2337 	int			numParams = festate->numParams;
2338 	const char **values = festate->param_values;
2339 	MYSQL_BIND *mysql_bind_buffer = NULL;
2340 
2341 	/*
2342 	 * Construct array of query parameter values in text format.  We do the
2343 	 * conversions in the short-lived per-tuple context, so as not to cause a
2344 	 * memory leak over repeated scans.
2345 	 */
2346 	if (numParams > 0)
2347 	{
2348 		MemoryContext oldcontext;
2349 
2350 		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2351 
2352 		mysql_bind_buffer = (MYSQL_BIND *) palloc0(sizeof(MYSQL_BIND) * numParams);
2353 
2354 		process_query_params(econtext,
2355 							 festate->param_flinfo,
2356 							 festate->param_exprs,
2357 							 values,
2358 							 &mysql_bind_buffer,
2359 							 festate->param_types);
2360 
2361 		mysql_stmt_bind_param(festate->stmt, mysql_bind_buffer);
2362 
2363 		MemoryContextSwitchTo(oldcontext);
2364 	}
2365 
2366 	/*
2367 	 * Finally, execute the query. The result will be placed in the array we
2368 	 * already bind.
2369 	 */
2370 	if (mysql_stmt_execute(festate->stmt) != 0)
2371 		mysql_stmt_error_print(festate, "failed to execute the MySQL query");
2372 
2373 	/* Mark the query as executed */
2374 	festate->query_executed = true;
2375 }
2376 
2377 Datum
mysql_fdw_version(PG_FUNCTION_ARGS)2378 mysql_fdw_version(PG_FUNCTION_ARGS)
2379 {
2380 	PG_RETURN_INT32(CODE_VERSION);
2381 }
2382 
2383 static void
mysql_error_print(MYSQL * conn)2384 mysql_error_print(MYSQL *conn)
2385 {
2386 	switch (mysql_errno(conn))
2387 	{
2388 		case CR_NO_ERROR:
2389 			/* Should not happen, though give some message */
2390 			elog(ERROR, "unexpected error code");
2391 			break;
2392 		case CR_OUT_OF_MEMORY:
2393 		case CR_SERVER_GONE_ERROR:
2394 		case CR_SERVER_LOST:
2395 		case CR_UNKNOWN_ERROR:
2396 			mysql_release_connection(conn);
2397 			ereport(ERROR,
2398 					(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2399 					 errmsg("failed to execute the MySQL query: \n%s",
2400 							mysql_error(conn))));
2401 			break;
2402 		case CR_COMMANDS_OUT_OF_SYNC:
2403 		default:
2404 			ereport(ERROR,
2405 					(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2406 					 errmsg("failed to execute the MySQL query: \n%s",
2407 							mysql_error(conn))));
2408 	}
2409 }
2410 
2411 static void
mysql_stmt_error_print(MySQLFdwExecState * festate,const char * msg)2412 mysql_stmt_error_print(MySQLFdwExecState *festate, const char *msg)
2413 {
2414 	switch (mysql_stmt_errno(festate->stmt))
2415 	{
2416 		case CR_NO_ERROR:
2417 			/* Should not happen, though give some message */
2418 			elog(ERROR, "unexpected error code");
2419 			break;
2420 		case CR_OUT_OF_MEMORY:
2421 		case CR_SERVER_GONE_ERROR:
2422 		case CR_SERVER_LOST:
2423 		case CR_UNKNOWN_ERROR:
2424 			mysql_release_connection(festate->conn);
2425 			ereport(ERROR,
2426 					(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2427 					 errmsg("%s: \n%s", msg, mysql_error(festate->conn))));
2428 			break;
2429 		case CR_COMMANDS_OUT_OF_SYNC:
2430 		default:
2431 			ereport(ERROR,
2432 					(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2433 					 errmsg("%s: \n%s", msg, mysql_error(festate->conn))));
2434 			break;
2435 	}
2436 }
2437 
2438 /*
2439  * getUpdateTargetAttrs
2440  * 		Returns the list of attribute numbers of the columns being updated.
2441  */
2442 static List *
getUpdateTargetAttrs(RangeTblEntry * rte)2443 getUpdateTargetAttrs(RangeTblEntry *rte)
2444 {
2445 	List	   *targetAttrs = NIL;
2446 
2447 #if PG_VERSION_NUM >= 90500
2448 	Bitmapset  *tmpset = bms_copy(rte->updatedCols);
2449 #else
2450 	Bitmapset  *tmpset = bms_copy(rte->modifiedCols);
2451 #endif
2452 	AttrNumber	col;
2453 
2454 	while ((col = bms_first_member(tmpset)) >= 0)
2455 	{
2456 		col += FirstLowInvalidHeapAttributeNumber;
2457 		if (col <= InvalidAttrNumber)	/* shouldn't happen */
2458 			elog(ERROR, "system-column update is not supported");
2459 
2460 		/* We also disallow updates to the first column */
2461 		if (col == 1)
2462 			ereport(ERROR,
2463 				(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2464 				 errmsg("row identifier column update is not supported")));
2465 
2466 		targetAttrs = lappend_int(targetAttrs, col);
2467 	}
2468 
2469 	return targetAttrs;
2470 }
2471 
2472 /*
2473  * mysqlGetForeignJoinPaths
2474  *		Add possible ForeignPath to joinrel, if join is safe to push down.
2475  */
2476 #if PG_VERSION_NUM >= 90600
2477 static void
mysqlGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)2478 mysqlGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *joinrel,
2479 						 RelOptInfo *outerrel, RelOptInfo *innerrel,
2480 						 JoinType jointype, JoinPathExtraData *extra)
2481 {
2482 	MySQLFdwRelationInfo *fpinfo;
2483 	ForeignPath *joinpath;
2484 	Cost		startup_cost;
2485 	Cost		total_cost;
2486 	Path	   *epq_path = NULL; /* Path to create plan to be executed when
2487 								  * EvalPlanQual gets triggered. */
2488 
2489 	/*
2490 	 * Skip if this join combination has been considered already.
2491 	 */
2492 	if (joinrel->fdw_private)
2493 		return;
2494 
2495 	/*
2496 	 * Create unfinished MySQLFdwRelationInfo entry which is used to indicate
2497 	 * that the join relation is already considered, so that we won't waste
2498 	 * time in judging safety of join pushdown and adding the same paths again
2499 	 * if found safe.  Once we know that this join can be pushed down, we fill
2500 	 * the entry.
2501 	 */
2502 	fpinfo = (MySQLFdwRelationInfo *) palloc0(sizeof(MySQLFdwRelationInfo));
2503 	fpinfo->pushdown_safe = false;
2504 	joinrel->fdw_private = fpinfo;
2505 	/* attrs_used is only for base relations. */
2506 	fpinfo->attrs_used = NULL;
2507 
2508 	/*
2509 	 * In case there is a possibility that EvalPlanQual will be executed, we
2510 	 * should be able to reconstruct the row, from base relations applying all
2511 	 * the conditions.  We create a local plan from a suitable local path
2512 	 * available in the path list.  In case such a path doesn't exist, we can
2513 	 * not push the join to the foreign server since we won't be able to
2514 	 * reconstruct the row for EvalPlanQual().  Find an alternative local path
2515 	 * before we add ForeignPath, lest the new path would kick possibly the
2516 	 * only local path.  Do this before calling mysql_foreign_join_ok(), since
2517 	 * that function updates fpinfo and marks it as pushable if the join is
2518 	 * found to be pushable.
2519 	 */
2520 	if (root->parse->commandType == CMD_DELETE ||
2521 		root->parse->commandType == CMD_UPDATE ||
2522 		root->rowMarks)
2523 	{
2524 		epq_path = GetExistingLocalJoinPath(joinrel);
2525 		if (!epq_path)
2526 		{
2527 			elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
2528 			return;
2529 		}
2530 	}
2531 	else
2532 		epq_path = NULL;
2533 
2534 	if (!mysql_foreign_join_ok(root, joinrel, jointype, outerrel, innerrel,
2535 							   extra))
2536 	{
2537 		/* Free path required for EPQ if we copied one; we don't need it now */
2538 		if (epq_path)
2539 			pfree(epq_path);
2540 		return;
2541 	}
2542 
2543 	/* TODO: Put accurate estimates here */
2544 	startup_cost = 15.0;
2545 	total_cost = 20 + startup_cost;
2546 
2547 	/*
2548 	 * Create a new join path and add it to the joinrel which represents a
2549 	 * join between foreign tables.
2550 	 */
2551 #if PG_VERSION_NUM >= 120000
2552 	joinpath = create_foreign_join_path(root,
2553 									   joinrel,
2554 									   NULL,	/* default pathtarget */
2555 									   joinrel->rows,
2556 									   startup_cost,
2557 									   total_cost,
2558 									   NIL,		/* no pathkeys */
2559 									   joinrel->lateral_relids,
2560 									   epq_path,
2561 									   NIL);	/* no fdw_private */
2562 #else
2563 	joinpath = create_foreignscan_path(root,
2564 									   joinrel,
2565 									   NULL,	/* default pathtarget */
2566 									   joinrel->rows,
2567 									   startup_cost,
2568 									   total_cost,
2569 									   NIL, 	/* no pathkeys */
2570 									   joinrel->lateral_relids,
2571 									   epq_path,
2572 									   NIL);	/* no fdw_private */
2573 #endif      /* PG_VERSION_NUM >= 120000 */
2574 
2575 	/* Add generated path into joinrel by add_path(). */
2576 	add_path(joinrel, (Path *) joinpath);
2577 
2578 	/* XXX Consider pathkeys for the join relation */
2579 
2580 	/* XXX Consider parameterized paths for the join relation */
2581 }
2582 
2583 /*
2584  * mysql_foreign_join_ok
2585  * 		Assess whether the join between inner and outer relations can be
2586  * 		pushed down to the foreign server.
2587  *
2588  * As a side effect, save information we obtain in this function to
2589  * MySQLFdwRelationInfo passed in.
2590  */
2591 static bool
mysql_foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)2592 mysql_foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
2593 					  JoinType jointype, RelOptInfo *outerrel,
2594 					  RelOptInfo *innerrel, JoinPathExtraData *extra)
2595 {
2596 	MySQLFdwRelationInfo *fpinfo;
2597 	MySQLFdwRelationInfo *fpinfo_o;
2598 	MySQLFdwRelationInfo *fpinfo_i;
2599 	ListCell   *lc;
2600 	List	   *joinclauses;
2601 
2602 	/*
2603 	 * We support pushing down INNER, LEFT and RIGHT joins.
2604 	 * Constructing queries representing SEMI and ANTI joins is hard, hence
2605 	 * not considered right now.
2606 	 */
2607 	if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
2608 		jointype != JOIN_RIGHT)
2609 		return false;
2610 
2611 	/*
2612 	 * If either of the joining relations is marked as unsafe to pushdown, the
2613 	 * join cannot be pushed down.
2614 	 */
2615 	fpinfo = (MySQLFdwRelationInfo *) joinrel->fdw_private;
2616 	fpinfo_o = (MySQLFdwRelationInfo *) outerrel->fdw_private;
2617 	fpinfo_i = (MySQLFdwRelationInfo *) innerrel->fdw_private;
2618 	if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
2619 		!fpinfo_i || !fpinfo_i->pushdown_safe)
2620 		return false;
2621 
2622 	/*
2623 	 * If joining relations have local conditions, those conditions are
2624 	 * required to be applied before joining the relations.  Hence the join can
2625 	 * not be pushed down.
2626 	 */
2627 	if (fpinfo_o->local_conds || fpinfo_i->local_conds)
2628 		return false;
2629 
2630 	/*
2631 	 * Separate restrict list into join quals and pushed-down (other) quals.
2632 	 *
2633 	 * Join quals belonging to an outer join must all be shippable, else we
2634 	 * cannot execute the join remotely.  Add such quals to 'joinclauses'.
2635 	 *
2636 	 * Add other quals to fpinfo->remote_conds if they are shippable, else to
2637 	 * fpinfo->local_conds.  In an inner join it's okay to execute conditions
2638 	 * either locally or remotely; the same is true for pushed-down conditions
2639 	 * at an outer join.
2640 	 *
2641 	 * Note we might return failure after having already scribbled on
2642 	 * fpinfo->remote_conds and fpinfo->local_conds.  That's okay because we
2643 	 * won't consult those lists again if we deem the join unshippable.
2644 	 */
2645 	joinclauses = NIL;
2646 	foreach(lc, extra->restrictlist)
2647 	{
2648 		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
2649 		bool		is_remote_clause = mysql_is_foreign_expr(root, joinrel,
2650 															 rinfo->clause,
2651 															 true);
2652 
2653 		if (IS_OUTER_JOIN(jointype) &&
2654 			!RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
2655 		{
2656 			if (!is_remote_clause)
2657 				return false;
2658 			joinclauses = lappend(joinclauses, rinfo);
2659 		}
2660 		else
2661 		{
2662 			if (is_remote_clause)
2663 			{
2664 				/*
2665 				 * Unlike postgres_fdw, don't append the join clauses to
2666 				 * remote_conds, instead keep the join clauses separate.
2667 				 * Currently, we are providing limited operator pushability
2668 				 * support for join pushdown, hence we keep those clauses
2669 				 * separate to avoid INNER JOIN not getting pushdown if any
2670 				 * of the WHERE clause is not shippable as per join pushdown
2671 				 * shippability.
2672 				 */
2673 				if (jointype == JOIN_INNER)
2674 					joinclauses = lappend(joinclauses, rinfo);
2675 				else
2676 					fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
2677 			}
2678 			else
2679 				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
2680 		}
2681 	}
2682 
2683 	/*
2684 	 * mysqlDeparseExplicitTargetList() isn't smart enough to handle anything
2685 	 * other than a Var.  In particular, if there's some PlaceHolderVar that
2686 	 * would need to be evaluated within this join tree (because there's an
2687 	 * upper reference to a quantity that may go to NULL as a result of an
2688 	 * outer join), then we can't try to push the join down because we'll fail
2689 	 * when we get to mysqlDeparseExplicitTargetList().  However, a
2690 	 * PlaceHolderVar that needs to be evaluated *at the top* of this join tree
2691 	 * is OK, because we can do that locally after fetching the results from
2692 	 * the remote side.
2693 	 */
2694 	foreach(lc, root->placeholder_list)
2695 	{
2696 		PlaceHolderInfo *phinfo = lfirst(lc);
2697 		Relids		relids;
2698 
2699 		/* PlaceHolderInfo refers to parent relids, not child relids. */
2700 #if PG_VERSION_NUM >= 100000
2701 		relids = IS_OTHER_REL(joinrel) ?
2702 			joinrel->top_parent_relids : joinrel->relids;
2703 #else
2704 		relids = joinrel->relids;
2705 #endif			/* PG_VERSION_NUM >= 100000 */
2706 
2707 		if (bms_is_subset(phinfo->ph_eval_at, relids) &&
2708 			bms_nonempty_difference(relids, phinfo->ph_eval_at))
2709 			return false;
2710 	}
2711 
2712 	/* Save the join clauses, for later use. */
2713 	fpinfo->joinclauses = joinclauses;
2714 
2715 	/*
2716 	 * Pull the other remote conditions from the joining relations into join
2717 	 * clauses or other remote clauses (remote_conds) of this relation.  This
2718 	 * avoids building subqueries at every join step.
2719 	 *
2720 	 * For an inner join, clauses from both the relations are added to the
2721 	 * other remote clauses.  For an OUTER join, the clauses from the outer
2722 	 * side are added to remote_conds since those can be evaluated after the
2723 	 * join is evaluated.  The clauses from inner side are added to the
2724 	 * joinclauses, since they need to evaluated while constructing the join.
2725 	 *
2726 	 * The joining sides cannot have local conditions, thus no need to test
2727 	 * shippability of the clauses being pulled up.
2728 	 */
2729 	switch (jointype)
2730 	{
2731 		case JOIN_INNER:
2732 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
2733 											   fpinfo_i->remote_conds);
2734 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
2735 											   fpinfo_o->remote_conds);
2736 			break;
2737 
2738 		case JOIN_LEFT:
2739 			/* Check that clauses from the inner side are pushable or not. */
2740 			foreach(lc, fpinfo_i->remote_conds)
2741 			{
2742 				RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
2743 
2744 				if (!mysql_is_foreign_expr(root, joinrel, ri->clause, true))
2745 					return false;
2746 			}
2747 
2748 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
2749 											  fpinfo_i->remote_conds);
2750 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
2751 											   fpinfo_o->remote_conds);
2752 			break;
2753 
2754 		case JOIN_RIGHT:
2755 			/* Check that clauses from the outer side are pushable or not. */
2756 			foreach(lc, fpinfo_o->remote_conds)
2757 			{
2758 				RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
2759 
2760 				if (!mysql_is_foreign_expr(root, joinrel, ri->clause, true))
2761 					return false;
2762 			}
2763 
2764 			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
2765 											  fpinfo_o->remote_conds);
2766 			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
2767 											   fpinfo_i->remote_conds);
2768 			break;
2769 
2770 		default:
2771 			/* Should not happen, we have just check this above */
2772 			elog(ERROR, "unsupported join type %d", jointype);
2773 	}
2774 
2775 	fpinfo->outerrel = outerrel;
2776 	fpinfo->innerrel = innerrel;
2777 	fpinfo->jointype = jointype;
2778 
2779 	/* Mark that this join can be pushed down safely */
2780 	fpinfo->pushdown_safe = true;
2781 
2782 	/*
2783 	 * Set the string describing this join relation to be used in EXPLAIN
2784 	 * output of corresponding ForeignScan.
2785 	 */
2786 	fpinfo->relation_name = makeStringInfo();
2787 	appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
2788 					 fpinfo_o->relation_name->data,
2789 					 mysql_get_jointype_name(fpinfo->jointype),
2790 					 fpinfo_i->relation_name->data);
2791 
2792 	return true;
2793 }
2794 
2795 /*
2796  * mysqlRecheckForeignScan
2797  *		Execute a local join execution plan for a foreign join.
2798  */
2799 static bool
mysqlRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2800 mysqlRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2801 {
2802 	Index		scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2803 	PlanState  *outerPlan = outerPlanState(node);
2804 	TupleTableSlot *result;
2805 
2806 	/* For base foreign relations, it suffices to set fdw_recheck_quals */
2807 	if (scanrelid > 0)
2808 		return true;
2809 
2810 	Assert(outerPlan != NULL);
2811 
2812 	/* Execute a local join execution plan */
2813 	result = ExecProcNode(outerPlan);
2814 	if (TupIsNull(result))
2815 		return false;
2816 
2817 	/* Store result in the given slot */
2818 	ExecCopySlot(slot, result);
2819 
2820 	return true;
2821 }
2822 
2823 /*
2824  * mysql_adjust_whole_row_ref
2825  * 		If the given list of Var nodes has whole-row reference, add Var
2826  * 		nodes corresponding to all the attributes of the corresponding
2827  * 		base relation.
2828  *
2829  * The function also returns an array of lists of var nodes.  The array is
2830  * indexed by the RTI and entry there contains the list of Var nodes which
2831  * make up the whole-row reference for corresponding base relation.
2832  * The relations not covered by given join and the relations which do not
2833  * have whole-row references will have NIL entries.
2834  *
2835  * If there are no whole-row references in the given list, the given list is
2836  * returned unmodified and the other list is NIL.
2837  */
2838 static List *
mysql_adjust_whole_row_ref(PlannerInfo * root,List * scan_var_list,List ** whole_row_lists,Bitmapset * relids)2839 mysql_adjust_whole_row_ref(PlannerInfo *root, List *scan_var_list,
2840 						   List **whole_row_lists, Bitmapset *relids)
2841 {
2842 	ListCell   *lc;
2843 	bool		has_whole_row = false;
2844 	List	  **wr_list_array = NULL;
2845 	int			cnt_rt;
2846 	List	   *wr_scan_var_list = NIL;
2847 
2848 	*whole_row_lists = NIL;
2849 
2850 	/* Check if there exists at least one whole row reference. */
2851 	foreach(lc, scan_var_list)
2852 	{
2853 		Var		   *var = (Var *) lfirst(lc);
2854 
2855 		Assert(IsA(var, Var));
2856 
2857 		if (var->varattno == 0)
2858 		{
2859 			has_whole_row = true;
2860 			break;
2861 		}
2862 	}
2863 
2864 	if (!has_whole_row)
2865 		return scan_var_list;
2866 
2867 	/*
2868 	 * Allocate large enough memory to hold whole-row Var lists for all the
2869 	 * relations.  This array will then be converted into a list of lists.
2870 	 * Since all the base relations are marked by range table index, it's easy
2871 	 * to keep track of the ones whose whole-row references have been taken
2872 	 * care of.
2873 	 */
2874 	wr_list_array = (List **) palloc0(sizeof(List *) *
2875 									  list_length(root->parse->rtable));
2876 
2877 	/* Adjust the whole-row references as described in the prologue. */
2878 	foreach(lc, scan_var_list)
2879 	{
2880 		Var		   *var = (Var *) lfirst(lc);
2881 
2882 		Assert(IsA(var, Var));
2883 
2884 		if (var->varattno == 0 && !wr_list_array[var->varno - 1])
2885 		{
2886 			List	   *wr_var_list;
2887 			List	   *retrieved_attrs;
2888 			RangeTblEntry *rte = rt_fetch(var->varno, root->parse->rtable);
2889 			Bitmapset  *attrs_used;
2890 
2891 			Assert(OidIsValid(rte->relid));
2892 
2893 			/*
2894 			 * Get list of Var nodes for all undropped attributes of the base
2895 			 * relation.
2896 			 */
2897 			attrs_used = bms_make_singleton(0 -
2898 											FirstLowInvalidHeapAttributeNumber);
2899 
2900 			/*
2901 			 * If the whole-row reference falls on the nullable side of the
2902 			 * outer join and that side is null in a given result row, the
2903 			 * whole row reference should be set to NULL.  In this case, all
2904 			 * the columns of that relation will be NULL, but that does not
2905 			 * help since those columns can be genuinely NULL in a row.
2906 			 */
2907 			wr_var_list =
2908 				mysql_build_scan_list_for_baserel(rte->relid, var->varno,
2909 												  attrs_used,
2910 												  &retrieved_attrs);
2911 			wr_list_array[var->varno - 1] = wr_var_list;
2912 			wr_scan_var_list = list_concat_unique(wr_scan_var_list,
2913 												  wr_var_list);
2914 			bms_free(attrs_used);
2915 			list_free(retrieved_attrs);
2916 		}
2917 		else
2918 			wr_scan_var_list = list_append_unique(wr_scan_var_list, var);
2919 	}
2920 
2921 	/*
2922 	 * Collect the required Var node lists into a list of lists ordered by the
2923 	 * base relations' range table indexes.
2924 	 */
2925 	cnt_rt = -1;
2926 	while ((cnt_rt = bms_next_member(relids, cnt_rt)) >= 0)
2927 		*whole_row_lists = lappend(*whole_row_lists, wr_list_array[cnt_rt - 1]);
2928 
2929 	pfree(wr_list_array);
2930 	return wr_scan_var_list;
2931 }
2932 
2933 /*
2934  * mysql_build_scan_list_for_baserel
2935  * 		Build list of nodes corresponding to the attributes requested for
2936  * 		given base relation.
2937  *
2938  * The list contains Var nodes corresponding to the attributes specified in
2939  * attrs_used.  If whole-row reference is required, the functions adds Var
2940  * nodes corresponding to all the attributes in the relation.
2941  */
2942 static List *
mysql_build_scan_list_for_baserel(Oid relid,Index varno,Bitmapset * attrs_used,List ** retrieved_attrs)2943 mysql_build_scan_list_for_baserel(Oid relid, Index varno,
2944 								  Bitmapset *attrs_used,
2945 								  List **retrieved_attrs)
2946 {
2947 	int			attno;
2948 	List	   *tlist = NIL;
2949 	Node	   *node;
2950 	bool		wholerow_requested = false;
2951 	Relation	relation;
2952 	TupleDesc	tupdesc;
2953 
2954 	Assert(OidIsValid(relid));
2955 
2956 	*retrieved_attrs = NIL;
2957 
2958 	/* Planner must have taken a lock, so request no lock here */
2959 #if PG_VERSION_NUM < 130000
2960 	relation = heap_open(relid, NoLock);
2961 #else
2962 	relation = table_open(relid, NoLock);
2963 #endif
2964 
2965 	tupdesc = RelationGetDescr(relation);
2966 
2967 	/* Is whole-row reference requested? */
2968 	wholerow_requested = bms_is_member(0 - FirstLowInvalidHeapAttributeNumber,
2969 									   attrs_used);
2970 
2971 	/* Handle user defined attributes. */
2972 	for (attno = 1; attno <= tupdesc->natts; attno++)
2973 	{
2974 		Form_pg_attribute attr = TupleDescAttr(tupdesc, attno - 1);
2975 
2976 		/* Ignore dropped attributes. */
2977 		if (attr->attisdropped)
2978 			continue;
2979 
2980 		/*
2981 		 * For a required attribute create a Var node and add corresponding
2982 		 * attribute number to the retrieved_attrs list.
2983 		 */
2984 		if (wholerow_requested ||
2985 			bms_is_member(attno - FirstLowInvalidHeapAttributeNumber,
2986 						  attrs_used))
2987 		{
2988 			node = (Node *) makeVar(varno, attno, attr->atttypid,
2989 									attr->atttypmod, attr->attcollation, 0);
2990 			tlist = lappend(tlist, node);
2991 
2992 			*retrieved_attrs = lappend_int(*retrieved_attrs, attno);
2993 		}
2994 	}
2995 
2996 #if PG_VERSION_NUM < 130000
2997 	heap_close(relation, NoLock);
2998 #else
2999 	table_close(relation, NoLock);
3000 #endif
3001 
3002 	return tlist;
3003 }
3004 #endif     /* PG_VERSION_NUM >= 90600 */
3005 
3006 /*
3007  * mysql_build_whole_row_constr_info
3008  *		Calculate and save the information required to construct whole row
3009  *		references of base foreign relations involved in the pushed down join.
3010  *
3011  * tupdesc is the tuple descriptor describing the result returned by the
3012  * ForeignScan node.  It is expected to be same as
3013  * ForeignScanState::ss::ss_ScanTupleSlot, which is constructed using
3014  * fdw_scan_tlist.
3015  *
3016  * relids is the the set of relations participating in the pushed down join.
3017  *
3018  * max_relid is the maximum number of relation index expected.
3019  *
3020  * whole_row_lists is the list of Var node lists constituting the whole-row
3021  * reference for base relations in the relids in the same order.
3022  *
3023  * scan_tlist is the targetlist representing the result fetched from the
3024  * foreign server.
3025  *
3026  * fdw_scan_tlist is the targetlist representing the result returned by the
3027  * ForeignScan node.
3028  */
3029 static void
mysql_build_whole_row_constr_info(MySQLFdwExecState * festate,TupleDesc tupdesc,Bitmapset * relids,int max_relid,List * whole_row_lists,List * scan_tlist,List * fdw_scan_tlist)3030 mysql_build_whole_row_constr_info(MySQLFdwExecState *festate,
3031 								  TupleDesc tupdesc, Bitmapset *relids,
3032 								  int max_relid, List *whole_row_lists,
3033 								  List *scan_tlist, List *fdw_scan_tlist)
3034 {
3035 	int			cnt_rt;
3036 	int			cnt_vl;
3037 	int			cnt_attr;
3038 	ListCell   *lc;
3039 	int		   *fs_attr_pos = NULL;
3040 	MySQLWRState **mysqlwrstates = NULL;
3041 	int			fs_num_atts;
3042 
3043 	/*
3044 	 * Allocate memory to hold whole-row reference state for each relation.
3045 	 * Indexing by the range table index is faster than maintaining an
3046 	 * associative map.
3047 	 */
3048 	mysqlwrstates = (MySQLWRState **) palloc0(sizeof(MySQLWRState *) * max_relid);
3049 
3050 	/*
3051 	 * Set the whole-row reference state for the relations whose whole-row
3052 	 * reference needs to be constructed.
3053 	 */
3054 	cnt_rt = -1;
3055 	cnt_vl = 0;
3056 	while ((cnt_rt = bms_next_member(relids, cnt_rt)) >= 0)
3057 	{
3058 		MySQLWRState *wr_state = (MySQLWRState *) palloc0(sizeof(MySQLWRState));
3059 		List	   *var_list = list_nth(whole_row_lists, cnt_vl++);
3060 		int			natts;
3061 
3062 		/* Skip the relations without whole-row references. */
3063 		if (list_length(var_list) <= 0)
3064 			continue;
3065 
3066 		natts = list_length(var_list);
3067 		wr_state->attr_pos = (int *) palloc(sizeof(int) * natts);
3068 
3069 		/*
3070 		 * Create a map of attributes required for whole-row reference to
3071 		 * their positions in the result fetched from the foreign server.
3072 		 */
3073 		cnt_attr = 0;
3074 		foreach(lc, var_list)
3075 		{
3076 			Var		   *var = lfirst(lc);
3077 			TargetEntry *tle_sl;
3078 
3079 			Assert(IsA(var, Var) &&var->varno == cnt_rt);
3080 
3081 #if PG_VERSION_NUM >= 100000
3082 			tle_sl = tlist_member((Expr *) var, scan_tlist);
3083 #else
3084 			tle_sl = tlist_member((Node *) var, scan_tlist);
3085 #endif
3086 			Assert(tle_sl);
3087 
3088 			wr_state->attr_pos[cnt_attr++] = tle_sl->resno - 1;
3089 		}
3090 		Assert(natts == cnt_attr);
3091 
3092 		/* Build rest of the state */
3093 		wr_state->tupdesc = ExecTypeFromExprList(var_list);
3094 		Assert(natts == wr_state->tupdesc->natts);
3095 		wr_state->values = (Datum *) palloc(sizeof(Datum) * natts);
3096 		wr_state->nulls = (bool *) palloc(sizeof(bool) * natts);
3097 		BlessTupleDesc(wr_state->tupdesc);
3098 		mysqlwrstates[cnt_rt - 1] = wr_state;
3099 	}
3100 
3101 	/*
3102 	 * Construct the array mapping columns in the ForeignScan node output to
3103 	 * their positions in the result fetched from the foreign server.  Positive
3104 	 * values indicate the locations in the result and negative values
3105 	 * indicate the range table indexes of the base table whose whole-row
3106 	 * reference values are requested in that place.
3107 	 */
3108 	fs_num_atts = list_length(fdw_scan_tlist);
3109 	fs_attr_pos = (int *) palloc(sizeof(int) * fs_num_atts);
3110 	cnt_attr = 0;
3111 	foreach(lc, fdw_scan_tlist)
3112 	{
3113 		TargetEntry *tle_fsl = lfirst(lc);
3114 		Var		   *var = (Var *) tle_fsl->expr;
3115 
3116 		Assert(IsA(var, Var));
3117 		if (var->varattno == 0)
3118 			fs_attr_pos[cnt_attr] = -var->varno;
3119 		else
3120 		{
3121 #if PG_VERSION_NUM >= 100000
3122 			TargetEntry *tle_sl = tlist_member((Expr *) var, scan_tlist);
3123 #else
3124 			TargetEntry *tle_sl = tlist_member((Node *) var, scan_tlist);
3125 #endif
3126 
3127 			Assert(tle_sl);
3128 			fs_attr_pos[cnt_attr] = tle_sl->resno - 1;
3129 		}
3130 		cnt_attr++;
3131 	}
3132 
3133 	/*
3134 	 * The tuple descriptor passed in should have same number of attributes as
3135 	 * the entries in fdw_scan_tlist.
3136 	 */
3137 	Assert(fs_num_atts == tupdesc->natts);
3138 
3139 	festate->mysqlwrstates = mysqlwrstates;
3140 	festate->wr_attrs_pos = fs_attr_pos;
3141 	festate->wr_tupdesc = tupdesc;
3142 	festate->wr_values = (Datum *) palloc(sizeof(Datum) * tupdesc->natts);
3143 	festate->wr_nulls = (bool *) palloc(sizeof(bool) * tupdesc->natts);
3144 
3145 	return;
3146 }
3147 
3148 /*
3149  * mysql_get_tuple_with_whole_row
3150  *		Construct the result row with whole-row references.
3151  */
3152 static HeapTuple
mysql_get_tuple_with_whole_row(MySQLFdwExecState * festate,Datum * values,bool * nulls)3153 mysql_get_tuple_with_whole_row(MySQLFdwExecState *festate, Datum *values,
3154 							   bool *nulls)
3155 {
3156 	TupleDesc	tupdesc = festate->wr_tupdesc;
3157 	Datum	   *wr_values = festate->wr_values;
3158 	bool	   *wr_nulls = festate->wr_nulls;
3159 	int			cnt_attr;
3160 	HeapTuple	tuple = NULL;
3161 
3162 	for (cnt_attr = 0; cnt_attr < tupdesc->natts; cnt_attr++)
3163 	{
3164 		int			attr_pos = festate->wr_attrs_pos[cnt_attr];
3165 
3166 		if (attr_pos >= 0)
3167 		{
3168 			wr_values[cnt_attr] = values[attr_pos];
3169 			wr_nulls[cnt_attr] = nulls[attr_pos];
3170 		}
3171 		else
3172 		{
3173 			/*
3174 			 * The RTI of relation whose whole row reference is to be
3175 			 * constructed is stored as -ve attr_pos.
3176 			 */
3177 			MySQLWRState *wr_state = festate->mysqlwrstates[-attr_pos - 1];
3178 
3179 			wr_nulls[cnt_attr] = nulls[wr_state->wr_null_ind_pos];
3180 			if (!wr_nulls[cnt_attr])
3181 			{
3182 				HeapTuple	wr_tuple = mysql_form_whole_row(wr_state,
3183 															values,
3184 															nulls);
3185 
3186 				wr_values[cnt_attr] = HeapTupleGetDatum(wr_tuple);
3187 			}
3188 		}
3189 	}
3190 
3191 	tuple = heap_form_tuple(tupdesc, wr_values, wr_nulls);
3192 	return tuple;
3193 }
3194 
3195 /*
3196  * mysql_form_whole_row
3197  * 		The function constructs whole-row reference for a base relation
3198  * 		with the information given in wr_state.
3199  *
3200  * wr_state contains the information about which attributes from values and
3201  * nulls are to be used and in which order to construct the whole-row
3202  * reference.
3203  */
3204 static HeapTuple
mysql_form_whole_row(MySQLWRState * wr_state,Datum * values,bool * nulls)3205 mysql_form_whole_row(MySQLWRState *wr_state, Datum *values, bool *nulls)
3206 {
3207 	int			cnt_attr;
3208 
3209 	for (cnt_attr = 0; cnt_attr < wr_state->tupdesc->natts; cnt_attr++)
3210 	{
3211 		int			attr_pos = wr_state->attr_pos[cnt_attr];
3212 
3213 		wr_state->values[cnt_attr] = values[attr_pos];
3214 		wr_state->nulls[cnt_attr] = nulls[attr_pos];
3215 	}
3216 	return heap_form_tuple(wr_state->tupdesc, wr_state->values,
3217 						   wr_state->nulls);
3218 }
3219