1 /*-------------------------------------------------------------------------
2 *
3 * mysql_fdw.c
4 * Foreign-data wrapper for remote MySQL servers
5 *
6 * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
7 * Portions Copyright (c) 2004-2021, EnterpriseDB Corporation.
8 *
9 * IDENTIFICATION
10 * mysql_fdw.c
11 *
12 *-------------------------------------------------------------------------
13 */
14 #include "postgres.h"
15
16 /*
17 * Must be included before mysql.h as it has some conflicting definitions like
18 * list_length, etc.
19 */
20 #include "mysql_fdw.h"
21
22 #include <dlfcn.h>
23 #include <errmsg.h>
24 #include <mysql.h>
25 #include <stdio.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28
29 #include "access/htup_details.h"
30 #include "access/sysattr.h"
31 #include "access/reloptions.h"
32 #if PG_VERSION_NUM >= 120000
33 #include "access/table.h"
34 #endif
35 #include "commands/defrem.h"
36 #include "commands/explain.h"
37 #include "catalog/heap.h"
38 #include "foreign/fdwapi.h"
39 #include "miscadmin.h"
40 #include "mysql_query.h"
41 #include "nodes/makefuncs.h"
42 #include "nodes/nodeFuncs.h"
43 #include "nodes/nodes.h"
44 #include "optimizer/pathnode.h"
45 #include "optimizer/planmain.h"
46 #if PG_VERSION_NUM < 120000
47 #include "optimizer/var.h"
48 #else
49 #include "optimizer/optimizer.h"
50 #endif
51 #include "optimizer/restrictinfo.h"
52 #include "optimizer/tlist.h"
53 #include "parser/parsetree.h"
54 #include "storage/ipc.h"
55 #include "utils/builtins.h"
56 #include "utils/datum.h"
57 #include "utils/guc.h"
58 #include "utils/lsyscache.h"
59 #include "utils/memutils.h"
60 #include "utils/syscache.h"
61
62 /* Declarations for dynamic loading */
63 PG_MODULE_MAGIC;
64
/*
 * Declarations of the libmysqlclient entry points used by this FDW.
 *
 * NOTE(review): each parenthesized name is presumably a macro from
 * mysql_fdw.h that expands to a dereferenced "_mysql_*" function pointer, so
 * these lines define the pointers that mysql_load_library() fills in through
 * dlsym() -- confirm against the header.
 *
 * NOTE(review): a few signatures look different from the MySQL C API
 * (mysql_stmt_store_result is documented as taking a MYSQL_STMT *,
 * mysql_free_result as returning void, and mysql_num_rows as returning a
 * 64-bit count).  They must stay in sync with mysql_fdw.h, so verify both
 * places before changing any of them.
 */

/* Connection options, prepared-statement execution and plain queries */
int ((mysql_options) (MYSQL *mysql, enum mysql_option option,
					  const void *arg));
int ((mysql_stmt_prepare) (MYSQL_STMT *stmt, const char *query,
						   unsigned long length));
int ((mysql_stmt_execute) (MYSQL_STMT *stmt));
int ((mysql_stmt_fetch) (MYSQL_STMT *stmt));
int ((mysql_query) (MYSQL *mysql, const char *q));
bool ((mysql_stmt_attr_set) (MYSQL_STMT *stmt,
							 enum enum_stmt_attr_type attr_type,
							 const void *attr));
bool ((mysql_stmt_close) (MYSQL_STMT *stmt));
bool ((mysql_stmt_reset) (MYSQL_STMT *stmt));
bool ((mysql_free_result) (MYSQL_RES *result));
bool ((mysql_stmt_bind_param) (MYSQL_STMT *stmt, MYSQL_BIND *bnd));
bool ((mysql_stmt_bind_result) (MYSQL_STMT *stmt, MYSQL_BIND *bnd));

/* Statement/result handling and connection lifecycle */
MYSQL_STMT *((mysql_stmt_init) (MYSQL *mysql));
MYSQL_RES *((mysql_stmt_result_metadata) (MYSQL_STMT *stmt));
int ((mysql_stmt_store_result) (MYSQL *mysql));
MYSQL_ROW((mysql_fetch_row) (MYSQL_RES *result));
MYSQL_FIELD *((mysql_fetch_field) (MYSQL_RES *result));
MYSQL_FIELD *((mysql_fetch_fields) (MYSQL_RES *result));
const char *((mysql_error) (MYSQL *mysql));
void ((mysql_close) (MYSQL *sock));
MYSQL_RES *((mysql_store_result) (MYSQL *mysql));
MYSQL *((mysql_init) (MYSQL *mysql));
bool ((mysql_ssl_set) (MYSQL *mysql, const char *key, const char *cert,
					   const char *ca, const char *capath,
					   const char *cipher));
MYSQL *((mysql_real_connect) (MYSQL *mysql, const char *host, const char *user,
							  const char *passwd, const char *db,
							  unsigned int port, const char *unix_socket,
							  unsigned long clientflag));

/* Server/connection information */
const char *((mysql_get_host_info) (MYSQL *mysql));
const char *((mysql_get_server_info) (MYSQL *mysql));
int ((mysql_get_proto_info) (MYSQL *mysql));

/* Error codes and result-set dimensions */
unsigned int ((mysql_stmt_errno) (MYSQL_STMT *stmt));
unsigned int ((mysql_errno) (MYSQL *mysql));
unsigned int ((mysql_num_fields) (MYSQL_RES *result));
unsigned int ((mysql_num_rows) (MYSQL_RES *result));
107
/*
 * NOTE(review): presumably the fallback row-count estimate for a foreign
 * table; its uses are outside this part of the file -- confirm.  The
 * "DEFAULTE" spelling is kept because other code may reference the name.
 */
#define DEFAULTE_NUM_ROWS 1000

/*
 * Numeric version of this extension, encoded the same way as
 * PG_VERSION_NUM: PostgreSQL 9.5.1 is 90501, so mysql_fdw 2.6.0 is 20600.
 */
#define CODE_VERSION 20600
115
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum mysqlFdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, mysqlFdwScanPrivateSelectSql));
 *
 * Because the enumerators are used directly as list positions, their order
 * here must match the order in which the items are placed in the list.
 */
enum mysqlFdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	mysqlFdwScanPrivateSelectSql,

	/* Integer list of attribute numbers retrieved by the SELECT */
	mysqlFdwScanPrivateRetrievedAttrs,

	/*
	 * String describing join i.e. names of relations being joined and types
	 * of join, added when the scan is join
	 */
	mysqlFdwScanPrivateRelations,

	/*
	 * List of Var node lists for constructing the whole-row references of
	 * base relations involved in pushed down join.
	 */
	mysqlFdwPrivateWholeRowLists,

	/*
	 * Targetlist representing the result fetched from the foreign server if
	 * whole-row references are involved.
	 */
	mysqlFdwPrivateScanTList
};
149
/* Library load hook and the FDW handler function exported by this module */
extern PGDLLEXPORT void _PG_init(void);
extern Datum mysql_fdw_handler(PG_FUNCTION_ARGS);

/* Version-1 calling-convention records for the SQL-callable functions */
PG_FUNCTION_INFO_V1(mysql_fdw_handler);
PG_FUNCTION_INFO_V1(mysql_fdw_version);
155
/*
 * FDW callback routines
 *
 * Forward declarations of the callbacks that mysql_fdw_handler() installs in
 * the FdwRoutine struct.  Several of them are conditional on the PostgreSQL
 * version the module is built against.
 */

/* Scan execution and EXPLAIN */
static void mysqlExplainForeignScan(ForeignScanState *node, ExplainState *es);
static void mysqlBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *mysqlIterateForeignScan(ForeignScanState *node);
static void mysqlReScanForeignScan(ForeignScanState *node);
static void mysqlEndForeignScan(ForeignScanState *node);

/* INSERT/UPDATE/DELETE on foreign tables */
static List *mysqlPlanForeignModify(PlannerInfo *root, ModifyTable *plan,
									Index resultRelation, int subplan_index);
static void mysqlBeginForeignModify(ModifyTableState *mtstate,
									ResultRelInfo *resultRelInfo,
									List *fdw_private, int subplan_index,
									int eflags);
static TupleTableSlot *mysqlExecForeignInsert(EState *estate,
											  ResultRelInfo *resultRelInfo,
											  TupleTableSlot *slot,
											  TupleTableSlot *planSlot);
static void mysqlAddForeignUpdateTargets(Query *parsetree,
										 RangeTblEntry *target_rte,
										 Relation target_relation);
static TupleTableSlot *mysqlExecForeignUpdate(EState *estate,
											  ResultRelInfo *resultRelInfo,
											  TupleTableSlot *slot,
											  TupleTableSlot *planSlot);
static TupleTableSlot *mysqlExecForeignDelete(EState *estate,
											  ResultRelInfo *resultRelInfo,
											  TupleTableSlot *slot,
											  TupleTableSlot *planSlot);
static void mysqlEndForeignModify(EState *estate,
								  ResultRelInfo *resultRelInfo);

/* Planning and ANALYZE */
static void mysqlGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
								   Oid foreigntableid);
static void mysqlGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
								 Oid foreigntableid);
static bool mysqlAnalyzeForeignTable(Relation relation,
									 AcquireSampleRowsFunc *func,
									 BlockNumber *totalpages);
/* From 9.5 on, the GetForeignPlan callback takes an extra outer_plan argument */
#if PG_VERSION_NUM >= 90500
static ForeignScan *mysqlGetForeignPlan(PlannerInfo *root,
										RelOptInfo *foreignrel,
										Oid foreigntableid,
										ForeignPath *best_path, List *tlist,
										List *scan_clauses, Plan *outer_plan);
#else
static ForeignScan *mysqlGetForeignPlan(PlannerInfo *root,
										RelOptInfo *foreignrel,
										Oid foreigntableid,
										ForeignPath *best_path, List *tlist,
										List *scan_clauses);
#endif
static void mysqlEstimateCosts(PlannerInfo *root, RelOptInfo *baserel,
							   Cost *startup_cost, Cost *total_cost,
							   Oid foreigntableid);
/* Join push-down and EvalPlanQual recheck (built for 9.6 and later) */
#if PG_VERSION_NUM >= 90600
static void mysqlGetForeignJoinPaths(PlannerInfo *root,
									 RelOptInfo *joinrel,
									 RelOptInfo *outerrel,
									 RelOptInfo *innerrel,
									 JoinType jointype,
									 JoinPathExtraData *extra);
static bool mysqlRecheckForeignScan(ForeignScanState *node,
									TupleTableSlot *slot);
#endif

/* IMPORT FOREIGN SCHEMA (built for 9.5 and later) */
#if PG_VERSION_NUM >= 90500
static List *mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt,
									  Oid serverOid);
#endif

/* Partition routing and/or COPY FROM (built for 11 and later) */
#if PG_VERSION_NUM >= 110000
static void mysqlBeginForeignInsert(ModifyTableState *mtstate,
									ResultRelInfo *resultRelInfo);
static void mysqlEndForeignInsert(EState *estate,
								  ResultRelInfo *resultRelInfo);
#endif
234
/*
 * Helper functions
 */

/* Library loading and backend-exit cleanup */
bool mysql_load_library(void);
static void mysql_fdw_exit(int code, Datum arg);
static bool mysql_is_column_unique(Oid foreigntableid);

/* Handling of the parameters passed to the remote query */
static void prepare_query_params(PlanState *node,
								 List *fdw_exprs,
								 int numParams,
								 FmgrInfo **param_flinfo,
								 List **param_exprs,
								 const char ***param_values,
								 Oid **param_types);

static void process_query_params(ExprContext *econtext,
								 FmgrInfo *param_flinfo,
								 List *param_exprs,
								 const char **param_values,
								 MYSQL_BIND **mysql_bind_buf,
								 Oid *param_types);

static void bind_stmt_params_and_exec(ForeignScanState *node);
/* Join push-down helpers (built for 9.6 and later) */
#if PG_VERSION_NUM >= 90600
static bool mysql_foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
								  JoinType jointype, RelOptInfo *outerrel,
								  RelOptInfo *innerrel,
								  JoinPathExtraData *extra);
static List *mysql_adjust_whole_row_ref(PlannerInfo *root,
										List *scan_var_list,
										List **whole_row_lists,
										Bitmapset *relids);
static List *mysql_build_scan_list_for_baserel(Oid relid, Index varno,
											   Bitmapset *attrs_used,
											   List **retrieved_attrs);
#endif
/* Construction of whole-row references for pushed-down joins */
static void mysql_build_whole_row_constr_info(MySQLFdwExecState *festate,
											  TupleDesc tupdesc,
											  Bitmapset *relids,
											  int max_relid,
											  List *whole_row_lists,
											  List *scan_tlist,
											  List *fdw_scan_tlist);
static HeapTuple mysql_get_tuple_with_whole_row(MySQLFdwExecState *festate,
												Datum *values,bool *nulls);
static HeapTuple mysql_form_whole_row(MySQLWRState *wr_state, Datum *values,
									  bool *nulls);

/* Handle returned by dlopen() for the MySQL client library */
void *mysql_dll_handle = NULL;
/* Backing variables for the mysql_fdw.wait_timeout / interactive_timeout GUCs */
static int wait_timeout = WAIT_TIMEOUT;
static int interactive_timeout = INTERACTIVE_TIMEOUT;
/* Error reporting for connection-level and statement-level MySQL failures */
static void mysql_error_print(MYSQL *conn);
static void mysql_stmt_error_print(MySQLFdwExecState *festate,
								   const char *msg);
static List *getUpdateTargetAttrs(RangeTblEntry *rte);
290
291 /*
292 * mysql_load_library function dynamically load the mysql's library
293 * libmysqlclient.so. The only reason to load the library using dlopen
294 * is that, mysql and postgres both have function with same name like
295 * "list_delete", "list_delete" and "list_free" which cause compiler
296 * error "duplicate function name" and erroneously linking with a function.
297 * This port of the code is used to avoid the compiler error.
298 *
299 * #define list_delete mysql_list_delete
300 * #include <mysql.h>
301 * #undef list_delete
302 *
303 * But system crashed on function mysql_stmt_close function because
304 * mysql_stmt_close internally calling "list_delete" function which
305 * wrongly binds to postgres' "list_delete" function.
306 *
307 * The dlopen function provides a parameter "RTLD_DEEPBIND" which
308 * solved the binding issue.
309 *
310 * RTLD_DEEPBIND:
311 * Place the lookup scope of the symbols in this library ahead of the
312 * global scope. This means that a self-contained library will use its
313 * own symbols in preference to global symbols with the same name contained
314 * in libraries that have already been loaded.
315 */
316 bool
mysql_load_library(void)317 mysql_load_library(void)
318 {
319 #if defined(__APPLE__) || defined(__DragonFly__) || defined(__FreeBSD__)
320 /*
321 * Mac OS/BSD does not support RTLD_DEEPBIND, but it still works without
322 * the RTLD_DEEPBIND
323 */
324 mysql_dll_handle = dlopen(_MYSQL_LIBNAME, RTLD_LAZY);
325 #else
326 mysql_dll_handle = dlopen(_MYSQL_LIBNAME, RTLD_LAZY | RTLD_DEEPBIND);
327 #endif
328 if (mysql_dll_handle == NULL)
329 return false;
330
331 _mysql_stmt_bind_param = dlsym(mysql_dll_handle, "mysql_stmt_bind_param");
332 _mysql_stmt_bind_result = dlsym(mysql_dll_handle, "mysql_stmt_bind_result");
333 _mysql_stmt_init = dlsym(mysql_dll_handle, "mysql_stmt_init");
334 _mysql_stmt_prepare = dlsym(mysql_dll_handle, "mysql_stmt_prepare");
335 _mysql_stmt_execute = dlsym(mysql_dll_handle, "mysql_stmt_execute");
336 _mysql_stmt_fetch = dlsym(mysql_dll_handle, "mysql_stmt_fetch");
337 _mysql_query = dlsym(mysql_dll_handle, "mysql_query");
338 _mysql_stmt_result_metadata = dlsym(mysql_dll_handle, "mysql_stmt_result_metadata");
339 _mysql_stmt_store_result = dlsym(mysql_dll_handle, "mysql_stmt_store_result");
340 _mysql_fetch_row = dlsym(mysql_dll_handle, "mysql_fetch_row");
341 _mysql_fetch_field = dlsym(mysql_dll_handle, "mysql_fetch_field");
342 _mysql_fetch_fields = dlsym(mysql_dll_handle, "mysql_fetch_fields");
343 _mysql_stmt_close = dlsym(mysql_dll_handle, "mysql_stmt_close");
344 _mysql_stmt_reset = dlsym(mysql_dll_handle, "mysql_stmt_reset");
345 _mysql_free_result = dlsym(mysql_dll_handle, "mysql_free_result");
346 _mysql_error = dlsym(mysql_dll_handle, "mysql_error");
347 _mysql_options = dlsym(mysql_dll_handle, "mysql_options");
348 _mysql_ssl_set = dlsym(mysql_dll_handle, "mysql_ssl_set");
349 _mysql_real_connect = dlsym(mysql_dll_handle, "mysql_real_connect");
350 _mysql_close = dlsym(mysql_dll_handle, "mysql_close");
351 _mysql_init = dlsym(mysql_dll_handle, "mysql_init");
352 _mysql_stmt_attr_set = dlsym(mysql_dll_handle, "mysql_stmt_attr_set");
353 _mysql_store_result = dlsym(mysql_dll_handle, "mysql_store_result");
354 _mysql_stmt_errno = dlsym(mysql_dll_handle, "mysql_stmt_errno");
355 _mysql_errno = dlsym(mysql_dll_handle, "mysql_errno");
356 _mysql_num_fields = dlsym(mysql_dll_handle, "mysql_num_fields");
357 _mysql_num_rows = dlsym(mysql_dll_handle, "mysql_num_rows");
358 _mysql_get_host_info = dlsym(mysql_dll_handle, "mysql_get_host_info");
359 _mysql_get_server_info = dlsym(mysql_dll_handle, "mysql_get_server_info");
360 _mysql_get_proto_info = dlsym(mysql_dll_handle, "mysql_get_proto_info");
361
362 if (_mysql_stmt_bind_param == NULL ||
363 _mysql_stmt_bind_result == NULL ||
364 _mysql_stmt_init == NULL ||
365 _mysql_stmt_prepare == NULL ||
366 _mysql_stmt_execute == NULL ||
367 _mysql_stmt_fetch == NULL ||
368 _mysql_query == NULL ||
369 _mysql_stmt_result_metadata == NULL ||
370 _mysql_stmt_store_result == NULL ||
371 _mysql_fetch_row == NULL ||
372 _mysql_fetch_field == NULL ||
373 _mysql_fetch_fields == NULL ||
374 _mysql_stmt_close == NULL ||
375 _mysql_stmt_reset == NULL ||
376 _mysql_free_result == NULL ||
377 _mysql_error == NULL ||
378 _mysql_options == NULL ||
379 _mysql_ssl_set == NULL ||
380 _mysql_real_connect == NULL ||
381 _mysql_close == NULL ||
382 _mysql_init == NULL ||
383 _mysql_stmt_attr_set == NULL ||
384 _mysql_store_result == NULL ||
385 _mysql_stmt_errno == NULL ||
386 _mysql_errno == NULL ||
387 _mysql_num_fields == NULL ||
388 _mysql_num_rows == NULL ||
389 _mysql_get_host_info == NULL ||
390 _mysql_get_server_info == NULL ||
391 _mysql_get_proto_info == NULL)
392 return false;
393
394 return true;
395 }
396
397 /*
398 * Library load-time initialization, sets on_proc_exit() callback for
399 * backend shutdown.
400 */
401 void
_PG_init(void)402 _PG_init(void)
403 {
404 if (!mysql_load_library())
405 ereport(ERROR,
406 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
407 errmsg("failed to load the mysql query: \n%s", dlerror()),
408 errhint("Export LD_LIBRARY_PATH to locate the library.")));
409
410 DefineCustomIntVariable("mysql_fdw.wait_timeout",
411 "Server-side wait_timeout",
412 "Set the maximum wait_timeout"
413 "use to set the MySQL session timeout",
414 &wait_timeout,
415 WAIT_TIMEOUT,
416 0,
417 INT_MAX,
418 PGC_USERSET,
419 0,
420 NULL,
421 NULL,
422 NULL);
423
424 DefineCustomIntVariable("mysql_fdw.interactive_timeout",
425 "Server-side interactive timeout",
426 "Set the maximum interactive timeout"
427 "use to set the MySQL session timeout",
428 &interactive_timeout,
429 INTERACTIVE_TIMEOUT,
430 0,
431 INT_MAX,
432 PGC_USERSET,
433 0,
434 NULL,
435 NULL,
436 NULL);
437
438 on_proc_exit(&mysql_fdw_exit, PointerGetDatum(NULL));
439 }
440
441 /*
442 * mysql_fdw_exit
443 * Exit callback function.
444 */
445 static void
mysql_fdw_exit(int code,Datum arg)446 mysql_fdw_exit(int code, Datum arg)
447 {
448 mysql_cleanup_connection();
449 }
450
451 /*
452 * Foreign-data wrapper handler function: return
453 * a struct with pointers to my callback routines.
454 */
455 Datum
mysql_fdw_handler(PG_FUNCTION_ARGS)456 mysql_fdw_handler(PG_FUNCTION_ARGS)
457 {
458 FdwRoutine *fdwroutine = makeNode(FdwRoutine);
459
460 /* Functions for scanning foreign tables */
461 fdwroutine->GetForeignRelSize = mysqlGetForeignRelSize;
462 fdwroutine->GetForeignPaths = mysqlGetForeignPaths;
463 fdwroutine->GetForeignPlan = mysqlGetForeignPlan;
464 fdwroutine->BeginForeignScan = mysqlBeginForeignScan;
465 fdwroutine->IterateForeignScan = mysqlIterateForeignScan;
466 fdwroutine->ReScanForeignScan = mysqlReScanForeignScan;
467 fdwroutine->EndForeignScan = mysqlEndForeignScan;
468
469 /* Functions for updating foreign tables */
470 fdwroutine->AddForeignUpdateTargets = mysqlAddForeignUpdateTargets;
471 fdwroutine->PlanForeignModify = mysqlPlanForeignModify;
472 fdwroutine->BeginForeignModify = mysqlBeginForeignModify;
473 fdwroutine->ExecForeignInsert = mysqlExecForeignInsert;
474 fdwroutine->ExecForeignUpdate = mysqlExecForeignUpdate;
475 fdwroutine->ExecForeignDelete = mysqlExecForeignDelete;
476 fdwroutine->EndForeignModify = mysqlEndForeignModify;
477
478 /* Function for EvalPlanQual rechecks */
479 #if PG_VERSION_NUM >= 90600
480 fdwroutine->RecheckForeignScan = mysqlRecheckForeignScan;
481 #endif
482
483 /* Support functions for EXPLAIN */
484 fdwroutine->ExplainForeignScan = mysqlExplainForeignScan;
485
486 /* Support functions for ANALYZE */
487 fdwroutine->AnalyzeForeignTable = mysqlAnalyzeForeignTable;
488
489 /* Support functions for IMPORT FOREIGN SCHEMA */
490 #if PG_VERSION_NUM >= 90500
491 fdwroutine->ImportForeignSchema = mysqlImportForeignSchema;
492 #endif
493
494 #if PG_VERSION_NUM >= 110000
495 /* Partition routing and/or COPY from */
496 fdwroutine->BeginForeignInsert = mysqlBeginForeignInsert;
497 fdwroutine->EndForeignInsert = mysqlEndForeignInsert;
498 #endif
499
500 #if PG_VERSION_NUM >= 90600
501 /* Support functions for join push-down */
502 fdwroutine->GetForeignJoinPaths = mysqlGetForeignJoinPaths;
503 #endif
504
505 PG_RETURN_POINTER(fdwroutine);
506 }
507
508 /*
509 * mysqlBeginForeignScan
510 * Initiate access to the database
511 */
512 static void
mysqlBeginForeignScan(ForeignScanState * node,int eflags)513 mysqlBeginForeignScan(ForeignScanState *node, int eflags)
514 {
515 TupleTableSlot *tupleSlot = node->ss.ss_ScanTupleSlot;
516 TupleDesc tupleDescriptor = tupleSlot->tts_tupleDescriptor;
517 MYSQL *conn;
518 RangeTblEntry *rte;
519 MySQLFdwExecState *festate;
520 EState *estate = node->ss.ps.state;
521 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
522 mysql_opt *options;
523 ListCell *lc;
524 int atindex = 0;
525 unsigned long prefetch_rows = MYSQL_PREFETCH_ROWS;
526 unsigned long type = (unsigned long) CURSOR_TYPE_READ_ONLY;
527 Oid userid;
528 ForeignServer *server;
529 UserMapping *user;
530 ForeignTable *table;
531 char timeout[255];
532 int numParams;
533 int rtindex;
534 List *fdw_private = fsplan->fdw_private;
535
536 /*
537 * We'll save private state in node->fdw_state.
538 */
539 festate = (MySQLFdwExecState *) palloc(sizeof(MySQLFdwExecState));
540 node->fdw_state = (void *) festate;
541
542 /*
543 * If whole-row references are involved in pushed down join extract the
544 * information required to construct those.
545 */
546 if (list_length(fdw_private) >= mysqlFdwPrivateScanTList)
547 {
548 List *whole_row_lists = list_nth(fdw_private,
549 mysqlFdwPrivateWholeRowLists);
550 List *scan_tlist = list_nth(fdw_private,
551 mysqlFdwPrivateScanTList);
552 #if PG_VERSION_NUM >= 120000
553 TupleDesc scan_tupdesc = ExecTypeFromTL(scan_tlist);
554 #else
555 TupleDesc scan_tupdesc = ExecTypeFromTL(scan_tlist, false);
556 #endif
557
558 mysql_build_whole_row_constr_info(festate, tupleDescriptor,
559 fsplan->fs_relids,
560 list_length(node->ss.ps.state->es_range_table),
561 whole_row_lists, scan_tlist,
562 fsplan->fdw_scan_tlist);
563
564 /* Change tuple descriptor to match the result from foreign server. */
565 tupleDescriptor = scan_tupdesc;
566 }
567
568 /*
569 * Identify which user to do the remote access as. This should match what
570 * ExecCheckRTEPerms() does. In case of a join use the lowest-numbered
571 * member RTE as a representative; we would get the same result from any.
572 */
573 if (fsplan->scan.scanrelid > 0)
574 rtindex = fsplan->scan.scanrelid;
575 else
576 rtindex = bms_next_member(fsplan->fs_relids, -1);
577 rte = rt_fetch(rtindex, estate->es_range_table);
578 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
579
580 /* Get info about foreign table. */
581 table = GetForeignTable(rte->relid);
582 server = GetForeignServer(table->serverid);
583 user = GetUserMapping(userid, server->serverid);
584
585 /* Fetch the options */
586 options = mysql_get_options(rte->relid, true);
587
588 /*
589 * Get the already connected connection, otherwise connect and get the
590 * connection handle.
591 */
592 conn = mysql_get_connection(server, user, options);
593
594 /* Stash away the state info we have already */
595 festate->query = strVal(list_nth(fsplan->fdw_private,
596 mysqlFdwScanPrivateSelectSql));
597 festate->retrieved_attrs = list_nth(fsplan->fdw_private,
598 mysqlFdwScanPrivateRetrievedAttrs);
599 festate->conn = conn;
600 festate->query_executed = false;
601 festate->attinmeta = TupleDescGetAttInMetadata(tupleDescriptor);
602
603 #if PG_VERSION_NUM >= 110000
604 festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
605 "mysql_fdw temporary data",
606 ALLOCSET_DEFAULT_SIZES);
607 #else
608 festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
609 "mysql_fdw temporary data",
610 ALLOCSET_SMALL_MINSIZE,
611 ALLOCSET_SMALL_INITSIZE,
612 ALLOCSET_SMALL_MAXSIZE);
613 #endif
614
615 if (wait_timeout > 0)
616 {
617 /* Set the session timeout in seconds */
618 sprintf(timeout, "SET wait_timeout = %d", wait_timeout);
619 mysql_query(festate->conn, timeout);
620 }
621
622 if (interactive_timeout > 0)
623 {
624 /* Set the session timeout in seconds */
625 sprintf(timeout, "SET interactive_timeout = %d", interactive_timeout);
626 mysql_query(festate->conn, timeout);
627 }
628
629 mysql_query(festate->conn, "SET sql_mode='ANSI_QUOTES'");
630
631 /* Initialize the MySQL statement */
632 festate->stmt = mysql_stmt_init(festate->conn);
633 if (festate->stmt == NULL)
634 ereport(ERROR,
635 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
636 errmsg("failed to initialize the mysql query: \n%s",
637 mysql_error(festate->conn))));
638
639 /* Prepare MySQL statement */
640 if (mysql_stmt_prepare(festate->stmt, festate->query,
641 strlen(festate->query)) != 0)
642 mysql_stmt_error_print(festate, "failed to prepare the MySQL query");
643
644 /* Prepare for output conversion of parameters used in remote query. */
645 numParams = list_length(fsplan->fdw_exprs);
646 festate->numParams = numParams;
647 if (numParams > 0)
648 prepare_query_params((PlanState *) node,
649 fsplan->fdw_exprs,
650 numParams,
651 &festate->param_flinfo,
652 &festate->param_exprs,
653 &festate->param_values,
654 &festate->param_types);
655
656 /* int column_count = mysql_num_fields(festate->meta); */
657
658 /* Set the statement as cursor type */
659 mysql_stmt_attr_set(festate->stmt, STMT_ATTR_CURSOR_TYPE, (void *) &type);
660
661 /* Set the pre-fetch rows */
662 mysql_stmt_attr_set(festate->stmt, STMT_ATTR_PREFETCH_ROWS,
663 (void *) &prefetch_rows);
664
665 festate->table = (mysql_table *) palloc0(sizeof(mysql_table));
666 festate->table->column = (mysql_column *) palloc0(sizeof(mysql_column) * tupleDescriptor->natts);
667 festate->table->mysql_bind = (MYSQL_BIND *) palloc0(sizeof(MYSQL_BIND) * tupleDescriptor->natts);
668
669 festate->table->mysql_res = mysql_stmt_result_metadata(festate->stmt);
670 if (NULL == festate->table->mysql_res)
671 ereport(ERROR,
672 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
673 errmsg("failed to retrieve query result set metadata: \n%s",
674 mysql_error(festate->conn))));
675
676 festate->table->mysql_fields = mysql_fetch_fields(festate->table->mysql_res);
677
678 foreach(lc, festate->retrieved_attrs)
679 {
680 int attnum = lfirst_int(lc) - 1;
681 Oid pgtype = TupleDescAttr(tupleDescriptor, attnum)->atttypid;
682 int32 pgtypmod = TupleDescAttr(tupleDescriptor, attnum)->atttypmod;
683
684 if (TupleDescAttr(tupleDescriptor, attnum)->attisdropped)
685 continue;
686
687 festate->table->column[atindex].mysql_bind = &festate->table->mysql_bind[atindex];
688
689 mysql_bind_result(pgtype, pgtypmod,
690 &festate->table->mysql_fields[atindex],
691 &festate->table->column[atindex]);
692 atindex++;
693 }
694
695 /* Bind the results pointers for the prepare statements */
696 if (mysql_stmt_bind_result(festate->stmt, festate->table->mysql_bind) != 0)
697 mysql_stmt_error_print(festate, "failed to bind the MySQL query");
698 }
699
700 /*
701 * mysqlIterateForeignScan
702 * Iterate and get the rows one by one from MySQL and placed in tuple
703 * slot
704 */
705 static TupleTableSlot *
mysqlIterateForeignScan(ForeignScanState * node)706 mysqlIterateForeignScan(ForeignScanState *node)
707 {
708 MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
709 TupleTableSlot *tupleSlot = node->ss.ss_ScanTupleSlot;
710 int attid;
711 ListCell *lc;
712 int rc = 0;
713 Datum *dvalues;
714 bool *nulls;
715 int natts;
716 AttInMetadata *attinmeta = festate->attinmeta;
717 HeapTuple tup;
718 int i;
719 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
720 List *fdw_private = fsplan->fdw_private;
721
722 natts = attinmeta->tupdesc->natts;
723
724 dvalues = palloc0(natts * sizeof(Datum));
725 nulls = palloc(natts * sizeof(bool));
726 /* Initialize to nulls for any columns not present in result */
727 memset(nulls, true, natts * sizeof(bool));
728
729 ExecClearTuple(tupleSlot);
730
731 /*
732 * If this is the first call after Begin or ReScan, we need to bind the
733 * params and execute the query.
734 */
735 if (!festate->query_executed)
736 bind_stmt_params_and_exec(node);
737
738 attid = 0;
739 rc = mysql_stmt_fetch(festate->stmt);
740 if (rc == 0)
741 {
742 foreach(lc, festate->retrieved_attrs)
743 {
744 int attnum = lfirst_int(lc) - 1;
745 Oid pgtype = TupleDescAttr(attinmeta->tupdesc, attnum)->atttypid;
746 int32 pgtypmod = TupleDescAttr(attinmeta->tupdesc, attnum)->atttypmod;
747
748 nulls[attnum] = festate->table->column[attid].is_null;
749 if (!festate->table->column[attid].is_null)
750 dvalues[attnum] = mysql_convert_to_pg(pgtype, pgtypmod,
751 &festate->table->column[attid]);
752
753 attid++;
754 }
755
756 ExecClearTuple(tupleSlot);
757
758 if (list_length(fdw_private) >= mysqlFdwPrivateScanTList)
759 {
760 /* Construct tuple with whole-row references. */
761 tup = mysql_get_tuple_with_whole_row(festate, dvalues, nulls);
762 }
763 else
764 {
765 /* Form the Tuple using Datums */
766 tup = heap_form_tuple(attinmeta->tupdesc, dvalues, nulls);
767 }
768
769 if (tup)
770 #if PG_VERSION_NUM >= 120000
771 ExecStoreHeapTuple(tup, tupleSlot, false);
772 #else
773 ExecStoreTuple(tup, tupleSlot, InvalidBuffer, false);
774 #endif
775 else
776 mysql_stmt_close(festate->stmt);
777
778 /*
779 * Release locally palloc'd space and values of pass-by-reference
780 * datums, as well.
781 */
782 for (i = 0; i < natts; i++)
783 {
784 if (dvalues[i] && !TupleDescAttr(attinmeta->tupdesc, i)->attbyval)
785 pfree(DatumGetPointer(dvalues[i]));
786 }
787 pfree(dvalues);
788 pfree(nulls);
789 }
790 else if (rc == 1)
791 {
792 /*
793 * Error occurred. Error code and message can be obtained by calling
794 * mysql_stmt_errno() and mysql_stmt_error().
795 */
796 }
797 else if (rc == MYSQL_NO_DATA)
798 {
799 /*
800 * No more rows/data exists
801 */
802 }
803 else if (rc == MYSQL_DATA_TRUNCATED)
804 {
805 /* Data truncation occurred */
806 /*
807 * MYSQL_DATA_TRUNCATED is returned when truncation reporting is
808 * enabled. To determine which column values were truncated when this
809 * value is returned, check the error members of the MYSQL_BIND
810 * structures used for fetching values. Truncation reporting is
811 * enabled by default, but can be controlled by calling
812 * mysql_options() with the MYSQL_REPORT_DATA_TRUNCATION option.
813 */
814 }
815
816 return tupleSlot;
817 }
818
819
820 /*
821 * mysqlExplainForeignScan
822 * Produce extra output for EXPLAIN
823 */
824 static void
mysqlExplainForeignScan(ForeignScanState * node,ExplainState * es)825 mysqlExplainForeignScan(ForeignScanState *node, ExplainState *es)
826 {
827 MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
828 RangeTblEntry *rte;
829 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
830 int rtindex;
831 EState *estate = node->ss.ps.state;
832 List *fdw_private = fsplan->fdw_private;
833
834 if (fsplan->scan.scanrelid > 0)
835 rtindex = fsplan->scan.scanrelid;
836 else
837 rtindex = bms_next_member(fsplan->fs_relids, -1);
838 rte = rt_fetch(rtindex, estate->es_range_table);
839
840 if (list_length(fdw_private) > mysqlFdwScanPrivateRelations)
841 {
842 char *relations = strVal(list_nth(fdw_private,
843 mysqlFdwScanPrivateRelations));
844
845 ExplainPropertyText("Relations", relations, es);
846 }
847
848 /* Give some possibly useful info about startup costs */
849 if (es->costs)
850 {
851 mysql_opt *options = mysql_get_options(rte->relid, true);
852
853 if (strcmp(options->svr_address, "127.0.0.1") == 0 ||
854 strcmp(options->svr_address, "localhost") == 0)
855 #if PG_VERSION_NUM >= 110000
856 ExplainPropertyInteger("Local server startup cost", NULL, 10, es);
857 #else
858 ExplainPropertyLong("Local server startup cost", 10, es);
859 #endif
860 else
861 #if PG_VERSION_NUM >= 110000
862 ExplainPropertyInteger("Remote server startup cost", NULL, 25, es);
863 #else
864 ExplainPropertyLong("Remote server startup cost", 25, es);
865 #endif
866 }
867
868 /* Show the remote query in verbose mode */
869 if (es->verbose)
870 ExplainPropertyText("Remote query", festate->query, es);
871 }
872
873 /*
874 * mysqlEndForeignScan
875 * Finish scanning foreign table and dispose objects used for this scan
876 */
877 static void
mysqlEndForeignScan(ForeignScanState * node)878 mysqlEndForeignScan(ForeignScanState *node)
879 {
880 MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
881
882 if (festate->table && festate->table->mysql_res)
883 {
884 mysql_free_result(festate->table->mysql_res);
885 festate->table->mysql_res = NULL;
886 }
887
888 if (festate->stmt)
889 {
890 mysql_stmt_close(festate->stmt);
891 festate->stmt = NULL;
892 }
893 }
894
895 /*
896 * mysqlReScanForeignScan
897 * Rescan table, possibly with new parameters
898 */
899 static void
mysqlReScanForeignScan(ForeignScanState * node)900 mysqlReScanForeignScan(ForeignScanState *node)
901 {
902 MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
903
904 /*
905 * Set the query_executed flag to false so that the query will be executed
906 * in mysqlIterateForeignScan().
907 */
908 festate->query_executed = false;
909
910 }
911
912 /*
913 * mysqlGetForeignRelSize
914 * Create a FdwPlan for a scan on the foreign table
915 */
916 static void
mysqlGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)917 mysqlGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
918 Oid foreigntableid)
919 {
920 double rows = 0;
921 double filtered = 0;
922 MYSQL *conn;
923 MYSQL_ROW row;
924 Bitmapset *attrs_used = NULL;
925 mysql_opt *options;
926 Oid userid = GetUserId();
927 ForeignServer *server;
928 UserMapping *user;
929 ForeignTable *table;
930 MySQLFdwRelationInfo *fpinfo;
931 ListCell *lc;
932 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
933 const char *database;
934 const char *relname;
935 const char *refname;
936
937 fpinfo = (MySQLFdwRelationInfo *) palloc0(sizeof(MySQLFdwRelationInfo));
938 baserel->fdw_private = (void *) fpinfo;
939
940 /* Base foreign tables need to be push down always. */
941 fpinfo->pushdown_safe = true;
942
943 table = GetForeignTable(foreigntableid);
944 server = GetForeignServer(table->serverid);
945 user = GetUserMapping(userid, server->serverid);
946
947 /* Fetch options */
948 options = mysql_get_options(foreigntableid, true);
949
950 /* Connect to the server */
951 conn = mysql_get_connection(server, user, options);
952
953 mysql_query(conn, "SET sql_mode='ANSI_QUOTES'");
954
955 #if PG_VERSION_NUM >= 90600
956 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
957 &attrs_used);
958 #else
959 pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
960 &attrs_used);
961 #endif
962
963 foreach(lc, baserel->baserestrictinfo)
964 {
965 RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
966
967 if (mysql_is_foreign_expr(root, baserel, ri->clause, false))
968 fpinfo->remote_conds = lappend(fpinfo->remote_conds, ri);
969 else
970 fpinfo->local_conds = lappend(fpinfo->local_conds, ri);
971 }
972
973 #if PG_VERSION_NUM >= 90600
974 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
975 &fpinfo->attrs_used);
976 #else
977 pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
978 &fpinfo->attrs_used);
979 #endif
980
981 foreach(lc, fpinfo->local_conds)
982 {
983 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
984
985 pull_varattnos((Node *) rinfo->clause, baserel->relid,
986 &fpinfo->attrs_used);
987 }
988
989 if (options->use_remote_estimate)
990 {
991 StringInfoData sql;
992 MYSQL_RES *result = NULL;
993 List *retrieved_attrs = NULL;
994 List *params_list = NULL;
995
996 initStringInfo(&sql);
997 appendStringInfo(&sql, "EXPLAIN ");
998
999 mysql_deparse_select_stmt_for_rel(&sql, root, baserel, NULL,
1000 fpinfo->remote_conds,
1001 &retrieved_attrs, ¶ms_list);
1002
1003 if (mysql_query(conn, sql.data) != 0)
1004 mysql_error_print(conn);
1005
1006 result = mysql_store_result(conn);
1007 if (result)
1008 {
1009 int num_fields;
1010
1011 /*
1012 * MySQL provide numbers of rows per table invole in the statement,
1013 * but we don't have problem with it because we are sending
1014 * separate query per table in FDW.
1015 */
1016 row = mysql_fetch_row(result);
1017 num_fields = mysql_num_fields(result);
1018 if (row)
1019 {
1020 MYSQL_FIELD *field;
1021 int i;
1022
1023 for (i = 0; i < num_fields; i++)
1024 {
1025 field = mysql_fetch_field(result);
1026 if (!row[i])
1027 continue;
1028 else if (strcmp(field->name, "rows") == 0)
1029 rows = atof(row[i]);
1030 else if (strcmp(field->name, "filtered") == 0)
1031 filtered = atof(row[i]);
1032 }
1033 }
1034 mysql_free_result(result);
1035 }
1036 }
1037 if (rows > 0)
1038 rows = ((rows + 1) * filtered) / 100;
1039 else
1040 rows = DEFAULTE_NUM_ROWS;
1041
1042 baserel->rows = rows;
1043 baserel->tuples = rows;
1044
1045 /*
1046 * Set the name of relation in fpinfo, while we are constructing it here.
1047 * It will be used to build the string describing the join relation in
1048 * EXPLAIN output. We can't know whether VERBOSE option is specified or
1049 * not, so always schema-qualify the foreign table name.
1050 */
1051 fpinfo->relation_name = makeStringInfo();
1052 database = options->svr_database;
1053 relname = get_rel_name(foreigntableid);
1054 refname = rte->eref->aliasname;
1055 appendStringInfo(fpinfo->relation_name, "%s.%s",
1056 quote_identifier(database), quote_identifier(relname));
1057 if (*refname && strcmp(refname, relname) != 0)
1058 appendStringInfo(fpinfo->relation_name, " %s",
1059 quote_identifier(rte->eref->aliasname));
1060 }
1061
1062 static bool
mysql_is_column_unique(Oid foreigntableid)1063 mysql_is_column_unique(Oid foreigntableid)
1064 {
1065 StringInfoData sql;
1066 MYSQL *conn;
1067 MYSQL_RES *result;
1068 mysql_opt *options;
1069 Oid userid = GetUserId();
1070 ForeignServer *server;
1071 UserMapping *user;
1072 ForeignTable *table;
1073
1074 table = GetForeignTable(foreigntableid);
1075 server = GetForeignServer(table->serverid);
1076 user = GetUserMapping(userid, server->serverid);
1077
1078 /* Fetch the options */
1079 options = mysql_get_options(foreigntableid, true);
1080
1081 /* Connect to the server */
1082 conn = mysql_get_connection(server, user, options);
1083
1084 /* Build the query */
1085 initStringInfo(&sql);
1086
1087 /*
1088 * Construct the query by prefixing the database name so that it can lookup
1089 * in correct database.
1090 */
1091 appendStringInfo(&sql, "EXPLAIN %s.%s", options->svr_database,
1092 options->svr_table);
1093 if (mysql_query(conn, sql.data) != 0)
1094 mysql_error_print(conn);
1095
1096 result = mysql_store_result(conn);
1097 if (result)
1098 {
1099 int num_fields = mysql_num_fields(result);
1100 MYSQL_ROW row;
1101
1102 row = mysql_fetch_row(result);
1103 if (row && num_fields > 3)
1104 {
1105 if ((strcmp(row[3], "PRI") == 0) || (strcmp(row[3], "UNI")) == 0)
1106 {
1107 mysql_free_result(result);
1108 return true;
1109 }
1110 }
1111 mysql_free_result(result);
1112 }
1113
1114 return false;
1115 }
1116
1117 /*
1118 * mysqlEstimateCosts
1119 * Estimate the remote query cost
1120 */
1121 static void
mysqlEstimateCosts(PlannerInfo * root,RelOptInfo * baserel,Cost * startup_cost,Cost * total_cost,Oid foreigntableid)1122 mysqlEstimateCosts(PlannerInfo *root, RelOptInfo *baserel, Cost *startup_cost,
1123 Cost *total_cost, Oid foreigntableid)
1124 {
1125 mysql_opt *options;
1126
1127 /* Fetch options */
1128 options = mysql_get_options(foreigntableid, true);
1129
1130 /* Local databases are probably faster */
1131 if (strcmp(options->svr_address, "127.0.0.1") == 0 ||
1132 strcmp(options->svr_address, "localhost") == 0)
1133 *startup_cost = 10;
1134 else
1135 *startup_cost = 25;
1136
1137 *total_cost = baserel->rows + *startup_cost;
1138 }
1139
1140 /*
1141 * mysqlGetForeignPaths
1142 * Get the foreign paths
1143 */
1144 static void
mysqlGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)1145 mysqlGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
1146 Oid foreigntableid)
1147 {
1148 Cost startup_cost;
1149 Cost total_cost;
1150
1151 /* Estimate costs */
1152 mysqlEstimateCosts(root, baserel, &startup_cost, &total_cost,
1153 foreigntableid);
1154
1155 /* Create a ForeignPath node and add it as only possible path */
1156 add_path(baserel, (Path *)
1157 create_foreignscan_path(root, baserel,
1158 #if PG_VERSION_NUM >= 90600
1159 NULL, /* default pathtarget */
1160 #endif
1161 baserel->rows,
1162 startup_cost,
1163 total_cost,
1164 NIL, /* no pathkeys */
1165 baserel->lateral_relids,
1166 #if PG_VERSION_NUM >= 90500
1167 NULL, /* no extra plan */
1168 #endif
1169 NULL)); /* no fdw_private data */
1170 }
1171
1172
1173 /*
1174 * mysqlGetForeignPlan
1175 * Get a foreign scan plan node
1176 */
1177 #if PG_VERSION_NUM >= 90500
1178 static ForeignScan *
mysqlGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1179 mysqlGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel,
1180 Oid foreigntableid, ForeignPath *best_path,
1181 List *tlist, List *scan_clauses, Plan *outer_plan)
1182 #else
1183 static ForeignScan *
1184 mysqlGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel,
1185 Oid foreigntableid, ForeignPath *best_path,
1186 List *tlist, List *scan_clauses)
1187 #endif
1188 {
1189 MySQLFdwRelationInfo *fpinfo = (MySQLFdwRelationInfo *) foreignrel->fdw_private;
1190 Index scan_relid;
1191 List *fdw_private;
1192 List *local_exprs = NIL;
1193 List *remote_exprs = NIL;
1194 List *params_list = NIL;
1195 List *remote_conds = NIL;
1196 StringInfoData sql;
1197 List *retrieved_attrs;
1198 ListCell *lc;
1199 List *scan_var_list;
1200 List *fdw_scan_tlist = NIL;
1201 List *whole_row_lists = NIL;
1202
1203 if (foreignrel->reloptkind == RELOPT_BASEREL ||
1204 foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL)
1205 scan_relid = foreignrel->relid;
1206 else
1207 {
1208 scan_relid = 0;
1209 Assert(!scan_clauses);
1210
1211 remote_conds = fpinfo->remote_conds;
1212 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1213 }
1214
1215 /*
1216 * Separate the scan_clauses into those that can be executed remotely and
1217 * those that can't. baserestrictinfo clauses that were previously
1218 * determined to be safe or unsafe are shown in fpinfo->remote_conds and
1219 * fpinfo->local_conds. Anything else in the scan_clauses list will be
1220 * a join clause, which we have to check for remote-safety.
1221 *
1222 * This code must match "extract_actual_clauses(scan_clauses, false)"
1223 * except for the additional decision about remote versus local execution.
1224 * Note however that we only strip the RestrictInfo nodes from the
1225 * local_exprs list, since appendWhereClause expects a list of
1226 * RestrictInfos.
1227 */
1228 foreach(lc, scan_clauses)
1229 {
1230 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1231
1232 Assert(IsA(rinfo, RestrictInfo));
1233
1234 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1235 if (rinfo->pseudoconstant)
1236 continue;
1237
1238 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1239 {
1240 remote_conds = lappend(remote_conds, rinfo);
1241 remote_exprs = lappend(remote_exprs, rinfo->clause);
1242 }
1243 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1244 local_exprs = lappend(local_exprs, rinfo->clause);
1245 else if (mysql_is_foreign_expr(root, foreignrel, rinfo->clause, false))
1246 {
1247 remote_conds = lappend(remote_conds, rinfo);
1248 remote_exprs = lappend(remote_exprs, rinfo->clause);
1249 }
1250 else
1251 local_exprs = lappend(local_exprs, rinfo->clause);
1252 }
1253
1254 #if PG_VERSION_NUM >= 90600
1255 scan_var_list = pull_var_clause((Node *) foreignrel->reltarget->exprs,
1256 PVC_RECURSE_PLACEHOLDERS);
1257 #else
1258 scan_var_list = pull_var_clause((Node *) foreignrel->reltargetlist,
1259 PVC_RECURSE_AGGREGATES,
1260 PVC_RECURSE_PLACEHOLDERS);
1261 #endif
1262
1263 /* System attributes are not allowed. */
1264 foreach(lc, scan_var_list)
1265 {
1266 Var *var = lfirst(lc);
1267 const FormData_pg_attribute *attr;
1268
1269 Assert(IsA(var, Var));
1270
1271 if (var->varattno >= 0)
1272 continue;
1273
1274 #if PG_VERSION_NUM >= 120000
1275 attr = SystemAttributeDefinition(var->varattno);
1276 #else
1277 attr = SystemAttributeDefinition(var->varattno, false);
1278 #endif
1279 ereport(ERROR,
1280 (errcode(ERRCODE_FDW_COLUMN_NAME_NOT_FOUND),
1281 errmsg("system attribute \"%s\" can't be fetched from remote relation",
1282 attr->attname.data)));
1283 }
1284
1285 #if PG_VERSION_NUM >= 90600
1286 #if PG_VERSION_NUM >= 100000
1287 if (IS_JOIN_REL(foreignrel))
1288 #else
1289 if (foreignrel->reloptkind == RELOPT_JOINREL)
1290 #endif
1291 {
1292 scan_var_list = list_concat_unique(NIL, scan_var_list);
1293
1294 scan_var_list = list_concat_unique(scan_var_list,
1295 pull_var_clause((Node *) local_exprs,
1296 PVC_RECURSE_PLACEHOLDERS));
1297
1298 /*
1299 * For join relations, planner needs targetlist, which represents the
1300 * output of ForeignScan node. Prepare this before we modify
1301 * scan_var_list to include Vars required by whole row references, if
1302 * any. Note that base foreign scan constructs the whole-row reference
1303 * at the time of projection. Joins are required to get them from the
1304 * underlying base relations. For a pushed down join the underlying
1305 * relations do not exist, hence the whole-row references need to be
1306 * constructed separately.
1307 */
1308 fdw_scan_tlist = add_to_flat_tlist(NIL, scan_var_list);
1309
1310 /*
1311 * MySQL does not allow row value constructors to be part of SELECT
1312 * list. Hence, whole row reference in join relations need to be
1313 * constructed by combining all the attributes of required base
1314 * relations into a tuple after fetching the result from the foreign
1315 * server. So adjust the targetlist to include all attributes for
1316 * required base relations. The function also returns list of Var node
1317 * lists required to construct the whole-row references of the
1318 * involved relations.
1319 */
1320 scan_var_list = mysql_adjust_whole_row_ref(root, scan_var_list,
1321 &whole_row_lists,
1322 foreignrel->relids);
1323
1324 if (outer_plan)
1325 {
1326 ListCell *lc;
1327
1328 foreach(lc, local_exprs)
1329 {
1330 Node *qual = lfirst(lc);
1331
1332 outer_plan->qual = list_delete(outer_plan->qual, qual);
1333
1334 /*
1335 * For an inner join the local conditions of foreign scan plan
1336 * can be part of the joinquals as well. (They might also be
1337 * in the mergequals or hashquals, but we can't touch those
1338 * without breaking the plan.)
1339 */
1340 if (IsA(outer_plan, NestLoop) ||
1341 IsA(outer_plan, MergeJoin) ||
1342 IsA(outer_plan, HashJoin))
1343 {
1344 Join *join_plan = (Join *) outer_plan;
1345
1346 if (join_plan->jointype == JOIN_INNER)
1347 join_plan->joinqual = list_delete(join_plan->joinqual,
1348 qual);
1349 }
1350 }
1351 }
1352 }
1353 #endif
1354
1355 /*
1356 * Build the query string to be sent for execution, and identify
1357 * expressions to be sent as parameters.
1358 */
1359 initStringInfo(&sql);
1360 mysql_deparse_select_stmt_for_rel(&sql, root, foreignrel, scan_var_list,
1361 remote_conds, &retrieved_attrs,
1362 ¶ms_list);
1363
1364 if (foreignrel->relid == root->parse->resultRelation &&
1365 (root->parse->commandType == CMD_UPDATE ||
1366 root->parse->commandType == CMD_DELETE))
1367 {
1368 /* Relation is UPDATE/DELETE target, so use FOR UPDATE */
1369 appendStringInfoString(&sql, " FOR UPDATE");
1370 }
1371
1372 /*
1373 * Build the fdw_private list that will be available to the executor.
1374 * Items in the list must match enum FdwScanPrivateIndex, above.
1375 */
1376
1377 fdw_private = list_make2(makeString(sql.data), retrieved_attrs);
1378 #if PG_VERSION_NUM >= 100000
1379 if (IS_JOIN_REL(foreignrel))
1380 #else
1381 if (foreignrel->reloptkind == RELOPT_JOINREL)
1382 #endif
1383 {
1384 fdw_private = lappend(fdw_private,
1385 makeString(fpinfo->relation_name->data));
1386
1387 /*
1388 * To construct whole row references we need:
1389 *
1390 * 1. The lists of Var nodes required for whole-row references of
1391 * joining relations
1392 * 2. targetlist corresponding the result expected from the foreign
1393 * server.
1394 */
1395 if (whole_row_lists)
1396 {
1397 fdw_private = lappend(fdw_private, whole_row_lists);
1398 fdw_private = lappend(fdw_private,
1399 add_to_flat_tlist(NIL, scan_var_list));
1400 }
1401 }
1402
1403 /*
1404 * Create the ForeignScan node from target list, local filtering
1405 * expressions, remote parameter expressions, and FDW private information.
1406 *
1407 * Note that the remote parameter expressions are stored in the fdw_exprs
1408 * field of the finished plan node; we can't keep them in private state
1409 * because then they wouldn't be subject to later planner processing.
1410 */
1411 #if PG_VERSION_NUM >= 90500
1412 return make_foreignscan(tlist, local_exprs, scan_relid, params_list,
1413 fdw_private, fdw_scan_tlist, NIL, outer_plan);
1414 #else
1415 return make_foreignscan(tlist, local_exprs, scan_relid, params_list,
1416 fdw_private);
1417 #endif
1418 }
1419
1420 /*
1421 * mysqlAnalyzeForeignTable
1422 * Implement stats collection
1423 */
1424 static bool
mysqlAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)1425 mysqlAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func,
1426 BlockNumber *totalpages)
1427 {
1428 StringInfoData sql;
1429 double table_size = 0;
1430 MYSQL *conn;
1431 MYSQL_RES *result;
1432 Oid foreignTableId = RelationGetRelid(relation);
1433 mysql_opt *options;
1434 ForeignServer *server;
1435 UserMapping *user;
1436 ForeignTable *table;
1437
1438 table = GetForeignTable(foreignTableId);
1439 server = GetForeignServer(table->serverid);
1440 user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
1441
1442 /* Fetch options */
1443 options = mysql_get_options(foreignTableId, true);
1444 Assert(options->svr_database != NULL && options->svr_table != NULL);
1445
1446 /* Connect to the server */
1447 conn = mysql_get_connection(server, user, options);
1448
1449 /* Build the query */
1450 initStringInfo(&sql);
1451 mysql_deparse_analyze(&sql, options->svr_database, options->svr_table);
1452
1453 if (mysql_query(conn, sql.data) != 0)
1454 mysql_error_print(conn);
1455
1456 result = mysql_store_result(conn);
1457
1458 /*
1459 * To get the table size in ANALYZE operation, we run a SELECT query by
1460 * passing the database name and table name. So if the remote table is not
1461 * present, then we end up getting zero rows. Throw an error in that case.
1462 */
1463 if (mysql_num_rows(result) == 0)
1464 ereport(ERROR,
1465 (errcode(ERRCODE_FDW_TABLE_NOT_FOUND),
1466 errmsg("relation %s.%s does not exist", options->svr_database,
1467 options->svr_table)));
1468
1469 if (result)
1470 {
1471 MYSQL_ROW row;
1472
1473 row = mysql_fetch_row(result);
1474 table_size = atof(row[0]);
1475 mysql_free_result(result);
1476 }
1477
1478 *totalpages = table_size / MYSQL_BLKSIZ;
1479
1480 return false;
1481 }
1482
1483 static List *
mysqlPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1484 mysqlPlanForeignModify(PlannerInfo *root,
1485 ModifyTable *plan,
1486 Index resultRelation,
1487 int subplan_index)
1488 {
1489
1490 CmdType operation = plan->operation;
1491 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1492 Relation rel;
1493 List *targetAttrs = NIL;
1494 StringInfoData sql;
1495 char *attname;
1496 Oid foreignTableId;
1497
1498 initStringInfo(&sql);
1499
1500 /*
1501 * Core code already has some lock on each rel being planned, so we can
1502 * use NoLock here.
1503 */
1504 #if PG_VERSION_NUM < 130000
1505 rel = heap_open(rte->relid, NoLock);
1506 #else
1507 rel = table_open(rte->relid, NoLock);
1508 #endif
1509
1510 foreignTableId = RelationGetRelid(rel);
1511
1512 if (!mysql_is_column_unique(foreignTableId))
1513 ereport(ERROR,
1514 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1515 errmsg("first column of remote table must be unique for INSERT/UPDATE/DELETE operation")));
1516
1517 /*
1518 * In an INSERT, we transmit all columns that are defined in the foreign
1519 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1520 * foreign table, we transmit all columns like INSERT; else we transmit
1521 * only columns that were explicitly targets of the UPDATE, so as to avoid
1522 * unnecessary data transmission. (We can't do that for INSERT since we
1523 * would miss sending default values for columns not listed in the source
1524 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1525 * those triggers might change values for non-target columns, in which
1526 * case we would miss sending changed values for those columns.)
1527 */
1528 if (operation == CMD_INSERT ||
1529 (operation == CMD_UPDATE &&
1530 rel->trigdesc &&
1531 rel->trigdesc->trig_update_before_row))
1532 {
1533 TupleDesc tupdesc = RelationGetDescr(rel);
1534 int attnum;
1535
1536 /*
1537 * If it is an UPDATE operation, check for row identifier column in
1538 * target attribute list by calling getUpdateTargetAttrs().
1539 */
1540 if (operation == CMD_UPDATE)
1541 getUpdateTargetAttrs(rte);
1542
1543 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1544 {
1545 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1546
1547 if (!attr->attisdropped)
1548 targetAttrs = lappend_int(targetAttrs, attnum);
1549 }
1550 }
1551 else if (operation == CMD_UPDATE)
1552 {
1553 targetAttrs = getUpdateTargetAttrs(rte);
1554 /* We also want the rowid column to be available for the update */
1555 targetAttrs = lcons_int(1, targetAttrs);
1556 }
1557 else
1558 targetAttrs = lcons_int(1, targetAttrs);
1559
1560 #if PG_VERSION_NUM >= 110000
1561 attname = get_attname(foreignTableId, 1, false);
1562 #else
1563 attname = get_relid_attribute_name(foreignTableId, 1);
1564 #endif
1565
1566 /*
1567 * Construct the SQL command string.
1568 */
1569 switch (operation)
1570 {
1571 case CMD_INSERT:
1572 mysql_deparse_insert(&sql, root, resultRelation, rel, targetAttrs);
1573 break;
1574 case CMD_UPDATE:
1575 mysql_deparse_update(&sql, root, resultRelation, rel, targetAttrs,
1576 attname);
1577 break;
1578 case CMD_DELETE:
1579 mysql_deparse_delete(&sql, root, resultRelation, rel, attname);
1580 break;
1581 default:
1582 elog(ERROR, "unexpected operation: %d", (int) operation);
1583 break;
1584 }
1585
1586 if (plan->returningLists)
1587 ereport(ERROR,
1588 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1589 errmsg("RETURNING is not supported by this FDW")));
1590
1591 #if PG_VERSION_NUM < 130000
1592 heap_close(rel, NoLock);
1593 #else
1594 table_close(rel, NoLock);
1595 #endif
1596
1597 return list_make2(makeString(sql.data), targetAttrs);
1598 }
1599
1600 /*
1601 * mysqlBeginForeignModify
1602 * Begin an insert/update/delete operation on a foreign table
1603 */
1604 static void
mysqlBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1605 mysqlBeginForeignModify(ModifyTableState *mtstate,
1606 ResultRelInfo *resultRelInfo,
1607 List *fdw_private,
1608 int subplan_index,
1609 int eflags)
1610 {
1611 MySQLFdwExecState *fmstate;
1612 EState *estate = mtstate->ps.state;
1613 Relation rel = resultRelInfo->ri_RelationDesc;
1614 AttrNumber n_params;
1615 Oid typefnoid = InvalidOid;
1616 bool isvarlena = false;
1617 ListCell *lc;
1618 Oid foreignTableId = InvalidOid;
1619 RangeTblEntry *rte;
1620 Oid userid;
1621 ForeignServer *server;
1622 UserMapping *user;
1623 ForeignTable *table;
1624
1625 rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1626 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1627
1628 foreignTableId = RelationGetRelid(rel);
1629
1630 table = GetForeignTable(foreignTableId);
1631 server = GetForeignServer(table->serverid);
1632 user = GetUserMapping(userid, server->serverid);
1633
1634 /*
1635 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1636 * stays NULL.
1637 */
1638 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1639 return;
1640
1641 /* Begin constructing MySQLFdwExecState. */
1642 fmstate = (MySQLFdwExecState *) palloc0(sizeof(MySQLFdwExecState));
1643
1644 fmstate->mysqlFdwOptions = mysql_get_options(foreignTableId, true);
1645 fmstate->conn = mysql_get_connection(server, user,
1646 fmstate->mysqlFdwOptions);
1647
1648 fmstate->query = strVal(list_nth(fdw_private, 0));
1649 fmstate->retrieved_attrs = (List *) list_nth(fdw_private, 1);
1650
1651 n_params = list_length(fmstate->retrieved_attrs) + 1;
1652 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1653 fmstate->p_nums = 0;
1654 #if PG_VERSION_NUM >= 110000
1655 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1656 "mysql_fdw temporary data",
1657 ALLOCSET_DEFAULT_SIZES);
1658 #else
1659 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1660 "mysql_fdw temporary data",
1661 ALLOCSET_SMALL_MINSIZE,
1662 ALLOCSET_SMALL_INITSIZE,
1663 ALLOCSET_SMALL_MAXSIZE);
1664 #endif
1665
1666 /* Set up for remaining transmittable parameters */
1667 foreach(lc, fmstate->retrieved_attrs)
1668 {
1669 int attnum = lfirst_int(lc);
1670 Form_pg_attribute attr = TupleDescAttr(RelationGetDescr(rel),
1671 attnum - 1);
1672
1673 Assert(!attr->attisdropped);
1674
1675 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1676 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1677 fmstate->p_nums++;
1678 }
1679 Assert(fmstate->p_nums <= n_params);
1680
1681 n_params = list_length(fmstate->retrieved_attrs);
1682
1683 /* Initialize mysql statment */
1684 fmstate->stmt = mysql_stmt_init(fmstate->conn);
1685 if (!fmstate->stmt)
1686 ereport(ERROR,
1687 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1688 errmsg("failed to initialize the MySQL query: \n%s",
1689 mysql_error(fmstate->conn))));
1690
1691 /* Prepare mysql statment */
1692 if (mysql_stmt_prepare(fmstate->stmt, fmstate->query,
1693 strlen(fmstate->query)) != 0)
1694 mysql_stmt_error_print(fmstate, "failed to prepare the MySQL query");
1695
1696 resultRelInfo->ri_FdwState = fmstate;
1697 }
1698
1699 /*
1700 * mysqlExecForeignInsert
1701 * Insert one row into a foreign table
1702 */
1703 static TupleTableSlot *
mysqlExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1704 mysqlExecForeignInsert(EState *estate,
1705 ResultRelInfo *resultRelInfo,
1706 TupleTableSlot *slot,
1707 TupleTableSlot *planSlot)
1708 {
1709 MySQLFdwExecState *fmstate;
1710 MYSQL_BIND *mysql_bind_buffer;
1711 ListCell *lc;
1712 int n_params;
1713 MemoryContext oldcontext;
1714 bool *isnull;
1715
1716 fmstate = (MySQLFdwExecState *) resultRelInfo->ri_FdwState;
1717 n_params = list_length(fmstate->retrieved_attrs);
1718
1719 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
1720
1721 mysql_bind_buffer = (MYSQL_BIND *) palloc0(sizeof(MYSQL_BIND) * n_params);
1722 isnull = (bool *) palloc0(sizeof(bool) * n_params);
1723
1724 mysql_query(fmstate->conn, "SET sql_mode='ANSI_QUOTES'");
1725
1726 foreach(lc, fmstate->retrieved_attrs)
1727 {
1728 int attnum = lfirst_int(lc) - 1;
1729 Oid type = TupleDescAttr(slot->tts_tupleDescriptor, attnum)->atttypid;
1730 Datum value;
1731
1732 value = slot_getattr(slot, attnum + 1, &isnull[attnum]);
1733
1734 mysql_bind_sql_var(type, attnum, value, mysql_bind_buffer,
1735 &isnull[attnum]);
1736 }
1737
1738 /* Bind values */
1739 if (mysql_stmt_bind_param(fmstate->stmt, mysql_bind_buffer) != 0)
1740 mysql_stmt_error_print(fmstate, "failed to bind the MySQL query");
1741
1742 /* Execute the query */
1743 if (mysql_stmt_execute(fmstate->stmt) != 0)
1744 mysql_stmt_error_print(fmstate, "failed to execute the MySQL query");
1745
1746 MemoryContextSwitchTo(oldcontext);
1747 MemoryContextReset(fmstate->temp_cxt);
1748 return slot;
1749 }
1750
1751 static TupleTableSlot *
mysqlExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1752 mysqlExecForeignUpdate(EState *estate,
1753 ResultRelInfo *resultRelInfo,
1754 TupleTableSlot *slot,
1755 TupleTableSlot *planSlot)
1756 {
1757 MySQLFdwExecState *fmstate = (MySQLFdwExecState *) resultRelInfo->ri_FdwState;
1758 Relation rel = resultRelInfo->ri_RelationDesc;
1759 MYSQL_BIND *mysql_bind_buffer;
1760 Oid foreignTableId = RelationGetRelid(rel);
1761 bool is_null = false;
1762 ListCell *lc;
1763 int bindnum = 0;
1764 Oid typeoid;
1765 Datum value;
1766 int n_params;
1767 bool *isnull;
1768 Datum new_value;
1769 HeapTuple tuple;
1770 Form_pg_attribute attr;
1771 bool found_row_id_col = false;
1772
1773 n_params = list_length(fmstate->retrieved_attrs);
1774
1775 mysql_bind_buffer = (MYSQL_BIND *) palloc0(sizeof(MYSQL_BIND) * n_params);
1776 isnull = (bool *) palloc0(sizeof(bool) * n_params);
1777
1778 /* Bind the values */
1779 foreach(lc, fmstate->retrieved_attrs)
1780 {
1781 int attnum = lfirst_int(lc);
1782 Oid type;
1783
1784 /*
1785 * The first attribute cannot be in the target list attribute. Set the
1786 * found_row_id_col to true once we find it so that we can fetch the
1787 * value later.
1788 */
1789 if (attnum == 1)
1790 {
1791 found_row_id_col = true;
1792 continue;
1793 }
1794
1795 type = TupleDescAttr(slot->tts_tupleDescriptor, attnum - 1)->atttypid;
1796 value = slot_getattr(slot, attnum, (bool *) (&isnull[bindnum]));
1797
1798 mysql_bind_sql_var(type, bindnum, value, mysql_bind_buffer,
1799 &isnull[bindnum]);
1800 bindnum++;
1801 }
1802
1803 /*
1804 * Since we add a row identifier column in the target list always, so
1805 * found_row_id_col flag should be true.
1806 */
1807 if (!found_row_id_col)
1808 elog(ERROR, "missing row identifier column value in UPDATE");
1809
1810 new_value = slot_getattr(slot, 1, &is_null);
1811
1812 /*
1813 * Get the row identifier column value that was passed up as a resjunk
1814 * column and compare that value with the new value to identify if that
1815 * value is changed.
1816 */
1817 value = ExecGetJunkAttribute(planSlot, 1, &is_null);
1818
1819 tuple = SearchSysCache2(ATTNUM,
1820 ObjectIdGetDatum(foreignTableId),
1821 Int16GetDatum(1));
1822 if (!HeapTupleIsValid(tuple))
1823 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1824 1, foreignTableId);
1825
1826 attr = (Form_pg_attribute) GETSTRUCT(tuple);
1827 typeoid = attr->atttypid;
1828
1829 if (DatumGetPointer(new_value) != NULL && DatumGetPointer(value) != NULL)
1830 {
1831 Datum n_value = new_value;
1832 Datum o_value = value;
1833
1834 /* If the attribute type is varlena then need to detoast the datums. */
1835 if (attr->attlen == -1)
1836 {
1837 n_value = PointerGetDatum(PG_DETOAST_DATUM(new_value));
1838 o_value = PointerGetDatum(PG_DETOAST_DATUM(value));
1839 }
1840
1841 if (!datumIsEqual(o_value, n_value, attr->attbyval, attr->attlen))
1842 ereport(ERROR,
1843 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1844 errmsg("row identifier column update is not supported")));
1845
1846 /* Free memory if it's a copy made above */
1847 if (DatumGetPointer(n_value) != DatumGetPointer(new_value))
1848 pfree(DatumGetPointer(n_value));
1849 if (DatumGetPointer(o_value) != DatumGetPointer(value))
1850 pfree(DatumGetPointer(o_value));
1851 }
1852 else if (!(DatumGetPointer(new_value) == NULL &&
1853 DatumGetPointer(value) == NULL))
1854 ereport(ERROR,
1855 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1856 errmsg("row identifier column update is not supported")));
1857
1858 ReleaseSysCache(tuple);
1859
1860 /* Bind qual */
1861 mysql_bind_sql_var(typeoid, bindnum, value, mysql_bind_buffer, &is_null);
1862
1863 if (mysql_stmt_bind_param(fmstate->stmt, mysql_bind_buffer) != 0)
1864 ereport(ERROR,
1865 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1866 errmsg("failed to bind the MySQL query: %s",
1867 mysql_error(fmstate->conn))));
1868
1869 /* Execute the query */
1870 if (mysql_stmt_execute(fmstate->stmt) != 0)
1871 mysql_stmt_error_print(fmstate, "failed to execute the MySQL query");
1872
1873 /* Return NULL if nothing was updated on the remote end */
1874 return slot;
1875 }
1876
1877 /*
1878 * mysqlAddForeignUpdateTargets
1879 * Add column(s) needed for update/delete on a foreign table, we are
1880 * using first column as row identification column, so we are adding
1881 * that into target list.
1882 */
1883 static void
mysqlAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1884 mysqlAddForeignUpdateTargets(Query *parsetree,
1885 RangeTblEntry *target_rte,
1886 Relation target_relation)
1887 {
1888 Var *var;
1889 const char *attrname;
1890 TargetEntry *tle;
1891
1892 /*
1893 * What we need is the rowid which is the first column
1894 */
1895 Form_pg_attribute attr =
1896 TupleDescAttr(RelationGetDescr(target_relation), 0);
1897
1898 /* Make a Var representing the desired value */
1899 var = makeVar(parsetree->resultRelation,
1900 1,
1901 attr->atttypid,
1902 attr->atttypmod,
1903 InvalidOid,
1904 0);
1905
1906 /* Wrap it in a TLE with the right name ... */
1907 attrname = NameStr(attr->attname);
1908
1909 tle = makeTargetEntry((Expr *) var,
1910 list_length(parsetree->targetList) + 1,
1911 pstrdup(attrname), true);
1912
1913 /* ... and add it to the query's targetlist */
1914 parsetree->targetList = lappend(parsetree->targetList, tle);
1915 }
1916
1917 /*
1918 * mysqlExecForeignDelete
1919 * Delete one row from a foreign table
1920 */
1921 static TupleTableSlot *
mysqlExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1922 mysqlExecForeignDelete(EState *estate,
1923 ResultRelInfo *resultRelInfo,
1924 TupleTableSlot *slot,
1925 TupleTableSlot *planSlot)
1926 {
1927 MySQLFdwExecState *fmstate = (MySQLFdwExecState *) resultRelInfo->ri_FdwState;
1928 Relation rel = resultRelInfo->ri_RelationDesc;
1929 MYSQL_BIND *mysql_bind_buffer;
1930 Oid foreignTableId = RelationGetRelid(rel);
1931 bool is_null = false;
1932 Oid typeoid;
1933 Datum value;
1934
1935 mysql_bind_buffer = (MYSQL_BIND *) palloc(sizeof(MYSQL_BIND));
1936
1937 /* Get the id that was passed up as a resjunk column */
1938 value = ExecGetJunkAttribute(planSlot, 1, &is_null);
1939 typeoid = get_atttype(foreignTableId, 1);
1940
1941 /* Bind qual */
1942 mysql_bind_sql_var(typeoid, 0, value, mysql_bind_buffer, &is_null);
1943
1944 if (mysql_stmt_bind_param(fmstate->stmt, mysql_bind_buffer) != 0)
1945 ereport(ERROR,
1946 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1947 errmsg("failed to execute the MySQL query: %s",
1948 mysql_error(fmstate->conn))));
1949
1950 /* Execute the query */
1951 if (mysql_stmt_execute(fmstate->stmt) != 0)
1952 mysql_stmt_error_print(fmstate, "failed to execute the MySQL query");
1953
1954 /* Return NULL if nothing was updated on the remote end */
1955 return slot;
1956 }
1957
1958 /*
1959 * mysqlEndForeignModify
1960 * Finish an insert/update/delete operation on a foreign table
1961 */
1962 static void
mysqlEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)1963 mysqlEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo)
1964 {
1965 MySQLFdwExecState *festate = resultRelInfo->ri_FdwState;
1966
1967 if (festate && festate->stmt)
1968 {
1969 mysql_stmt_close(festate->stmt);
1970 festate->stmt = NULL;
1971 }
1972 }
1973
/*
 * mysqlImportForeignSchema
 * 		Import a foreign schema (9.5+)
 *
 * Queries information_schema on the remote server for the tables and
 * columns of stmt->remote_schema (honouring LIMIT TO / EXCEPT) and returns a
 * list of CREATE FOREIGN TABLE command strings, one per remote table.
 *
 * Recognised statement options:
 *	import_default	(boolean, default false) emit DEFAULT clauses
 *	import_not_null	(boolean, default true)  emit NOT NULL constraints
 * Any other option raises ERROR, as does a schema that has no tables on the
 * remote server.
 */
#if PG_VERSION_NUM >= 90500
/*
 * mysql_import_append_literal
 * 		Append 'val' to 'buf' as a single-quoted MySQL string literal.
 *
 * Single quotes and backslashes are doubled so that a schema or table name
 * containing them cannot terminate the literal early.
 * NOTE(review): backslash doubling assumes the remote session does not run
 * with NO_BACKSLASH_ESCAPES; under that mode a name containing a backslash
 * would simply fail to match.
 */
static void
mysql_import_append_literal(StringInfo buf, const char *val)
{
	const char *p;

	appendStringInfoChar(buf, '\'');
	for (p = val; *p != '\0'; p++)
	{
		if (*p == '\'' || *p == '\\')
			appendStringInfoChar(buf, *p);
		appendStringInfoChar(buf, *p);
	}
	appendStringInfoChar(buf, '\'');
}

static List *
mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
{
	List	   *commands = NIL;
	bool		import_default = false;
	bool		import_not_null = true;
	ForeignServer *server;
	UserMapping *user;
	mysql_opt  *options;
	MYSQL	   *conn;
	StringInfoData buf;
	MYSQL_RES  *volatile res = NULL;
	MYSQL_ROW	row;
	ListCell   *lc;

	/* Parse statement options */
	foreach(lc, stmt->options)
	{
		DefElem    *def = (DefElem *) lfirst(lc);

		if (strcmp(def->defname, "import_default") == 0)
			import_default = defGetBoolean(def);
		else if (strcmp(def->defname, "import_not_null") == 0)
			import_not_null = defGetBoolean(def);
		else
			ereport(ERROR,
					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
					 errmsg("invalid option \"%s\"", def->defname)));
	}

	/*
	 * Get connection to the foreign server.  Connection manager will
	 * establish new connection if necessary.
	 */
	server = GetForeignServer(serverOid);
	user = GetUserMapping(GetUserId(), server->serverid);
	options = mysql_get_options(serverOid, false);
	conn = mysql_get_connection(server, user, options);

	/* Create workspace for strings */
	initStringInfo(&buf);

	/* Check that the schema really exists */
	appendStringInfoString(&buf,
						   "SELECT 1 FROM information_schema.TABLES WHERE TABLE_SCHEMA = ");
	mysql_import_append_literal(&buf, stmt->remote_schema);

	if (mysql_query(conn, buf.data) != 0)
		mysql_error_print(conn);

	res = mysql_store_result(conn);
	if (!res || mysql_num_rows(res) < 1)
	{
		/* The ERROR exit would otherwise skip mysql_free_result() below */
		if (res)
		{
			mysql_free_result(res);
			res = NULL;
		}

		ereport(ERROR,
				(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
				 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
						stmt->remote_schema, server->servername)));
	}

	mysql_free_result(res);
	res = NULL;
	resetStringInfo(&buf);

	/*
	 * Fetch all table data from this schema, possibly restricted by EXCEPT or
	 * LIMIT TO.  Result columns: 0 table name, 1 column name, 2 mapped type
	 * name, 3 COLUMN_TYPE, 4 't'/'f' not-null flag, 5 column default.
	 */
	appendStringInfoString(&buf,
						   " SELECT"
						   " t.TABLE_NAME,"
						   " c.COLUMN_NAME,"
						   " CASE"
						   " WHEN c.DATA_TYPE = 'enum' THEN LOWER(CONCAT(t.TABLE_NAME, '_', c.COLUMN_NAME, '_t'))"
						   " WHEN c.DATA_TYPE = 'tinyint' THEN 'smallint'"
						   " WHEN c.DATA_TYPE = 'mediumint' THEN 'integer'"
						   " WHEN c.DATA_TYPE = 'tinyint unsigned' THEN 'smallint'"
						   " WHEN c.DATA_TYPE = 'smallint unsigned' THEN 'integer'"
						   " WHEN c.DATA_TYPE = 'mediumint unsigned' THEN 'integer'"
						   " WHEN c.DATA_TYPE = 'int unsigned' THEN 'bigint'"
						   " WHEN c.DATA_TYPE = 'bigint unsigned' THEN 'numeric(20)'"
						   " WHEN c.DATA_TYPE = 'double' THEN 'double precision'"
						   " WHEN c.DATA_TYPE = 'float' THEN 'real'"
						   " WHEN c.DATA_TYPE = 'datetime' THEN 'timestamp'"
						   " WHEN c.DATA_TYPE = 'longtext' THEN 'text'"
						   " WHEN c.DATA_TYPE = 'mediumtext' THEN 'text'"
						   " WHEN c.DATA_TYPE = 'tinytext' THEN 'text'"
						   " WHEN c.DATA_TYPE = 'blob' THEN 'bytea'"
						   " WHEN c.DATA_TYPE = 'mediumblob' THEN 'bytea'"
						   " WHEN c.DATA_TYPE = 'longblob' THEN 'bytea'"
						   " ELSE c.DATA_TYPE"
						   " END,"
						   " c.COLUMN_TYPE,"
						   " IF(c.IS_NULLABLE = 'NO', 't', 'f'),"
						   " c.COLUMN_DEFAULT"
						   " FROM"
						   " information_schema.TABLES AS t"
						   " JOIN"
						   " information_schema.COLUMNS AS c"
						   " ON"
						   " t.TABLE_CATALOG <=> c.TABLE_CATALOG AND t.TABLE_SCHEMA <=> c.TABLE_SCHEMA AND t.TABLE_NAME <=> c.TABLE_NAME"
						   " WHERE"
						   " t.TABLE_SCHEMA = ");
	mysql_import_append_literal(&buf, stmt->remote_schema);

	/* Apply restrictions for LIMIT TO and EXCEPT */
	if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
		stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
	{
		bool		first_item = true;

		appendStringInfoString(&buf, " AND t.TABLE_NAME ");
		if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
			appendStringInfoString(&buf, "NOT ");
		appendStringInfoString(&buf, "IN (");

		/* Append list of table names within IN clause */
		foreach(lc, stmt->table_list)
		{
			RangeVar   *rv = (RangeVar *) lfirst(lc);

			if (first_item)
				first_item = false;
			else
				appendStringInfoString(&buf, ", ");

			mysql_import_append_literal(&buf, rv->relname);
		}
		appendStringInfoChar(&buf, ')');
	}

	/* Append ORDER BY at the end of query to ensure output ordering */
	appendStringInfoString(&buf, " ORDER BY t.TABLE_NAME, c.ORDINAL_POSITION");

	/* Fetch the data */
	if (mysql_query(conn, buf.data) != 0)
		mysql_error_print(conn);

	/* A SELECT that yields no result set indicates a failure; do not fetch */
	res = mysql_store_result(conn);
	if (res == NULL)
		mysql_error_print(conn);

	row = mysql_fetch_row(res);
	while (row)
	{
		char	   *tablename = row[0];
		bool		first_item = true;

		resetStringInfo(&buf);
		appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
						 quote_identifier(tablename));

		/*
		 * Scan all rows for this table.  Rows are ordered by table name, so
		 * the inner loop stops at the first row of the next table (or at the
		 * end of the result set).  Note that "continue" in a do/while jumps
		 * to the loop condition, which fetches the next row.
		 */
		do
		{
			char	   *attname;
			char	   *typename;
			char	   *typedfn;
			char	   *attnotnull;
			char	   *attdefault;

			/* If table has no columns, we'll see nulls here */
			if (row[1] == NULL)
				continue;

			attname = row[1];
			typename = row[2];

			/* char/varchar need the full COLUMN_TYPE to keep their length */
			if (strcmp(typename, "char") == 0 || strcmp(typename, "varchar") == 0)
				typename = row[3];

			typedfn = row[3];
			attnotnull = row[4];
			attdefault = row[5];	/* NULL when the column has no default */

			if (strncmp(typedfn, "enum(", 5) == 0)
				ereport(NOTICE,
						(errmsg("error while generating the table definition"),
						 errhint("If you encounter an error, you may need to execute the following first:\nDO $$BEGIN IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_type WHERE typname = '%s') THEN CREATE TYPE %s AS %s; END IF; END$$;\n",
								 typename, typename, typedfn)));

			if (first_item)
				first_item = false;
			else
				appendStringInfoString(&buf, ",\n");

			/* Print column name and type */
			appendStringInfo(&buf, " %s %s", quote_identifier(attname),
							 typename);

			/* Add DEFAULT if needed */
			if (import_default && attdefault != NULL)
				appendStringInfo(&buf, " DEFAULT %s", attdefault);

			/* Add NOT NULL if needed */
			if (import_not_null && attnotnull[0] == 't')
				appendStringInfoString(&buf, " NOT NULL");
		}
		while ((row = mysql_fetch_row(res)) &&
			   (strcmp(row[0], tablename) == 0));

		/*
		 * Add server name and table-level options.  We specify remote
		 * database and table name as options (the latter to ensure that
		 * renaming the foreign table doesn't break the association).  The
		 * option values are emitted as properly quoted literals so that
		 * names containing quotes still yield a valid command.
		 */
		appendStringInfo(&buf,
						 "\n) SERVER %s OPTIONS (dbname %s, table_name %s);\n",
						 quote_identifier(server->servername),
						 quote_literal_cstr(stmt->remote_schema),
						 quote_literal_cstr(tablename));

		commands = lappend(commands, pstrdup(buf.data));
	}

	/* Clean up */
	mysql_free_result(res);
	res = NULL;
	resetStringInfo(&buf);

	mysql_release_connection(conn);

	return commands;
}
#endif
2198
2199 #if PG_VERSION_NUM >= 110000
2200 /*
2201 * mysqlBeginForeignInsert
2202 * Prepare for an insert operation triggered by partition routing
2203 * or COPY FROM.
2204 *
2205 * This is not yet supported, so raise an error.
2206 */
2207 static void
mysqlBeginForeignInsert(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo)2208 mysqlBeginForeignInsert(ModifyTableState *mtstate,
2209 ResultRelInfo *resultRelInfo)
2210 {
2211 ereport(ERROR,
2212 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2213 errmsg("COPY and foreign partition routing not supported in mysql_fdw")));
2214 }
2215
2216 /*
2217 * mysqlEndForeignInsert
2218 * BeginForeignInsert() is not yet implemented, hence we do not
2219 * have anything to cleanup as of now. We throw an error here just
2220 * to make sure when we do that we do not forget to cleanup
2221 * resources.
2222 */
2223 static void
mysqlEndForeignInsert(EState * estate,ResultRelInfo * resultRelInfo)2224 mysqlEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo)
2225 {
2226 ereport(ERROR,
2227 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2228 errmsg("COPY and foreign partition routing not supported in mysql_fdw")));
2229 }
2230 #endif
2231
2232 /*
2233 * Prepare for processing of parameters used in remote query.
2234 */
2235 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values,Oid ** param_types)2236 prepare_query_params(PlanState *node,
2237 List *fdw_exprs,
2238 int numParams,
2239 FmgrInfo **param_flinfo,
2240 List **param_exprs,
2241 const char ***param_values,
2242 Oid **param_types)
2243 {
2244 int i;
2245 ListCell *lc;
2246
2247 Assert(numParams > 0);
2248
2249 /* Prepare for output conversion of parameters used in remote query. */
2250 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
2251
2252 *param_types = (Oid *) palloc0(sizeof(Oid) * numParams);
2253
2254 i = 0;
2255 foreach(lc, fdw_exprs)
2256 {
2257 Node *param_expr = (Node *) lfirst(lc);
2258 Oid typefnoid;
2259 bool isvarlena;
2260
2261 (*param_types)[i] = exprType(param_expr);
2262
2263 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
2264 fmgr_info(typefnoid, &(*param_flinfo)[i]);
2265 i++;
2266 }
2267
2268 /*
2269 * Prepare remote-parameter expressions for evaluation. (Note: in
2270 * practice, we expect that all these expressions will be just Params, so
2271 * we could possibly do something more efficient than using the full
2272 * expression-eval machinery for this. But probably there would be little
2273 * benefit, and it'd require postgres_fdw to know more than is desirable
2274 * about Param evaluation.)
2275 */
2276 #if PG_VERSION_NUM >= 100000
2277 *param_exprs = ExecInitExprList(fdw_exprs, node);
2278 #else
2279 *param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);
2280 #endif
2281
2282 /* Allocate buffer for text form of query parameters. */
2283 *param_values = (const char **) palloc0(numParams * sizeof(char *));
2284 }
2285
2286 /*
2287 * Construct array of query parameter values in text format.
2288 */
2289 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values,MYSQL_BIND ** mysql_bind_buf,Oid * param_types)2290 process_query_params(ExprContext *econtext,
2291 FmgrInfo *param_flinfo,
2292 List *param_exprs,
2293 const char **param_values,
2294 MYSQL_BIND **mysql_bind_buf,
2295 Oid *param_types)
2296 {
2297 int i;
2298 ListCell *lc;
2299
2300 i = 0;
2301 foreach(lc, param_exprs)
2302 {
2303 ExprState *expr_state = (ExprState *) lfirst(lc);
2304 Datum expr_value;
2305 bool isNull;
2306
2307 /* Evaluate the parameter expression */
2308 #if PG_VERSION_NUM >= 100000
2309 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
2310 #else
2311 expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
2312 #endif
2313 mysql_bind_sql_var(param_types[i], i, expr_value, *mysql_bind_buf,
2314 &isNull);
2315
2316 /*
2317 * Get string representation of each parameter value by invoking
2318 * type-specific output function, unless the value is null.
2319 */
2320 if (isNull)
2321 param_values[i] = NULL;
2322 else
2323 param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value);
2324 i++;
2325 }
2326 }
2327
2328 /*
2329 * Process the query params and bind the same with the statement, if any.
2330 * Also, execute the statement.
2331 */
2332 static void
bind_stmt_params_and_exec(ForeignScanState * node)2333 bind_stmt_params_and_exec(ForeignScanState *node)
2334 {
2335 MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
2336 ExprContext *econtext = node->ss.ps.ps_ExprContext;
2337 int numParams = festate->numParams;
2338 const char **values = festate->param_values;
2339 MYSQL_BIND *mysql_bind_buffer = NULL;
2340
2341 /*
2342 * Construct array of query parameter values in text format. We do the
2343 * conversions in the short-lived per-tuple context, so as not to cause a
2344 * memory leak over repeated scans.
2345 */
2346 if (numParams > 0)
2347 {
2348 MemoryContext oldcontext;
2349
2350 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2351
2352 mysql_bind_buffer = (MYSQL_BIND *) palloc0(sizeof(MYSQL_BIND) * numParams);
2353
2354 process_query_params(econtext,
2355 festate->param_flinfo,
2356 festate->param_exprs,
2357 values,
2358 &mysql_bind_buffer,
2359 festate->param_types);
2360
2361 mysql_stmt_bind_param(festate->stmt, mysql_bind_buffer);
2362
2363 MemoryContextSwitchTo(oldcontext);
2364 }
2365
2366 /*
2367 * Finally, execute the query. The result will be placed in the array we
2368 * already bind.
2369 */
2370 if (mysql_stmt_execute(festate->stmt) != 0)
2371 mysql_stmt_error_print(festate, "failed to execute the MySQL query");
2372
2373 /* Mark the query as executed */
2374 festate->query_executed = true;
2375 }
2376
2377 Datum
mysql_fdw_version(PG_FUNCTION_ARGS)2378 mysql_fdw_version(PG_FUNCTION_ARGS)
2379 {
2380 PG_RETURN_INT32(CODE_VERSION);
2381 }
2382
2383 static void
mysql_error_print(MYSQL * conn)2384 mysql_error_print(MYSQL *conn)
2385 {
2386 switch (mysql_errno(conn))
2387 {
2388 case CR_NO_ERROR:
2389 /* Should not happen, though give some message */
2390 elog(ERROR, "unexpected error code");
2391 break;
2392 case CR_OUT_OF_MEMORY:
2393 case CR_SERVER_GONE_ERROR:
2394 case CR_SERVER_LOST:
2395 case CR_UNKNOWN_ERROR:
2396 mysql_release_connection(conn);
2397 ereport(ERROR,
2398 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2399 errmsg("failed to execute the MySQL query: \n%s",
2400 mysql_error(conn))));
2401 break;
2402 case CR_COMMANDS_OUT_OF_SYNC:
2403 default:
2404 ereport(ERROR,
2405 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2406 errmsg("failed to execute the MySQL query: \n%s",
2407 mysql_error(conn))));
2408 }
2409 }
2410
2411 static void
mysql_stmt_error_print(MySQLFdwExecState * festate,const char * msg)2412 mysql_stmt_error_print(MySQLFdwExecState *festate, const char *msg)
2413 {
2414 switch (mysql_stmt_errno(festate->stmt))
2415 {
2416 case CR_NO_ERROR:
2417 /* Should not happen, though give some message */
2418 elog(ERROR, "unexpected error code");
2419 break;
2420 case CR_OUT_OF_MEMORY:
2421 case CR_SERVER_GONE_ERROR:
2422 case CR_SERVER_LOST:
2423 case CR_UNKNOWN_ERROR:
2424 mysql_release_connection(festate->conn);
2425 ereport(ERROR,
2426 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2427 errmsg("%s: \n%s", msg, mysql_error(festate->conn))));
2428 break;
2429 case CR_COMMANDS_OUT_OF_SYNC:
2430 default:
2431 ereport(ERROR,
2432 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2433 errmsg("%s: \n%s", msg, mysql_error(festate->conn))));
2434 break;
2435 }
2436 }
2437
2438 /*
2439 * getUpdateTargetAttrs
2440 * Returns the list of attribute numbers of the columns being updated.
2441 */
2442 static List *
getUpdateTargetAttrs(RangeTblEntry * rte)2443 getUpdateTargetAttrs(RangeTblEntry *rte)
2444 {
2445 List *targetAttrs = NIL;
2446
2447 #if PG_VERSION_NUM >= 90500
2448 Bitmapset *tmpset = bms_copy(rte->updatedCols);
2449 #else
2450 Bitmapset *tmpset = bms_copy(rte->modifiedCols);
2451 #endif
2452 AttrNumber col;
2453
2454 while ((col = bms_first_member(tmpset)) >= 0)
2455 {
2456 col += FirstLowInvalidHeapAttributeNumber;
2457 if (col <= InvalidAttrNumber) /* shouldn't happen */
2458 elog(ERROR, "system-column update is not supported");
2459
2460 /* We also disallow updates to the first column */
2461 if (col == 1)
2462 ereport(ERROR,
2463 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2464 errmsg("row identifier column update is not supported")));
2465
2466 targetAttrs = lappend_int(targetAttrs, col);
2467 }
2468
2469 return targetAttrs;
2470 }
2471
2472 /*
2473 * mysqlGetForeignJoinPaths
2474 * Add possible ForeignPath to joinrel, if join is safe to push down.
2475 */
2476 #if PG_VERSION_NUM >= 90600
2477 static void
mysqlGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)2478 mysqlGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *joinrel,
2479 RelOptInfo *outerrel, RelOptInfo *innerrel,
2480 JoinType jointype, JoinPathExtraData *extra)
2481 {
2482 MySQLFdwRelationInfo *fpinfo;
2483 ForeignPath *joinpath;
2484 Cost startup_cost;
2485 Cost total_cost;
2486 Path *epq_path = NULL; /* Path to create plan to be executed when
2487 * EvalPlanQual gets triggered. */
2488
2489 /*
2490 * Skip if this join combination has been considered already.
2491 */
2492 if (joinrel->fdw_private)
2493 return;
2494
2495 /*
2496 * Create unfinished MySQLFdwRelationInfo entry which is used to indicate
2497 * that the join relation is already considered, so that we won't waste
2498 * time in judging safety of join pushdown and adding the same paths again
2499 * if found safe. Once we know that this join can be pushed down, we fill
2500 * the entry.
2501 */
2502 fpinfo = (MySQLFdwRelationInfo *) palloc0(sizeof(MySQLFdwRelationInfo));
2503 fpinfo->pushdown_safe = false;
2504 joinrel->fdw_private = fpinfo;
2505 /* attrs_used is only for base relations. */
2506 fpinfo->attrs_used = NULL;
2507
2508 /*
2509 * In case there is a possibility that EvalPlanQual will be executed, we
2510 * should be able to reconstruct the row, from base relations applying all
2511 * the conditions. We create a local plan from a suitable local path
2512 * available in the path list. In case such a path doesn't exist, we can
2513 * not push the join to the foreign server since we won't be able to
2514 * reconstruct the row for EvalPlanQual(). Find an alternative local path
2515 * before we add ForeignPath, lest the new path would kick possibly the
2516 * only local path. Do this before calling mysql_foreign_join_ok(), since
2517 * that function updates fpinfo and marks it as pushable if the join is
2518 * found to be pushable.
2519 */
2520 if (root->parse->commandType == CMD_DELETE ||
2521 root->parse->commandType == CMD_UPDATE ||
2522 root->rowMarks)
2523 {
2524 epq_path = GetExistingLocalJoinPath(joinrel);
2525 if (!epq_path)
2526 {
2527 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
2528 return;
2529 }
2530 }
2531 else
2532 epq_path = NULL;
2533
2534 if (!mysql_foreign_join_ok(root, joinrel, jointype, outerrel, innerrel,
2535 extra))
2536 {
2537 /* Free path required for EPQ if we copied one; we don't need it now */
2538 if (epq_path)
2539 pfree(epq_path);
2540 return;
2541 }
2542
2543 /* TODO: Put accurate estimates here */
2544 startup_cost = 15.0;
2545 total_cost = 20 + startup_cost;
2546
2547 /*
2548 * Create a new join path and add it to the joinrel which represents a
2549 * join between foreign tables.
2550 */
2551 #if PG_VERSION_NUM >= 120000
2552 joinpath = create_foreign_join_path(root,
2553 joinrel,
2554 NULL, /* default pathtarget */
2555 joinrel->rows,
2556 startup_cost,
2557 total_cost,
2558 NIL, /* no pathkeys */
2559 joinrel->lateral_relids,
2560 epq_path,
2561 NIL); /* no fdw_private */
2562 #else
2563 joinpath = create_foreignscan_path(root,
2564 joinrel,
2565 NULL, /* default pathtarget */
2566 joinrel->rows,
2567 startup_cost,
2568 total_cost,
2569 NIL, /* no pathkeys */
2570 joinrel->lateral_relids,
2571 epq_path,
2572 NIL); /* no fdw_private */
2573 #endif /* PG_VERSION_NUM >= 120000 */
2574
2575 /* Add generated path into joinrel by add_path(). */
2576 add_path(joinrel, (Path *) joinpath);
2577
2578 /* XXX Consider pathkeys for the join relation */
2579
2580 /* XXX Consider parameterized paths for the join relation */
2581 }
2582
2583 /*
2584 * mysql_foreign_join_ok
2585 * Assess whether the join between inner and outer relations can be
2586 * pushed down to the foreign server.
2587 *
2588 * As a side effect, save information we obtain in this function to
2589 * MySQLFdwRelationInfo passed in.
2590 */
2591 static bool
mysql_foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)2592 mysql_foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
2593 JoinType jointype, RelOptInfo *outerrel,
2594 RelOptInfo *innerrel, JoinPathExtraData *extra)
2595 {
2596 MySQLFdwRelationInfo *fpinfo;
2597 MySQLFdwRelationInfo *fpinfo_o;
2598 MySQLFdwRelationInfo *fpinfo_i;
2599 ListCell *lc;
2600 List *joinclauses;
2601
2602 /*
2603 * We support pushing down INNER, LEFT and RIGHT joins.
2604 * Constructing queries representing SEMI and ANTI joins is hard, hence
2605 * not considered right now.
2606 */
2607 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
2608 jointype != JOIN_RIGHT)
2609 return false;
2610
2611 /*
2612 * If either of the joining relations is marked as unsafe to pushdown, the
2613 * join cannot be pushed down.
2614 */
2615 fpinfo = (MySQLFdwRelationInfo *) joinrel->fdw_private;
2616 fpinfo_o = (MySQLFdwRelationInfo *) outerrel->fdw_private;
2617 fpinfo_i = (MySQLFdwRelationInfo *) innerrel->fdw_private;
2618 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
2619 !fpinfo_i || !fpinfo_i->pushdown_safe)
2620 return false;
2621
2622 /*
2623 * If joining relations have local conditions, those conditions are
2624 * required to be applied before joining the relations. Hence the join can
2625 * not be pushed down.
2626 */
2627 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
2628 return false;
2629
2630 /*
2631 * Separate restrict list into join quals and pushed-down (other) quals.
2632 *
2633 * Join quals belonging to an outer join must all be shippable, else we
2634 * cannot execute the join remotely. Add such quals to 'joinclauses'.
2635 *
2636 * Add other quals to fpinfo->remote_conds if they are shippable, else to
2637 * fpinfo->local_conds. In an inner join it's okay to execute conditions
2638 * either locally or remotely; the same is true for pushed-down conditions
2639 * at an outer join.
2640 *
2641 * Note we might return failure after having already scribbled on
2642 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
2643 * won't consult those lists again if we deem the join unshippable.
2644 */
2645 joinclauses = NIL;
2646 foreach(lc, extra->restrictlist)
2647 {
2648 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
2649 bool is_remote_clause = mysql_is_foreign_expr(root, joinrel,
2650 rinfo->clause,
2651 true);
2652
2653 if (IS_OUTER_JOIN(jointype) &&
2654 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
2655 {
2656 if (!is_remote_clause)
2657 return false;
2658 joinclauses = lappend(joinclauses, rinfo);
2659 }
2660 else
2661 {
2662 if (is_remote_clause)
2663 {
2664 /*
2665 * Unlike postgres_fdw, don't append the join clauses to
2666 * remote_conds, instead keep the join clauses separate.
2667 * Currently, we are providing limited operator pushability
2668 * support for join pushdown, hence we keep those clauses
2669 * separate to avoid INNER JOIN not getting pushdown if any
2670 * of the WHERE clause is not shippable as per join pushdown
2671 * shippability.
2672 */
2673 if (jointype == JOIN_INNER)
2674 joinclauses = lappend(joinclauses, rinfo);
2675 else
2676 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
2677 }
2678 else
2679 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
2680 }
2681 }
2682
2683 /*
2684 * mysqlDeparseExplicitTargetList() isn't smart enough to handle anything
2685 * other than a Var. In particular, if there's some PlaceHolderVar that
2686 * would need to be evaluated within this join tree (because there's an
2687 * upper reference to a quantity that may go to NULL as a result of an
2688 * outer join), then we can't try to push the join down because we'll fail
2689 * when we get to mysqlDeparseExplicitTargetList(). However, a
2690 * PlaceHolderVar that needs to be evaluated *at the top* of this join tree
2691 * is OK, because we can do that locally after fetching the results from
2692 * the remote side.
2693 */
2694 foreach(lc, root->placeholder_list)
2695 {
2696 PlaceHolderInfo *phinfo = lfirst(lc);
2697 Relids relids;
2698
2699 /* PlaceHolderInfo refers to parent relids, not child relids. */
2700 #if PG_VERSION_NUM >= 100000
2701 relids = IS_OTHER_REL(joinrel) ?
2702 joinrel->top_parent_relids : joinrel->relids;
2703 #else
2704 relids = joinrel->relids;
2705 #endif /* PG_VERSION_NUM >= 100000 */
2706
2707 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
2708 bms_nonempty_difference(relids, phinfo->ph_eval_at))
2709 return false;
2710 }
2711
2712 /* Save the join clauses, for later use. */
2713 fpinfo->joinclauses = joinclauses;
2714
2715 /*
2716 * Pull the other remote conditions from the joining relations into join
2717 * clauses or other remote clauses (remote_conds) of this relation. This
2718 * avoids building subqueries at every join step.
2719 *
2720 * For an inner join, clauses from both the relations are added to the
2721 * other remote clauses. For an OUTER join, the clauses from the outer
2722 * side are added to remote_conds since those can be evaluated after the
2723 * join is evaluated. The clauses from inner side are added to the
2724 * joinclauses, since they need to evaluated while constructing the join.
2725 *
2726 * The joining sides cannot have local conditions, thus no need to test
2727 * shippability of the clauses being pulled up.
2728 */
2729 switch (jointype)
2730 {
2731 case JOIN_INNER:
2732 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
2733 fpinfo_i->remote_conds);
2734 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
2735 fpinfo_o->remote_conds);
2736 break;
2737
2738 case JOIN_LEFT:
2739 /* Check that clauses from the inner side are pushable or not. */
2740 foreach(lc, fpinfo_i->remote_conds)
2741 {
2742 RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
2743
2744 if (!mysql_is_foreign_expr(root, joinrel, ri->clause, true))
2745 return false;
2746 }
2747
2748 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
2749 fpinfo_i->remote_conds);
2750 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
2751 fpinfo_o->remote_conds);
2752 break;
2753
2754 case JOIN_RIGHT:
2755 /* Check that clauses from the outer side are pushable or not. */
2756 foreach(lc, fpinfo_o->remote_conds)
2757 {
2758 RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
2759
2760 if (!mysql_is_foreign_expr(root, joinrel, ri->clause, true))
2761 return false;
2762 }
2763
2764 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
2765 fpinfo_o->remote_conds);
2766 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
2767 fpinfo_i->remote_conds);
2768 break;
2769
2770 default:
2771 /* Should not happen, we have just check this above */
2772 elog(ERROR, "unsupported join type %d", jointype);
2773 }
2774
2775 fpinfo->outerrel = outerrel;
2776 fpinfo->innerrel = innerrel;
2777 fpinfo->jointype = jointype;
2778
2779 /* Mark that this join can be pushed down safely */
2780 fpinfo->pushdown_safe = true;
2781
2782 /*
2783 * Set the string describing this join relation to be used in EXPLAIN
2784 * output of corresponding ForeignScan.
2785 */
2786 fpinfo->relation_name = makeStringInfo();
2787 appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
2788 fpinfo_o->relation_name->data,
2789 mysql_get_jointype_name(fpinfo->jointype),
2790 fpinfo_i->relation_name->data);
2791
2792 return true;
2793 }
2794
2795 /*
2796 * mysqlRecheckForeignScan
2797 * Execute a local join execution plan for a foreign join.
2798 */
2799 static bool
mysqlRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2800 mysqlRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2801 {
2802 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2803 PlanState *outerPlan = outerPlanState(node);
2804 TupleTableSlot *result;
2805
2806 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2807 if (scanrelid > 0)
2808 return true;
2809
2810 Assert(outerPlan != NULL);
2811
2812 /* Execute a local join execution plan */
2813 result = ExecProcNode(outerPlan);
2814 if (TupIsNull(result))
2815 return false;
2816
2817 /* Store result in the given slot */
2818 ExecCopySlot(slot, result);
2819
2820 return true;
2821 }
2822
2823 /*
2824 * mysql_adjust_whole_row_ref
2825 * If the given list of Var nodes has whole-row reference, add Var
2826 * nodes corresponding to all the attributes of the corresponding
2827 * base relation.
2828 *
2829 * The function also returns an array of lists of var nodes. The array is
2830 * indexed by the RTI and entry there contains the list of Var nodes which
2831 * make up the whole-row reference for corresponding base relation.
2832 * The relations not covered by given join and the relations which do not
2833 * have whole-row references will have NIL entries.
2834 *
2835 * If there are no whole-row references in the given list, the given list is
2836 * returned unmodified and the other list is NIL.
2837 */
2838 static List *
mysql_adjust_whole_row_ref(PlannerInfo * root,List * scan_var_list,List ** whole_row_lists,Bitmapset * relids)2839 mysql_adjust_whole_row_ref(PlannerInfo *root, List *scan_var_list,
2840 List **whole_row_lists, Bitmapset *relids)
2841 {
2842 ListCell *lc;
2843 bool has_whole_row = false;
2844 List **wr_list_array = NULL;
2845 int cnt_rt;
2846 List *wr_scan_var_list = NIL;
2847
2848 *whole_row_lists = NIL;
2849
2850 /* Check if there exists at least one whole row reference. */
2851 foreach(lc, scan_var_list)
2852 {
2853 Var *var = (Var *) lfirst(lc);
2854
2855 Assert(IsA(var, Var));
2856
2857 if (var->varattno == 0)
2858 {
2859 has_whole_row = true;
2860 break;
2861 }
2862 }
2863
2864 if (!has_whole_row)
2865 return scan_var_list;
2866
2867 /*
2868 * Allocate large enough memory to hold whole-row Var lists for all the
2869 * relations. This array will then be converted into a list of lists.
2870 * Since all the base relations are marked by range table index, it's easy
2871 * to keep track of the ones whose whole-row references have been taken
2872 * care of.
2873 */
2874 wr_list_array = (List **) palloc0(sizeof(List *) *
2875 list_length(root->parse->rtable));
2876
2877 /* Adjust the whole-row references as described in the prologue. */
2878 foreach(lc, scan_var_list)
2879 {
2880 Var *var = (Var *) lfirst(lc);
2881
2882 Assert(IsA(var, Var));
2883
2884 if (var->varattno == 0 && !wr_list_array[var->varno - 1])
2885 {
2886 List *wr_var_list;
2887 List *retrieved_attrs;
2888 RangeTblEntry *rte = rt_fetch(var->varno, root->parse->rtable);
2889 Bitmapset *attrs_used;
2890
2891 Assert(OidIsValid(rte->relid));
2892
2893 /*
2894 * Get list of Var nodes for all undropped attributes of the base
2895 * relation.
2896 */
2897 attrs_used = bms_make_singleton(0 -
2898 FirstLowInvalidHeapAttributeNumber);
2899
2900 /*
2901 * If the whole-row reference falls on the nullable side of the
2902 * outer join and that side is null in a given result row, the
2903 * whole row reference should be set to NULL. In this case, all
2904 * the columns of that relation will be NULL, but that does not
2905 * help since those columns can be genuinely NULL in a row.
2906 */
2907 wr_var_list =
2908 mysql_build_scan_list_for_baserel(rte->relid, var->varno,
2909 attrs_used,
2910 &retrieved_attrs);
2911 wr_list_array[var->varno - 1] = wr_var_list;
2912 wr_scan_var_list = list_concat_unique(wr_scan_var_list,
2913 wr_var_list);
2914 bms_free(attrs_used);
2915 list_free(retrieved_attrs);
2916 }
2917 else
2918 wr_scan_var_list = list_append_unique(wr_scan_var_list, var);
2919 }
2920
2921 /*
2922 * Collect the required Var node lists into a list of lists ordered by the
2923 * base relations' range table indexes.
2924 */
2925 cnt_rt = -1;
2926 while ((cnt_rt = bms_next_member(relids, cnt_rt)) >= 0)
2927 *whole_row_lists = lappend(*whole_row_lists, wr_list_array[cnt_rt - 1]);
2928
2929 pfree(wr_list_array);
2930 return wr_scan_var_list;
2931 }
2932
2933 /*
2934 * mysql_build_scan_list_for_baserel
2935 * Build list of nodes corresponding to the attributes requested for
2936 * given base relation.
2937 *
2938 * The list contains Var nodes corresponding to the attributes specified in
2939 * attrs_used. If whole-row reference is required, the functions adds Var
2940 * nodes corresponding to all the attributes in the relation.
2941 */
2942 static List *
mysql_build_scan_list_for_baserel(Oid relid,Index varno,Bitmapset * attrs_used,List ** retrieved_attrs)2943 mysql_build_scan_list_for_baserel(Oid relid, Index varno,
2944 Bitmapset *attrs_used,
2945 List **retrieved_attrs)
2946 {
2947 int attno;
2948 List *tlist = NIL;
2949 Node *node;
2950 bool wholerow_requested = false;
2951 Relation relation;
2952 TupleDesc tupdesc;
2953
2954 Assert(OidIsValid(relid));
2955
2956 *retrieved_attrs = NIL;
2957
2958 /* Planner must have taken a lock, so request no lock here */
2959 #if PG_VERSION_NUM < 130000
2960 relation = heap_open(relid, NoLock);
2961 #else
2962 relation = table_open(relid, NoLock);
2963 #endif
2964
2965 tupdesc = RelationGetDescr(relation);
2966
2967 /* Is whole-row reference requested? */
2968 wholerow_requested = bms_is_member(0 - FirstLowInvalidHeapAttributeNumber,
2969 attrs_used);
2970
2971 /* Handle user defined attributes. */
2972 for (attno = 1; attno <= tupdesc->natts; attno++)
2973 {
2974 Form_pg_attribute attr = TupleDescAttr(tupdesc, attno - 1);
2975
2976 /* Ignore dropped attributes. */
2977 if (attr->attisdropped)
2978 continue;
2979
2980 /*
2981 * For a required attribute create a Var node and add corresponding
2982 * attribute number to the retrieved_attrs list.
2983 */
2984 if (wholerow_requested ||
2985 bms_is_member(attno - FirstLowInvalidHeapAttributeNumber,
2986 attrs_used))
2987 {
2988 node = (Node *) makeVar(varno, attno, attr->atttypid,
2989 attr->atttypmod, attr->attcollation, 0);
2990 tlist = lappend(tlist, node);
2991
2992 *retrieved_attrs = lappend_int(*retrieved_attrs, attno);
2993 }
2994 }
2995
2996 #if PG_VERSION_NUM < 130000
2997 heap_close(relation, NoLock);
2998 #else
2999 table_close(relation, NoLock);
3000 #endif
3001
3002 return tlist;
3003 }
3004 #endif /* PG_VERSION_NUM >= 90600 */
3005
3006 /*
3007 * mysql_build_whole_row_constr_info
3008 * Calculate and save the information required to construct whole row
3009 * references of base foreign relations involved in the pushed down join.
3010 *
3011 * tupdesc is the tuple descriptor describing the result returned by the
3012 * ForeignScan node. It is expected to be same as
3013 * ForeignScanState::ss::ss_ScanTupleSlot, which is constructed using
3014 * fdw_scan_tlist.
3015 *
3016 * relids is the the set of relations participating in the pushed down join.
3017 *
3018 * max_relid is the maximum number of relation index expected.
3019 *
3020 * whole_row_lists is the list of Var node lists constituting the whole-row
3021 * reference for base relations in the relids in the same order.
3022 *
3023 * scan_tlist is the targetlist representing the result fetched from the
3024 * foreign server.
3025 *
3026 * fdw_scan_tlist is the targetlist representing the result returned by the
3027 * ForeignScan node.
3028 */
3029 static void
mysql_build_whole_row_constr_info(MySQLFdwExecState * festate,TupleDesc tupdesc,Bitmapset * relids,int max_relid,List * whole_row_lists,List * scan_tlist,List * fdw_scan_tlist)3030 mysql_build_whole_row_constr_info(MySQLFdwExecState *festate,
3031 TupleDesc tupdesc, Bitmapset *relids,
3032 int max_relid, List *whole_row_lists,
3033 List *scan_tlist, List *fdw_scan_tlist)
3034 {
3035 int cnt_rt;
3036 int cnt_vl;
3037 int cnt_attr;
3038 ListCell *lc;
3039 int *fs_attr_pos = NULL;
3040 MySQLWRState **mysqlwrstates = NULL;
3041 int fs_num_atts;
3042
3043 /*
3044 * Allocate memory to hold whole-row reference state for each relation.
3045 * Indexing by the range table index is faster than maintaining an
3046 * associative map.
3047 */
3048 mysqlwrstates = (MySQLWRState **) palloc0(sizeof(MySQLWRState *) * max_relid);
3049
3050 /*
3051 * Set the whole-row reference state for the relations whose whole-row
3052 * reference needs to be constructed.
3053 */
3054 cnt_rt = -1;
3055 cnt_vl = 0;
3056 while ((cnt_rt = bms_next_member(relids, cnt_rt)) >= 0)
3057 {
3058 MySQLWRState *wr_state = (MySQLWRState *) palloc0(sizeof(MySQLWRState));
3059 List *var_list = list_nth(whole_row_lists, cnt_vl++);
3060 int natts;
3061
3062 /* Skip the relations without whole-row references. */
3063 if (list_length(var_list) <= 0)
3064 continue;
3065
3066 natts = list_length(var_list);
3067 wr_state->attr_pos = (int *) palloc(sizeof(int) * natts);
3068
3069 /*
3070 * Create a map of attributes required for whole-row reference to
3071 * their positions in the result fetched from the foreign server.
3072 */
3073 cnt_attr = 0;
3074 foreach(lc, var_list)
3075 {
3076 Var *var = lfirst(lc);
3077 TargetEntry *tle_sl;
3078
3079 Assert(IsA(var, Var) &&var->varno == cnt_rt);
3080
3081 #if PG_VERSION_NUM >= 100000
3082 tle_sl = tlist_member((Expr *) var, scan_tlist);
3083 #else
3084 tle_sl = tlist_member((Node *) var, scan_tlist);
3085 #endif
3086 Assert(tle_sl);
3087
3088 wr_state->attr_pos[cnt_attr++] = tle_sl->resno - 1;
3089 }
3090 Assert(natts == cnt_attr);
3091
3092 /* Build rest of the state */
3093 wr_state->tupdesc = ExecTypeFromExprList(var_list);
3094 Assert(natts == wr_state->tupdesc->natts);
3095 wr_state->values = (Datum *) palloc(sizeof(Datum) * natts);
3096 wr_state->nulls = (bool *) palloc(sizeof(bool) * natts);
3097 BlessTupleDesc(wr_state->tupdesc);
3098 mysqlwrstates[cnt_rt - 1] = wr_state;
3099 }
3100
3101 /*
3102 * Construct the array mapping columns in the ForeignScan node output to
3103 * their positions in the result fetched from the foreign server. Positive
3104 * values indicate the locations in the result and negative values
3105 * indicate the range table indexes of the base table whose whole-row
3106 * reference values are requested in that place.
3107 */
3108 fs_num_atts = list_length(fdw_scan_tlist);
3109 fs_attr_pos = (int *) palloc(sizeof(int) * fs_num_atts);
3110 cnt_attr = 0;
3111 foreach(lc, fdw_scan_tlist)
3112 {
3113 TargetEntry *tle_fsl = lfirst(lc);
3114 Var *var = (Var *) tle_fsl->expr;
3115
3116 Assert(IsA(var, Var));
3117 if (var->varattno == 0)
3118 fs_attr_pos[cnt_attr] = -var->varno;
3119 else
3120 {
3121 #if PG_VERSION_NUM >= 100000
3122 TargetEntry *tle_sl = tlist_member((Expr *) var, scan_tlist);
3123 #else
3124 TargetEntry *tle_sl = tlist_member((Node *) var, scan_tlist);
3125 #endif
3126
3127 Assert(tle_sl);
3128 fs_attr_pos[cnt_attr] = tle_sl->resno - 1;
3129 }
3130 cnt_attr++;
3131 }
3132
3133 /*
3134 * The tuple descriptor passed in should have same number of attributes as
3135 * the entries in fdw_scan_tlist.
3136 */
3137 Assert(fs_num_atts == tupdesc->natts);
3138
3139 festate->mysqlwrstates = mysqlwrstates;
3140 festate->wr_attrs_pos = fs_attr_pos;
3141 festate->wr_tupdesc = tupdesc;
3142 festate->wr_values = (Datum *) palloc(sizeof(Datum) * tupdesc->natts);
3143 festate->wr_nulls = (bool *) palloc(sizeof(bool) * tupdesc->natts);
3144
3145 return;
3146 }
3147
3148 /*
3149 * mysql_get_tuple_with_whole_row
3150 * Construct the result row with whole-row references.
3151 */
3152 static HeapTuple
mysql_get_tuple_with_whole_row(MySQLFdwExecState * festate,Datum * values,bool * nulls)3153 mysql_get_tuple_with_whole_row(MySQLFdwExecState *festate, Datum *values,
3154 bool *nulls)
3155 {
3156 TupleDesc tupdesc = festate->wr_tupdesc;
3157 Datum *wr_values = festate->wr_values;
3158 bool *wr_nulls = festate->wr_nulls;
3159 int cnt_attr;
3160 HeapTuple tuple = NULL;
3161
3162 for (cnt_attr = 0; cnt_attr < tupdesc->natts; cnt_attr++)
3163 {
3164 int attr_pos = festate->wr_attrs_pos[cnt_attr];
3165
3166 if (attr_pos >= 0)
3167 {
3168 wr_values[cnt_attr] = values[attr_pos];
3169 wr_nulls[cnt_attr] = nulls[attr_pos];
3170 }
3171 else
3172 {
3173 /*
3174 * The RTI of relation whose whole row reference is to be
3175 * constructed is stored as -ve attr_pos.
3176 */
3177 MySQLWRState *wr_state = festate->mysqlwrstates[-attr_pos - 1];
3178
3179 wr_nulls[cnt_attr] = nulls[wr_state->wr_null_ind_pos];
3180 if (!wr_nulls[cnt_attr])
3181 {
3182 HeapTuple wr_tuple = mysql_form_whole_row(wr_state,
3183 values,
3184 nulls);
3185
3186 wr_values[cnt_attr] = HeapTupleGetDatum(wr_tuple);
3187 }
3188 }
3189 }
3190
3191 tuple = heap_form_tuple(tupdesc, wr_values, wr_nulls);
3192 return tuple;
3193 }
3194
3195 /*
3196 * mysql_form_whole_row
3197 * The function constructs whole-row reference for a base relation
3198 * with the information given in wr_state.
3199 *
3200 * wr_state contains the information about which attributes from values and
3201 * nulls are to be used and in which order to construct the whole-row
3202 * reference.
3203 */
3204 static HeapTuple
mysql_form_whole_row(MySQLWRState * wr_state,Datum * values,bool * nulls)3205 mysql_form_whole_row(MySQLWRState *wr_state, Datum *values, bool *nulls)
3206 {
3207 int cnt_attr;
3208
3209 for (cnt_attr = 0; cnt_attr < wr_state->tupdesc->natts; cnt_attr++)
3210 {
3211 int attr_pos = wr_state->attr_pos[cnt_attr];
3212
3213 wr_state->values[cnt_attr] = values[attr_pos];
3214 wr_state->nulls[cnt_attr] = nulls[attr_pos];
3215 }
3216 return heap_form_tuple(wr_state->tupdesc, wr_state->values,
3217 wr_state->nulls);
3218 }
3219