1 /*------------------------------------------------------------------
2 *
3 * Foreign data wrapper for TDS (Sybase and Microsoft SQL Server)
4 *
5 * Author: Geoff Montee
6 * Name: tds_fdw
7 * File: tds_fdw/src/tds_fdw.c
8 *
9 * Description:
10 * This is a PostgreSQL foreign data wrapper for use to connect to databases that use TDS,
11 * such as Sybase databases and Microsoft SQL server.
12 *
13 * This foreign data wrapper requires requires a library that uses the DB-Library interface,
14 * such as FreeTDS (http://www.freetds.org/). This has been tested with FreeTDS, but not
15 * the proprietary implementations of DB-Library.
16 *----------------------------------------------------------------------------
17 */
18
19
20
21 #include <stdio.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <unistd.h>
25
26 /* Override PGDLLEXPORT for visibility */
27
28 #include "visibility.h"
29
30 /* postgres headers */
31
32 #include "postgres.h"
33 #include "funcapi.h"
34 #include "access/reloptions.h"
35 #include "catalog/pg_foreign_server.h"
36 #include "catalog/pg_foreign_table.h"
37 #include "catalog/pg_user_mapping.h"
38 #include "catalog/pg_type.h"
39 #include "commands/defrem.h"
40 #include "commands/explain.h"
41 #include "foreign/fdwapi.h"
42 #include "foreign/foreign.h"
43 #include "miscadmin.h"
44 #include "mb/pg_wchar.h"
45 #include "optimizer/cost.h"
46 #include "optimizer/paths.h"
47 #include "optimizer/prep.h"
48 #if (PG_VERSION_NUM < 120000)
49 #include "optimizer/var.h"
50 #else
51 #include "optimizer/optimizer.h"
52 #endif
53 #include "storage/fd.h"
54 #include "utils/array.h"
55 #include "utils/builtins.h"
56 #include "utils/rel.h"
57 #include "utils/memutils.h"
58 #include "utils/guc.h"
59 #include "utils/timestamp.h"
60
61 #if (PG_VERSION_NUM >= 90300)
62 #include "access/htup_details.h"
63 #else
64 #include "access/htup.h"
65 #endif
66
67 #include "optimizer/pathnode.h"
68 #include "optimizer/restrictinfo.h"
69 #include "optimizer/planmain.h"
70
71 /* DB-Library headers (e.g. FreeTDS */
72 #include <sybfront.h>
73 #include <sybdb.h>
74
75 /* #define DEBUG */
76
77 PG_MODULE_MAGIC;
78
79 #include "tds_fdw.h"
80 #include "options.h"
81 #include "deparse.h"
82
83 /* run on module load */
84
85 extern PGDLLEXPORT void _PG_init(void);
86
87 static const bool DEFAULT_SHOW_FINISHED_MEMORY_STATS = false;
88 static bool show_finished_memory_stats = false;
89
90 static const bool DEFAULT_SHOW_BEFORE_ROW_MEMORY_STATS = false;
91 static bool show_before_row_memory_stats = false;
92
93 static const bool DEFAULT_SHOW_AFTER_ROW_MEMORY_STATS = false;
94 static bool show_after_row_memory_stats = false;
95
96 static const double DEFAULT_FDW_SORT_MULTIPLIER=1.2;
97
98 /* error handling */
99
100 static char* last_error_message = NULL;
101
102 static int tds_err_capture(DBPROCESS *dbproc, int severity, int dberr, int oserr, char *dberrstr, char *oserrstr);
103 static char *tds_err_msg(int severity, int dberr, int oserr, char *dberrstr, char *oserrstr);
104
105 /*
106 * Indexes of FDW-private information stored in fdw_private lists.
107 *
108 * We store various information in ForeignScan.fdw_private to pass it from
109 * planner to executor. Currently we store:
110 *
111 * 1) SELECT statement text to be sent to the remote server
112 * 2) Integer list of attribute numbers retrieved by the SELECT
113 *
114 * These items are indexed with the enum FdwScanPrivateIndex, so an item
115 * can be fetched with list_nth(). For example, to get the SELECT statement:
116 * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
117 */
118 enum FdwScanPrivateIndex
119 {
120 /* SQL statement to execute remotely (as a String node) */
121 FdwScanPrivateSelectSql,
122 /* Integer list of attribute numbers retrieved by the SELECT */
123 FdwScanPrivateRetrievedAttrs
124 };
125
126 PG_FUNCTION_INFO_V1(tds_fdw_handler);
127 PG_FUNCTION_INFO_V1(tds_fdw_validator);
128
tds_fdw_handler(PG_FUNCTION_ARGS)129 PGDLLEXPORT Datum tds_fdw_handler(PG_FUNCTION_ARGS)
130 {
131 FdwRoutine *fdwroutine = makeNode(FdwRoutine);
132
133 #ifdef DEBUG
134 ereport(NOTICE,
135 (errmsg("----> starting tds_fdw_handler")
136 ));
137 #endif
138
139 #if (PG_VERSION_NUM >= 90200)
140 fdwroutine->GetForeignRelSize = tdsGetForeignRelSize;
141 fdwroutine->GetForeignPaths = tdsGetForeignPaths;
142 fdwroutine->AnalyzeForeignTable = tdsAnalyzeForeignTable;
143 fdwroutine->GetForeignPlan = tdsGetForeignPlan;
144 #else
145 fdwroutine->PlanForeignScan = tdsPlanForeignScan;
146 #endif
147
148 fdwroutine->ExplainForeignScan = tdsExplainForeignScan;
149 fdwroutine->BeginForeignScan = tdsBeginForeignScan;
150 fdwroutine->IterateForeignScan = tdsIterateForeignScan;
151 fdwroutine->ReScanForeignScan = tdsReScanForeignScan;
152 fdwroutine->EndForeignScan = tdsEndForeignScan;
153
154 #ifdef IMPORT_API
155 fdwroutine->ImportForeignSchema = tdsImportForeignSchema;
156 #endif /* IMPORT_API */
157
158 #ifdef DEBUG
159 ereport(NOTICE,
160 (errmsg("----> finishing tds_fdw_handler")
161 ));
162 #endif
163
164 PG_RETURN_POINTER(fdwroutine);
165 }
166
tds_fdw_validator(PG_FUNCTION_ARGS)167 PGDLLEXPORT Datum tds_fdw_validator(PG_FUNCTION_ARGS)
168 {
169 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
170 Oid catalog = PG_GETARG_OID(1);
171 TdsFdwOptionSet option_set;
172
173 #ifdef DEBUG
174 ereport(NOTICE,
175 (errmsg("----> starting tds_fdw_validator")
176 ));
177 #endif
178
179 tdsValidateOptions(options_list, catalog, &option_set);
180
181 #ifdef DEBUG
182 ereport(NOTICE,
183 (errmsg("----> finishing tds_fdw_validator")
184 ));
185 #endif
186
187 PG_RETURN_VOID();
188 }
189
_PG_init(void)190 void _PG_init(void)
191 {
192 DefineCustomBoolVariable("tds_fdw.show_finished_memory_stats",
193 "Show finished memory stats",
194 "Set to true to show memory stats after a query finishes",
195 &show_finished_memory_stats,
196 DEFAULT_SHOW_FINISHED_MEMORY_STATS,
197 PGC_SUSET,
198 0,
199 NULL,
200 NULL,
201 NULL);
202
203 DefineCustomBoolVariable("tds_fdw.show_before_row_memory_stats",
204 "Show before row memory stats",
205 "Set to true to show memory stats before fetching each row",
206 &show_before_row_memory_stats,
207 DEFAULT_SHOW_BEFORE_ROW_MEMORY_STATS,
208 PGC_SUSET,
209 0,
210 NULL,
211 NULL,
212 NULL);
213
214 DefineCustomBoolVariable("tds_fdw.show_after_row_memory_stats",
215 "Show after row memory stats",
216 "Set to true to show memory stats after fetching each row",
217 &show_after_row_memory_stats,
218 DEFAULT_SHOW_AFTER_ROW_MEMORY_STATS,
219 PGC_SUSET,
220 0,
221 NULL,
222 NULL,
223 NULL);
224 }
225
226 /*
227 * Find an equivalence class member expression, all of whose Vars, come from
228 * the indicated relation.
229 */
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)230 Expr * find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
231 {
232 ListCell *lc_em;
233
234 foreach(lc_em, ec->ec_members)
235 {
236 EquivalenceMember *em = lfirst(lc_em);
237
238 if (bms_equal(em->em_relids, rel->relids))
239 {
240 /*
241 * If there is more than one equivalence member whose Vars are
242 * taken entirely from this relation, we'll be content to choose
243 * any one of those.
244 */
245 return em->em_expr;
246 }
247 }
248
249 /* We didn't find any suitable equivalence class expression */
250 return NULL;
251 }
252
253 /* This is used for JOIN pushdowns, so it is only needed on 9.5+ */
254 #if (PG_VERSION_NUM >= 90500)
255 /*
256 * Detect whether we want to process an EquivalenceClass member.
257 *
258 * This is a callback for use by generate_implied_equalities_for_column.
259 */
260 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)261 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
262 EquivalenceClass *ec, EquivalenceMember *em,
263 void *arg)
264 {
265 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
266 Expr *expr = em->em_expr;
267
268 /*
269 * If we've identified what we're processing in the current scan, we only
270 * want to match that expression.
271 */
272 if (state->current != NULL)
273 return equal(expr, state->current);
274
275 /*
276 * Otherwise, ignore anything we've already processed.
277 */
278 if (list_member(state->already_used, expr))
279 return false;
280
281 /* This is the new target to process. */
282 state->current = expr;
283 return true;
284 }
285 #endif
286
287 /* build query that gets sent to remote server */
288
tdsBuildForeignQuery(PlannerInfo * root,RelOptInfo * baserel,TdsFdwOptionSet * option_set,Bitmapset * attrs_used,List ** retrieved_attrs,List * remote_conds,List * remote_join_conds,List * pathkeys)289 void tdsBuildForeignQuery(PlannerInfo *root, RelOptInfo *baserel, TdsFdwOptionSet* option_set,
290 Bitmapset* attrs_used, List** retrieved_attrs, List* remote_conds, List* remote_join_conds,
291 List* pathkeys)
292 {
293 #ifdef DEBUG
294 ereport(NOTICE,
295 (errmsg("----> starting tdsBuildForeignQuery")
296 ));
297 #endif
298
299 ereport(DEBUG3,
300 (errmsg("tds_fdw: Getting query")
301 ));
302
303 if (option_set->query)
304 {
305 ereport(DEBUG3,
306 (errmsg("tds_fdw: Query is explicitly set")
307 ));
308
309 if (option_set->match_column_names)
310 {
311 /* do this, so that retrieved_attrs is filled in */
312
313 StringInfoData sql;
314
315 initStringInfo(&sql);
316 deparseSelectSql(&sql, root, baserel, attrs_used,
317 retrieved_attrs, option_set);
318 }
319 }
320
321 else
322 {
323 StringInfoData sql;
324
325 initStringInfo(&sql);
326 deparseSelectSql(&sql, root, baserel, attrs_used,
327 retrieved_attrs, option_set);
328 if (remote_conds)
329 appendWhereClause(&sql, root, baserel, remote_conds,
330 true, NULL);
331 if (remote_join_conds)
332 appendWhereClause(&sql, root, baserel, remote_join_conds,
333 (remote_conds == NIL), NULL);
334
335 if (pathkeys)
336 appendOrderByClause(&sql, root, baserel, pathkeys);
337
338 /*
339 * Add FOR UPDATE/SHARE if appropriate. We apply locking during the
340 * initial row fetch, rather than later on as is done for local tables.
341 * The extra roundtrips involved in trying to duplicate the local
342 * semantics exactly don't seem worthwhile (see also comments for
343 * RowMarkType).
344 *
345 * Note: because we actually run the query as a cursor, this assumes that
346 * DECLARE CURSOR ... FOR UPDATE is supported, which it isn't before 8.3.
347 */
348 if (baserel->relid == root->parse->resultRelation &&
349 (root->parse->commandType == CMD_UPDATE ||
350 root->parse->commandType == CMD_DELETE))
351 {
352 /* Relation is UPDATE/DELETE target, so use FOR UPDATE */
353 appendStringInfoString(&sql, " FOR UPDATE");
354 }
355 #if (PG_VERSION_NUM >= 90500)
356 else
357 {
358 PlanRowMark *rc = get_plan_rowmark(root->rowMarks, baserel->relid);
359
360 if (rc)
361 {
362 /*
363 * Relation is specified as a FOR UPDATE/SHARE target, so handle
364 * that. (But we could also see LCS_NONE, meaning this isn't a
365 * target relation after all.)
366 *
367 * For now, just ignore any [NO] KEY specification, since (a) it's
368 * not clear what that means for a remote table that we don't have
369 * complete information about, and (b) it wouldn't work anyway on
370 * older remote servers. Likewise, we don't worry about NOWAIT.
371 */
372 switch (rc->strength)
373 {
374 case LCS_NONE:
375 /* No locking needed */
376 break;
377 case LCS_FORKEYSHARE:
378 case LCS_FORSHARE:
379 appendStringInfoString(&sql, " FOR SHARE");
380 break;
381 case LCS_FORNOKEYUPDATE:
382 case LCS_FORUPDATE:
383 appendStringInfoString(&sql, " FOR UPDATE");
384 break;
385 }
386 }
387 }
388 #endif
389
390 /* now copy it to option_set->query */
391
392 if ((option_set->query = palloc((sql.len + 1) * sizeof(char))) == NULL)
393 {
394 ereport(ERROR,
395 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
396 errmsg("Failed to allocate memory for query")
397 ));
398 }
399
400 strcpy(option_set->query, sql.data);
401 }
402
403 ereport(DEBUG3,
404 (errmsg("tds_fdw: Value of query is %s", option_set->query)
405 ));
406
407 #ifdef DEBUG
408 ereport(NOTICE,
409 (errmsg("----> finishing tdsBuildForeignQuery")
410 ));
411 #endif
412 }
413
414
415 /* set up connection */
416
tdsSetupConnection(TdsFdwOptionSet * option_set,LOGINREC * login,DBPROCESS ** dbproc)417 int tdsSetupConnection(TdsFdwOptionSet* option_set, LOGINREC *login, DBPROCESS **dbproc)
418 {
419 char *servers;
420 RETCODE erc;
421
422 #ifdef DEBUG
423 ereport(NOTICE,
424 (errmsg("----> starting tdsSetupConnection")
425 ));
426 #endif
427
428 ereport(DEBUG3,
429 (errmsg("tds_fdw: Setting login user to %s", option_set->username)
430 ));
431
432 DBSETLUSER(login, option_set->username);
433
434 ereport(DEBUG3,
435 (errmsg("tds_fdw: Setting login password to %s", option_set->password)
436 ));
437
438 DBSETLPWD(login, option_set->password);
439
440 if (option_set->character_set)
441 {
442 ereport(DEBUG3,
443 (errmsg("tds_fdw: Setting login character set to %s", option_set->character_set)
444 ));
445
446 DBSETLCHARSET(login, option_set->character_set);
447 }
448
449 if (option_set->language)
450 {
451 DBSETLNATLANG(login, option_set->language);
452
453 ereport(DEBUG3,
454 (errmsg("tds_fdw: Setting login language to %s", option_set->language)
455 ));
456 }
457
458 if (option_set->tds_version)
459 {
460 BYTE tds_version = DBVERSION_UNKNOWN;
461
462 if (strcmp(option_set->tds_version, "4.2") == 0)
463 {
464 tds_version = DBVER42;
465 }
466
467 else if (strcmp(option_set->tds_version, "5.0") == 0)
468 {
469 tds_version = DBVERSION_100;
470 }
471
472 else if (strcmp(option_set->tds_version, "7.0") == 0)
473 {
474 tds_version = DBVER60;
475 }
476
477 #ifdef DBVERSION_71
478 else if (strcmp(option_set->tds_version, "7.1") == 0)
479 {
480 tds_version = DBVERSION_71;
481 }
482 #endif
483
484 #ifdef DBVERSION_72
485 else if (strcmp(option_set->tds_version, "7.2") == 0)
486 {
487 tds_version = DBVERSION_72;
488 }
489 #endif
490
491 #ifdef DBVERSION_73
492 else if (strcmp(option_set->tds_version, "7.3") == 0)
493 {
494 tds_version = DBVERSION_73;
495 }
496 #endif
497
498 #ifdef DBVERSION_74
499 else if (strcmp(option_set->tds_version, "7.4") == 0)
500 {
501 tds_version = DBVERSION_74;
502 }
503 #endif
504
505 if (tds_version == DBVERSION_UNKNOWN)
506 {
507 ereport(ERROR,
508 (errcode(ERRCODE_SYNTAX_ERROR),
509 errmsg("Unknown tds version: %s.", option_set->tds_version)
510 ));
511 }
512
513 dbsetlversion(login, tds_version);
514
515 ereport(DEBUG3,
516 (errmsg("tds_fdw: Setting login tds version to %s", option_set->tds_version)
517 ));
518 }
519
520 if (option_set->database && !option_set->dbuse)
521 {
522 DBSETLDBNAME(login, option_set->database);
523
524 ereport(DEBUG3,
525 (errmsg("tds_fdw: Setting login database to %s", option_set->database)
526 ));
527 }
528
529 /* set an error handler that does not abort */
530 dberrhandle(tds_err_capture);
531
532 /* try all server names until we find a good one */
533 servers = option_set->servername;
534 last_error_message = NULL;
535 while (servers != NULL)
536 {
537 /* find the length of the next server name */
538 char *next_server = strchr(servers, ',');
539 int server_len = next_server == NULL ? strlen(servers) : next_server - servers;
540
541 /* construct a connect string */
542 char *conn_string = palloc(server_len + 10);
543 strncpy(conn_string, servers, server_len);
544 if (option_set->port)
545 sprintf(conn_string + server_len, ":%i", option_set->port);
546 else
547 conn_string[server_len] = '\0';
548
549 ereport(DEBUG3,
550 (errmsg("tds_fdw: Connection string is %s", conn_string)
551 ));
552 ereport(DEBUG3,
553 (errmsg("tds_fdw: Connecting to server")
554 ));
555
556 /* try to connect */
557 if ((*dbproc = dbopen(login, conn_string)) == NULL)
558 {
559 /* failure, will continue with the next server */
560 ereport(DEBUG3,
561 (errmsg("Failed to connect using connection string %s with user %s", conn_string, option_set->username)
562 ));
563
564 pfree(conn_string);
565 }
566 else
567 {
568 /* success, break the loop */
569 ereport(DEBUG3,
570 (errmsg("tds_fdw: Connected successfully")
571 ));
572
573 pfree(conn_string);
574 break;
575 }
576
577 /* skip the comma if appropriate */
578 servers = next_server ? next_server + 1 : NULL;
579 }
580
581 /* report an error if all connections fail */
582 if (*dbproc == NULL)
583 {
584 if (last_error_message)
585 ereport(ERROR,
586 (errcode(ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION),
587 errmsg("%s", last_error_message)
588 ));
589 else
590 ereport(ERROR,
591 (errcode(ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION),
592 errmsg("Failed to connect to server %s with user %s", option_set->servername, option_set->username)
593 ));
594 }
595
596 /* set the normal error handler again */
597 dberrhandle(tds_err_handler);
598
599 if (option_set->database && option_set->dbuse)
600 {
601 ereport(DEBUG3,
602 (errmsg("tds_fdw: Selecting database %s", option_set->database)
603 ));
604
605 if ((erc = dbuse(*dbproc, option_set->database)) == FAIL)
606 {
607 ereport(ERROR,
608 (errcode(ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION),
609 errmsg("Failed to select database %s", option_set->database)
610 ));
611 }
612
613 ereport(DEBUG3,
614 (errmsg("tds_fdw: Selected database")
615 ));
616 }
617
618 #ifdef DEBUG
619 ereport(NOTICE,
620 (errmsg("----> finishing tdsSetupConnection")
621 ));
622 #endif
623
624 return 0;
625 }
626
tdsGetRowCountShowPlanAll(TdsFdwOptionSet * option_set,LOGINREC * login,DBPROCESS * dbproc)627 double tdsGetRowCountShowPlanAll(TdsFdwOptionSet* option_set, LOGINREC *login, DBPROCESS *dbproc)
628 {
629 double rows = 0;
630 RETCODE erc;
631 int ret_code;
632 char* show_plan_query = "SET SHOWPLAN_ALL ON";
633 char* show_plan_query_off = "SET SHOWPLAN_ALL OFF";
634
635 #ifdef DEBUG
636 ereport(NOTICE,
637 (errmsg("----> starting tdsGetRowCountShowPlanAll")
638 ));
639 #endif
640
641 ereport(DEBUG3,
642 (errmsg("tds_fdw: Setting database command to %s", show_plan_query)
643 ));
644
645 if ((erc = dbcmd(dbproc, show_plan_query)) == FAIL)
646 {
647 ereport(ERROR,
648 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
649 errmsg("Failed to set current query to %s", show_plan_query)
650 ));
651 }
652
653 ereport(DEBUG3,
654 (errmsg("tds_fdw: Executing the query")
655 ));
656
657 if ((erc = dbsqlexec(dbproc)) == FAIL)
658 {
659 ereport(ERROR,
660 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
661 errmsg("Failed to execute query %s", show_plan_query)
662 ));
663 }
664
665 ereport(DEBUG3,
666 (errmsg("tds_fdw: Query executed correctly")
667 ));
668 ereport(DEBUG3,
669 (errmsg("tds_fdw: Getting results")
670 ));
671
672 erc = dbresults(dbproc);
673
674 if (erc == FAIL)
675 {
676 ereport(ERROR,
677 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
678 errmsg("Failed to get results from query %s", show_plan_query)
679 ));
680 }
681
682 ereport(DEBUG3,
683 (errmsg("tds_fdw: Setting database command to %s", option_set->query)
684 ));
685
686 if ((erc = dbcmd(dbproc, option_set->query)) == FAIL)
687 {
688 ereport(ERROR,
689 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
690 errmsg("Failed to set current query to %s", option_set->query)
691 ));
692 }
693
694 ereport(DEBUG3,
695 (errmsg("tds_fdw: Executing the query")
696 ));
697
698 if ((erc = dbsqlexec(dbproc)) == FAIL)
699 {
700 ereport(ERROR,
701 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
702 errmsg("Failed to execute query %s", option_set->query)
703 ));
704 }
705
706 ereport(DEBUG3,
707 (errmsg("tds_fdw: Query executed correctly")
708 ));
709 ereport(DEBUG3,
710 (errmsg("tds_fdw: Getting results")
711 ));
712
713 erc = dbresults(dbproc);
714
715 if (erc == FAIL)
716 {
717 ereport(ERROR,
718 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
719 errmsg("Failed to get results from query %s", option_set->query)
720 ));
721 }
722
723 else if (erc == NO_MORE_RESULTS)
724 {
725 ereport(DEBUG3,
726 (errmsg("tds_fdw: There appears to be no results from query %s", option_set->query)
727 ));
728
729 goto cleanup_after_show_plan;
730 }
731
732 else if (erc == SUCCEED)
733 {
734 int ncol;
735 int ncols;
736 int parent = 0;
737 double estimate_rows = 0;
738
739 ncols = dbnumcols(dbproc);
740
741 ereport(DEBUG3,
742 (errmsg("tds_fdw: %i columns", ncols)
743 ));
744
745 for (ncol = 0; ncol < ncols; ncol++)
746 {
747 char *col_name;
748
749 col_name = dbcolname(dbproc, ncol + 1);
750
751 if (strcmp(col_name, "Parent") == 0)
752 {
753 ereport(DEBUG3,
754 (errmsg("tds_fdw: Binding column %s (%i)", col_name, ncol + 1)
755 ));
756
757 erc = dbbind(dbproc, ncol + 1, INTBIND, sizeof(int), (BYTE *)&parent);
758
759 if (erc == FAIL)
760 {
761 ereport(ERROR,
762 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
763 errmsg("Failed to bind results for column %s to a variable.", col_name)
764 ));
765 }
766 }
767
768 if (strcmp(col_name, "EstimateRows") == 0)
769 {
770 ereport(DEBUG3,
771 (errmsg("tds_fdw: Binding column %s (%i)", col_name, ncol + 1)
772 ));
773
774 erc = dbbind(dbproc, ncol + 1, FLT8BIND, sizeof(double), (BYTE *)&estimate_rows);
775
776 if (erc == FAIL)
777 {
778 ereport(ERROR,
779 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
780 errmsg("Failed to bind results for column %s to a variable.", col_name)
781 ));
782 }
783 }
784 }
785
786 ereport(DEBUG3,
787 (errmsg("tds_fdw: Successfully got results")
788 ));
789
790 while ((ret_code = dbnextrow(dbproc)) != NO_MORE_ROWS)
791 {
792 switch (ret_code)
793 {
794 case REG_ROW:
795
796 ereport(DEBUG3,
797 (errmsg("tds_fdw: Parent is %i. EstimateRows is %g.", parent, estimate_rows)
798 ));
799
800 if (parent == 0)
801 {
802 rows += estimate_rows;
803 }
804
805 break;
806
807 case BUF_FULL:
808 ereport(ERROR,
809 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
810 errmsg("Buffer filled up while getting plan for query")
811 ));
812
813 case FAIL:
814 ereport(ERROR,
815 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
816 errmsg("Failed to get row while getting plan for query")
817 ));
818
819 default:
820 ereport(ERROR,
821 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
822 errmsg("Failed to get plan for query. Unknown return code.")
823 ));
824 }
825 }
826
827 ereport(DEBUG3,
828 (errmsg("tds_fdw: We estimated %g rows.", rows)
829 ));
830 }
831
832 else
833 {
834 ereport(ERROR,
835 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
836 errmsg("Unknown return code getting results from query %s", option_set->query)
837 ));
838 }
839
840 cleanup_after_show_plan:
841 ereport(DEBUG3,
842 (errmsg("tds_fdw: Setting database command to %s", show_plan_query_off)
843 ));
844
845 if ((erc = dbcmd(dbproc, show_plan_query_off)) == FAIL)
846 {
847 ereport(ERROR,
848 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
849 errmsg("Failed to set current query to %s", show_plan_query_off)
850 ));
851 }
852
853 ereport(DEBUG3,
854 (errmsg("tds_fdw: Executing the query")
855 ));
856
857 if ((erc = dbsqlexec(dbproc)) == FAIL)
858 {
859 ereport(ERROR,
860 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
861 errmsg("Failed to execute query %s", show_plan_query_off)
862 ));
863 }
864
865 ereport(DEBUG3,
866 (errmsg("tds_fdw: Query executed correctly")
867 ));
868 ereport(DEBUG3,
869 (errmsg("tds_fdw: Getting results")
870 ));
871
872 erc = dbresults(dbproc);
873
874 if (erc == FAIL)
875 {
876 ereport(ERROR,
877 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
878 errmsg("Failed to get results from query %s", show_plan_query)
879 ));
880 }
881
882 #ifdef DEBUG
883 ereport(NOTICE,
884 (errmsg("----> finishing tdsGetRowCountShowPlanAll")
885 ));
886 #endif
887
888
889 return rows;
890 }
891
892 /* get the number of rows returned by a query */
893
tdsGetRowCountExecute(TdsFdwOptionSet * option_set,LOGINREC * login,DBPROCESS * dbproc)894 double tdsGetRowCountExecute(TdsFdwOptionSet* option_set, LOGINREC *login, DBPROCESS *dbproc)
895 {
896 int rows_report = 0;
897 long long int rows_increment = 0;
898 RETCODE erc;
899 int ret_code;
900 int iscount = 0;
901
902 #ifdef DEBUG
903 ereport(NOTICE,
904 (errmsg("----> starting tdsGetRowCountExecute")
905 ));
906 #endif
907
908 ereport(DEBUG3,
909 (errmsg("tds_fdw: Setting database command to %s", option_set->query)
910 ));
911
912 if ((erc = dbcmd(dbproc, option_set->query)) == FAIL)
913 {
914 ereport(ERROR,
915 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
916 errmsg("Failed to set current query to %s", option_set->query)
917 ));
918 }
919
920 ereport(DEBUG3,
921 (errmsg("tds_fdw: Executing the query")
922 ));
923
924 if ((erc = dbsqlexec(dbproc)) == FAIL)
925 {
926 ereport(ERROR,
927 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
928 errmsg("Failed to execute query %s", option_set->query)
929 ));
930 }
931
932 ereport(NOTICE,
933 (errmsg("tds_fdw: Query executed correctly")
934 ));
935 ereport(NOTICE,
936 (errmsg("tds_fdw: Getting results")
937 ));
938
939 erc = dbresults(dbproc);
940
941 if (erc == FAIL)
942 {
943 ereport(ERROR,
944 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
945 errmsg("Failed to get results from query %s", option_set->query)
946 ));
947 }
948
949 else if (erc == NO_MORE_RESULTS)
950 {
951 ereport(DEBUG3,
952 (errmsg("tds_fdw: There appears to be no results from query %s", option_set->query)
953 ));
954
955 goto cleanup;
956 }
957
958 else if (erc == SUCCEED)
959 {
960 ereport(DEBUG3,
961 (errmsg("tds_fdw: Successfully got results")
962 ));
963
964 while ((ret_code = dbnextrow(dbproc)) != NO_MORE_ROWS)
965 {
966 switch (ret_code)
967 {
968 case REG_ROW:
969 rows_increment++;
970 break;
971
972 case BUF_FULL:
973 ereport(ERROR,
974 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
975 errmsg("Buffer filled up while getting plan for query")
976 ));
977
978 case FAIL:
979 ereport(ERROR,
980 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
981 errmsg("Failed to get row while getting plan for query")
982 ));
983
984 default:
985 ereport(ERROR,
986 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
987 errmsg("Failed to get plan for query. Unknown return code.")
988 ));
989 }
990 }
991
992 rows_report = DBCOUNT(dbproc);
993 iscount = dbiscount(dbproc);
994
995 ereport(DEBUG3,
996 (errmsg("tds_fdw: We counted %lli rows, and dbcount says %i rows.", rows_increment, rows_report)
997 ));
998 ereport(DEBUG3,
999 (errmsg("tds_fdw: dbiscount says %i.", iscount)
1000 ));
1001 }
1002
1003 else
1004 {
1005 ereport(ERROR,
1006 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1007 errmsg("Unknown return code getting results from query %s", option_set->query)
1008 ));
1009 }
1010
1011 cleanup:
1012 #ifdef DEBUG
1013 ereport(NOTICE,
1014 (errmsg("----> finishing tdsGetRowCountExecute")
1015 ));
1016 #endif
1017
1018 if (iscount)
1019 {
1020 return rows_report;
1021 }
1022
1023 else
1024 {
1025 return rows_increment;
1026 }
1027 }
1028
tdsGetRowCount(TdsFdwOptionSet * option_set,LOGINREC * login,DBPROCESS * dbproc)1029 double tdsGetRowCount(TdsFdwOptionSet* option_set, LOGINREC *login, DBPROCESS *dbproc)
1030 {
1031 double rows = 0;
1032
1033 #ifdef DEBUG
1034 ereport(NOTICE,
1035 (errmsg("----> starting tdsGetRowCount")
1036 ));
1037 #endif
1038
1039 if (strcmp(option_set->row_estimate_method, "execute") == 0)
1040 {
1041 rows = tdsGetRowCountExecute(option_set, login, dbproc);
1042 }
1043
1044 else if (strcmp(option_set->row_estimate_method, "showplan_all") == 0)
1045 {
1046 rows = tdsGetRowCountShowPlanAll(option_set, login, dbproc);
1047 }
1048
1049 #ifdef DEBUG
1050 ereport(NOTICE,
1051 (errmsg("----> finishing tdsGetRowCount")
1052 ));
1053 #endif
1054
1055 return rows;
1056 }
1057
1058 /* get the startup cost for the query */
1059
tdsGetStartupCost(TdsFdwOptionSet * option_set)1060 double tdsGetStartupCost(TdsFdwOptionSet* option_set)
1061 {
1062 double startup_cost;
1063
1064 #ifdef DEBUG
1065 ereport(NOTICE,
1066 (errmsg("----> starting tdsGetStartupCost")
1067 ));
1068 #endif
1069
1070 if (strcmp(option_set->servername, "127.0.0.1") == 0 || strcmp(option_set->servername, "localhost") == 0)
1071 startup_cost = 0;
1072 else
1073 startup_cost = 25;
1074
1075 #ifdef DEBUG
1076 ereport(NOTICE,
1077 (errmsg("----> finishing tdsGetStartupCost")
1078 ));
1079 #endif
1080
1081 return startup_cost;
1082 }
1083
1084 #if (PG_VERSION_NUM >= 90400)
tdsDatetimeToDatum(DBPROCESS * dbproc,DBDATETIME * src,Datum * datetime_out)1085 int tdsDatetimeToDatum(DBPROCESS *dbproc, DBDATETIME *src, Datum *datetime_out)
1086 {
1087 DBDATEREC datetime_in;
1088 RETCODE erc = dbdatecrack(dbproc, &datetime_in, src);
1089
1090 if (erc == SUCCEED)
1091 {
1092 float8 seconds;
1093
1094 #ifdef MSDBLIB
1095 seconds = (float8)datetime_in.second + ((float8)datetime_in.millisecond/1000);
1096
1097 ereport(DEBUG3,
1098 (errmsg("tds_fdw: Datetime value: year=%i, month=%i, day=%i, hour=%i, minute=%i, second=%i, millisecond=%i, timezone=%i,",
1099 datetime_in.year, datetime_in.month, datetime_in.day,
1100 datetime_in.hour, datetime_in.minute, datetime_in.second,
1101 datetime_in.millisecond, datetime_in.tzone)
1102 ));
1103 ereport(DEBUG3,
1104 (errmsg("tds_fdw: Seconds=%f", seconds)
1105 ));
1106
1107 *datetime_out = DirectFunctionCall6(make_timestamp,
1108 Int64GetDatum(datetime_in.year), Int64GetDatum(datetime_in.month), Int64GetDatum(datetime_in.day),
1109 Int64GetDatum(datetime_in.hour), Int64GetDatum(datetime_in.minute), Float8GetDatum(seconds));
1110 #else
1111 seconds = (float8)datetime_in.datesecond + ((float8)datetime_in.datemsecond/1000);
1112
1113 ereport(DEBUG3,
1114 (errmsg("tds_fdw: Datetime value: year=%i, month=%i, day=%i, hour=%i, minute=%i, second=%i, millisecond=%i, timezone=%i,",
1115 datetime_in.dateyear, datetime_in.datemonth + 1, datetime_in.datedmonth,
1116 datetime_in.datehour, datetime_in.dateminute, datetime_in.datesecond,
1117 datetime_in.datemsecond, datetime_in.datetzone)
1118 ));
1119 ereport(DEBUG3,
1120 (errmsg("tds_fdw: Seconds=%f", seconds)
1121 ));
1122
1123 /* Sybase uses different field names, and it uses 0-11 for the month */
1124 *datetime_out = DirectFunctionCall6(make_timestamp,
1125 Int64GetDatum(datetime_in.dateyear), Int64GetDatum(datetime_in.datemonth + 1), Int64GetDatum(datetime_in.datedmonth),
1126 Int64GetDatum(datetime_in.datehour), Int64GetDatum(datetime_in.dateminute), Float8GetDatum(seconds));
1127 #endif
1128 }
1129
1130 return erc;
1131 }
1132 #endif
1133
tdsConvertToCString(DBPROCESS * dbproc,int srctype,const BYTE * src,DBINT srclen)1134 char* tdsConvertToCString(DBPROCESS* dbproc, int srctype, const BYTE* src, DBINT srclen)
1135 {
1136 char* dest = NULL;
1137 int real_destlen;
1138 DBINT destlen;
1139 int desttype;
1140 int ret_value;
1141 #if (PG_VERSION_NUM >= 90400)
1142 Datum datetime_out;
1143 RETCODE erc;
1144 #endif
1145 int use_tds_conversion = 1;
1146
1147 switch(srctype)
1148 {
1149 case SYBCHAR:
1150 case SYBVARCHAR:
1151 case SYBTEXT:
1152 real_destlen = srclen + 1; /* the size of the array */
1153 destlen = -2; /* the size to pass to dbconvert (-2 means to null terminate it) */
1154 desttype = SYBCHAR;
1155 break;
1156 case SYBBINARY:
1157 case SYBVARBINARY:
1158 real_destlen = srclen;
1159 destlen = srclen;
1160 desttype = SYBBINARY;
1161 break;
1162
1163 #if (PG_VERSION_NUM >= 90400)
1164 case SYBDATETIME:
1165 erc = tdsDatetimeToDatum(dbproc, (DBDATETIME *)src, &datetime_out);
1166
1167 if (erc == SUCCEED)
1168 {
1169 const char *datetime_str = timestamptz_to_str(DatumGetTimestamp(datetime_out));
1170
1171 dest = palloc(strlen(datetime_str) * sizeof(char));
1172 strcpy(dest, datetime_str);
1173
1174 use_tds_conversion = 0;
1175 }
1176 #endif
1177
1178 default:
1179 real_destlen = 1000; /* Probably big enough */
1180 destlen = -2;
1181 desttype = SYBCHAR;
1182 break;
1183 }
1184
1185 ereport(DEBUG3,
1186 (errmsg("tds_fdw: Source type is %i. Destination type is %i", srctype, desttype)
1187 ));
1188 ereport(DEBUG3,
1189 (errmsg("tds_fdw: Source length is %i. Destination length is %i. Real destination length is %i", srclen, destlen, real_destlen)
1190 ));
1191
1192 if (use_tds_conversion)
1193 {
1194 if (dbwillconvert(srctype, desttype) != FALSE)
1195 {
1196 dest = palloc(real_destlen * sizeof(char));
1197 ret_value = dbconvert(dbproc, srctype, src, srclen, desttype, (BYTE *) dest, destlen);
1198
1199 if (ret_value == FAIL)
1200 {
1201 ereport(DEBUG3,
1202 (errmsg("tds_fdw: Failed to convert column")
1203 ));
1204 }
1205
1206 else if (ret_value == -1)
1207 {
1208 ereport(DEBUG3,
1209 (errmsg("tds_fdw: Failed to convert column. Could have been a NULL pointer or bad data type.")
1210 ));
1211 }
1212 }
1213
1214 else
1215 {
1216 ereport(DEBUG3,
1217 (errmsg("tds_fdw: Column cannot be converted to this type.")
1218 ));
1219 }
1220 }
1221
1222 return dest;
1223
1224 }
1225
1226 /* get output for EXPLAIN */
1227
tdsExplainForeignScan(ForeignScanState * node,ExplainState * es)1228 void tdsExplainForeignScan(ForeignScanState *node, ExplainState *es)
1229 {
1230 #ifdef DEBUG
1231 ereport(NOTICE,
1232 (errmsg("----> starting tdsExplainForeignScan")
1233 ));
1234 #endif
1235
1236 #ifdef DEBUG
1237 ereport(NOTICE,
1238 (errmsg("----> finishing tdsExplainForeignScan")
1239 ));
1240 #endif
1241 }
1242
1243 /* initiate access to foreign server and database */
1244
tdsBeginForeignScan(ForeignScanState * node,int eflags)1245 void tdsBeginForeignScan(ForeignScanState *node, int eflags)
1246 {
1247 TdsFdwOptionSet option_set;
1248 LOGINREC *login;
1249 DBPROCESS *dbproc;
1250 TdsFdwExecutionState *festate;
1251 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1252 EState *estate = node->ss.ps.state;
1253
1254 #ifdef DEBUG
1255 ereport(NOTICE,
1256 (errmsg("----> starting tdsBeginForeignScan")
1257 ));
1258 #endif
1259
1260 tdsGetForeignTableOptionsFromCatalog(RelationGetRelid(node->ss.ss_currentRelation), &option_set);
1261
1262 ereport(DEBUG3,
1263 (errmsg("tds_fdw: Initiating DB-Library")
1264 ));
1265
1266 if (dbinit() == FAIL)
1267 {
1268 ereport(ERROR,
1269 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1270 errmsg("Failed to initialize DB-Library environment")
1271 ));
1272 }
1273
1274 dberrhandle(tds_err_handler);
1275
1276 if (option_set.msg_handler)
1277 {
1278 if (strcmp(option_set.msg_handler, "notice") == 0)
1279 {
1280 dbmsghandle(tds_notice_msg_handler);
1281 }
1282
1283 else if (strcmp(option_set.msg_handler, "blackhole") == 0)
1284 {
1285 dbmsghandle(tds_blackhole_msg_handler);
1286 }
1287
1288 else
1289 {
1290 ereport(ERROR,
1291 (errcode(ERRCODE_SYNTAX_ERROR),
1292 errmsg("Unknown msg handler: %s.", option_set.msg_handler)
1293 ));
1294 }
1295 }
1296
1297 ereport(DEBUG3,
1298 (errmsg("tds_fdw: Getting login structure")
1299 ));
1300
1301 if ((login = dblogin()) == NULL)
1302 {
1303 ereport(ERROR,
1304 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1305 errmsg("Failed to initialize DB-Library login structure")
1306 ));
1307 }
1308
1309 if (tdsSetupConnection(&option_set, login, &dbproc) != 0)
1310 {
1311 goto cleanup;
1312 }
1313
1314 festate = (TdsFdwExecutionState *) palloc(sizeof(TdsFdwExecutionState));
1315 node->fdw_state = (void *) festate;
1316 festate->login = login;
1317 festate->dbproc = dbproc;
1318 festate->query = strVal(list_nth(fsplan->fdw_private,
1319 FdwScanPrivateSelectSql));
1320 festate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1321 FdwScanPrivateRetrievedAttrs);
1322 festate->first = 1;
1323 festate->row = 0;
1324 festate->mem_cxt = AllocSetContextCreate(estate->es_query_cxt,
1325 "tds_fdw data",
1326 ALLOCSET_DEFAULT_SIZES);
1327
1328 cleanup:
1329 ;
1330
1331 #ifdef DEBUG
1332 ereport(NOTICE,
1333 (errmsg("----> finishing tdsBeginForeignScan")
1334 ));
1335 #endif
1336 }
1337
tdsGetColumnMetadata(ForeignScanState * node,TdsFdwOptionSet * option_set)1338 void tdsGetColumnMetadata(ForeignScanState *node, TdsFdwOptionSet *option_set)
1339 {
1340 MemoryContext old_cxt;
1341 int ncol;
1342 char* local_columns_found = NULL;
1343 TdsFdwExecutionState *festate = (TdsFdwExecutionState *)node->fdw_state;
1344 int num_retrieved_attrs = list_length(festate->retrieved_attrs);
1345 Oid relOid = RelationGetRelid(node->ss.ss_currentRelation);
1346
1347 old_cxt = MemoryContextSwitchTo(festate->mem_cxt);
1348
1349 festate->attinmeta = TupleDescGetAttInMetadata(node->ss.ss_currentRelation->rd_att);
1350
1351 if (option_set->match_column_names && festate->ncols < num_retrieved_attrs)
1352 {
1353 ereport(ERROR,
1354 (errcode(ERRCODE_FDW_INCONSISTENT_DESCRIPTOR_INFORMATION),
1355 errmsg("Table definition mismatch: Foreign source returned %d column(s),"
1356 " but query expected %d column(s)",
1357 festate->ncols,
1358 num_retrieved_attrs)
1359 ));
1360 }
1361
1362 else if (!option_set->match_column_names && festate->ncols < festate->attinmeta->tupdesc->natts)
1363 {
1364 ereport(ERROR,
1365 (errcode(ERRCODE_FDW_INCONSISTENT_DESCRIPTOR_INFORMATION),
1366 errmsg("Table definition mismatch: Foreign source returned %d column(s),"
1367 " but local table has %d column(s)",
1368 festate->ncols,
1369 festate->attinmeta->tupdesc->natts)
1370 ));
1371 }
1372
1373 festate->columns = palloc(festate->ncols * sizeof(COL));
1374 festate->datums = palloc(festate->attinmeta->tupdesc->natts * sizeof(*festate->datums));
1375 festate->isnull = palloc(festate->attinmeta->tupdesc->natts * sizeof(*festate->isnull));
1376
1377 if (option_set->match_column_names)
1378 {
1379 local_columns_found = palloc0(festate->attinmeta->tupdesc->natts);
1380 }
1381
1382 for (ncol = 0; ncol < festate->ncols; ncol++)
1383 {
1384 COL* column;
1385
1386 column = &festate->columns[ncol];
1387 column->name = dbcolname(festate->dbproc, ncol + 1);
1388
1389 ereport(DEBUG3,
1390 (errmsg("tds_fdw: Fetching column %i (%s)", ncol, column->name)
1391 ));
1392
1393 column->srctype = dbcoltype(festate->dbproc, ncol + 1);
1394
1395 ereport(DEBUG3,
1396 (errmsg("tds_fdw: Type is %i", column->srctype)
1397 ));
1398
1399 if (option_set->match_column_names)
1400 {
1401 ListCell *lc;
1402
1403 ereport(DEBUG3,
1404 (errmsg("tds_fdw: Matching foreign column with local column by name.")
1405 ));
1406
1407 column->local_index = -1;
1408
1409 //for (local_ncol = 0; local_ncol < festate->attinmeta->tupdesc->natts; local_ncol++)
1410 foreach(lc, festate->retrieved_attrs)
1411 {
1412 /* this is indexed starting from 1, not 0 */
1413 int local_ncol = lfirst_int(lc) - 1;
1414 char* local_name = NULL;
1415 List *options;
1416 ListCell *inner_lc;
1417
1418 ereport(DEBUG3,
1419 (errmsg("tds_fdw: Comparing it to the following retrived column: %i", local_ncol)
1420 ));
1421
1422 options = GetForeignColumnOptions(relOid, local_ncol + 1);
1423
1424 foreach(inner_lc, options)
1425 {
1426 DefElem *def = (DefElem *) lfirst(inner_lc);
1427
1428 ereport(DEBUG3,
1429 (errmsg("tds_fdw: Checking if column_name is set as an option:%s => %s", def->defname, defGetString(def))
1430 ));
1431
1432 if (strcmp(def->defname, "column_name") == 0
1433 && strncmp(defGetString(def), column->name, NAMEDATALEN) == 0)
1434 {
1435 ereport(DEBUG3,
1436 (errmsg("tds_fdw: It matches!")
1437 ));
1438
1439 local_name = defGetString(def);
1440 column->local_index = local_ncol;
1441 column->attr_oid = TupleDescAttr(festate->attinmeta->tupdesc, local_ncol)->atttypid;
1442 local_columns_found[local_ncol] = 1;
1443 break;
1444 }
1445 }
1446
1447 if (!local_name)
1448 {
1449
1450 local_name = TupleDescAttr(festate->attinmeta->tupdesc, local_ncol)->attname.data;
1451
1452 ereport(DEBUG3,
1453 (errmsg("tds_fdw: Comparing retrieved column name to the following local column name: %s", local_name)
1454 ));
1455
1456 if (strncmp(local_name, column->name, NAMEDATALEN) == 0)
1457 {
1458 ereport(DEBUG3,
1459 (errmsg("tds_fdw: It matches!")
1460 ));
1461
1462 column->local_index = local_ncol;
1463 column->attr_oid = TupleDescAttr(festate->attinmeta->tupdesc, local_ncol)->atttypid;
1464 local_columns_found[local_ncol] = 1;
1465 break;
1466 }
1467 }
1468 }
1469
1470 if (column->local_index == -1)
1471 {
1472 ereport(WARNING,
1473 (errcode(ERRCODE_FDW_INCONSISTENT_DESCRIPTOR_INFORMATION),
1474 errmsg("Table definition mismatch: Foreign source has column named %s,"
1475 " but target table does not. Column will be ignored.",
1476 column->name)
1477 ));
1478 }
1479 }
1480
1481 else
1482 {
1483
1484 ereport(DEBUG3,
1485 (errmsg("tds_fdw: Matching foreign column with local column by position.")
1486 ));
1487
1488 column->local_index = ncol;
1489 column->attr_oid = TupleDescAttr(festate->attinmeta->tupdesc, ncol)->atttypid;
1490 }
1491
1492 ereport(DEBUG3,
1493 (errmsg("tds_fdw: Local index = %i, local type OID = %i", column->local_index, column->attr_oid)
1494 ));
1495 }
1496
1497 if (option_set->match_column_names)
1498 {
1499 for (ncol = 0; ncol < festate->attinmeta->tupdesc->natts; ncol++)
1500 {
1501 if (local_columns_found[ncol] == 0)
1502 {
1503 ereport(DEBUG3,
1504 (errcode(ERRCODE_FDW_INCONSISTENT_DESCRIPTOR_INFORMATION),
1505 errmsg("Table definition mismatch: Could not match local column %s"
1506 " with column from foreign table. It probably was not selected.",
1507 TupleDescAttr(festate->attinmeta->tupdesc, ncol)->attname.data)
1508 ));
1509
1510 /* pretend this is NULL, so Pg won't try to access an invalid Datum */
1511 festate->isnull[ncol] = 1;
1512 }
1513 }
1514
1515 pfree(local_columns_found);
1516 }
1517
1518 MemoryContextSwitchTo(old_cxt);
1519 }
1520
1521 /* get next row from foreign table */
1522
tdsIterateForeignScan(ForeignScanState * node)1523 TupleTableSlot* tdsIterateForeignScan(ForeignScanState *node)
1524 {
1525 TdsFdwOptionSet option_set;
1526 RETCODE erc;
1527 int ret_code;
1528 HeapTuple tuple;
1529 TdsFdwExecutionState *festate = (TdsFdwExecutionState *) node->fdw_state;
1530 EState *estate = node->ss.ps.state;
1531 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1532 int ncol;
1533
1534 /* Cleanup */
1535 ExecClearTuple(slot);
1536
1537 #ifdef DEBUG
1538 ereport(NOTICE,
1539 (errmsg("----> starting tdsIterateForeignScan")
1540 ));
1541 #endif
1542
1543 if (festate->first)
1544 {
1545 ereport(DEBUG3,
1546 (errmsg("tds_fdw: This is the first iteration")
1547 ));
1548 ereport(DEBUG3,
1549 (errmsg("tds_fdw: Setting database command to %s", festate->query)
1550 ));
1551
1552 festate->first = 0;
1553
1554 if ((erc = dbcmd(festate->dbproc, festate->query)) == FAIL)
1555 {
1556 ereport(ERROR,
1557 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1558 errmsg("Failed to set current query to %s", festate->query)
1559 ));
1560 }
1561
1562 ereport(DEBUG3,
1563 (errmsg("tds_fdw: Executing the query")
1564 ));
1565
1566 if ((erc = dbsqlexec(festate->dbproc)) == FAIL)
1567 {
1568 ereport(ERROR,
1569 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1570 errmsg("Failed to execute query %s", festate->query)
1571 ));
1572 }
1573
1574 ereport(DEBUG3,
1575 (errmsg("tds_fdw: Query executed correctly")
1576 ));
1577 ereport(DEBUG3,
1578 (errmsg("tds_fdw: Getting results")
1579 ));
1580
1581 erc = dbresults(festate->dbproc);
1582
1583 if (erc == FAIL)
1584 {
1585 ereport(ERROR,
1586 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1587 errmsg("Failed to get results from query %s", festate->query)
1588 ));
1589 }
1590
1591 else if (erc == NO_MORE_RESULTS)
1592 {
1593 ereport(ERROR,
1594 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1595 errmsg("There appears to be no results from query %s", festate->query)
1596 ));
1597 }
1598
1599 else if (erc == SUCCEED)
1600 {
1601 Oid relOid;
1602
1603 ereport(DEBUG3,
1604 (errmsg("tds_fdw: Successfully got results")
1605 ));
1606
1607 ereport(DEBUG3,
1608 (errmsg("tds_fdw: Getting column info")
1609 ));
1610
1611 festate->ncols = dbnumcols(festate->dbproc);
1612
1613 ereport(DEBUG3,
1614 (errmsg("tds_fdw: %i columns", festate->ncols)
1615 ));
1616
1617 MemoryContextReset(festate->mem_cxt);
1618
1619 relOid = RelationGetRelid(node->ss.ss_currentRelation);
1620
1621 ereport(DEBUG3,
1622 (errmsg("tds_fdw: Table OID is %i", relOid)
1623 ));
1624
1625 tdsGetForeignTableOptionsFromCatalog(relOid, &option_set);
1626 tdsGetColumnMetadata(node, &option_set);
1627
1628 for (ncol = 0; ncol < festate->ncols; ncol++) {
1629 COL* column = &festate->columns[ncol];
1630 const int srctype = column->srctype;
1631 const Oid attr_oid = column->attr_oid;
1632
1633 if (column->local_index == -1)
1634 {
1635 continue;
1636 }
1637
1638 erc = SUCCEED;
1639 column->useraw = false;
1640
1641 ereport(DEBUG3,
1642 (errmsg("tds_fdw: The foreign type is %i. The local type is %i.", srctype, attr_oid)
1643 ));
1644
1645 if (srctype == SYBINT2 && attr_oid == INT2OID)
1646 {
1647 erc = dbbind(festate->dbproc, ncol + 1, SMALLBIND, sizeof(DBSMALLINT), (BYTE *)(&column->value.dbsmallint));
1648 column->useraw = true;
1649 }
1650 else if (srctype == SYBINT4 && attr_oid == INT4OID)
1651 {
1652 erc = dbbind(festate->dbproc, ncol + 1, INTBIND, sizeof(DBINT), (BYTE *)(&column->value.dbint));
1653 column->useraw = true;
1654 }
1655 else if (srctype == SYBINT8 && attr_oid == INT8OID)
1656 {
1657 erc = dbbind(festate->dbproc, ncol + 1, BIGINTBIND, sizeof(DBBIGINT), (BYTE *)(&column->value.dbbigint));
1658 column->useraw = true;
1659 }
1660 else if (srctype == SYBREAL && attr_oid == FLOAT4OID)
1661 {
1662 erc = dbbind(festate->dbproc, ncol + 1, REALBIND, sizeof(DBREAL), (BYTE *)(&column->value.dbreal));
1663 column->useraw = true;
1664 }
1665 else if (srctype == SYBFLT8 && attr_oid == FLOAT8OID)
1666 {
1667 erc = dbbind(festate->dbproc, ncol + 1, FLT8BIND, sizeof(DBFLT8), (BYTE *)(&column->value.dbflt8));
1668 column->useraw = true;
1669 }
1670 else if ((srctype == SYBCHAR || srctype == SYBVARCHAR || srctype == SYBTEXT) &&
1671 (attr_oid == TEXTOID))
1672 {
1673 column->useraw = true;
1674 }
1675 else if ((srctype == SYBBINARY || srctype == SYBVARBINARY || srctype == SYBIMAGE) &&
1676 (attr_oid == BYTEAOID))
1677 {
1678 column->useraw = true;
1679 }
1680 #if (PG_VERSION_NUM >= 90400)
1681 else if (srctype == SYBDATETIME && attr_oid == TIMESTAMPOID)
1682 {
1683 column->useraw = true;
1684 }
1685 #endif
1686
1687 if (erc == FAIL)
1688 {
1689 ereport(ERROR,
1690 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1691 errmsg("Failed to bind results for column %s to a variable.",
1692 dbcolname(festate->dbproc, ncol + 1))));
1693 }
1694 }
1695 }
1696
1697 else
1698 {
1699 ereport(ERROR,
1700 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1701 errmsg("Unknown return code getting results from query %s", festate->query)
1702 ));
1703 }
1704 }
1705
1706 ereport(DEBUG3,
1707 (errmsg("tds_fdw: Fetching next row")
1708 ));
1709
1710 if ((ret_code = dbnextrow(festate->dbproc)) != NO_MORE_ROWS)
1711 {
1712 int ncol;
1713
1714 switch (ret_code)
1715 {
1716 case REG_ROW:
1717 festate->row++;
1718
1719 ereport(DEBUG3,
1720 (errmsg("tds_fdw: Row %i fetched", festate->row)
1721 ));
1722
1723 if (show_before_row_memory_stats)
1724 {
1725 fprintf(stderr,"Showing memory statistics before row %d.\n", festate->row);
1726
1727 MemoryContextStats(estate->es_query_cxt);
1728 }
1729
1730 for (ncol = 0; ncol < festate->ncols; ncol++)
1731 {
1732 COL* column;
1733 DBINT srclen;
1734 BYTE* src;
1735 char *cstring;
1736 Oid attr_oid;
1737 bytea *bytes;
1738
1739 column = &festate->columns[ncol];
1740 attr_oid = column->attr_oid;
1741
1742 if (column->local_index == -1)
1743 {
1744 ereport(DEBUG3,
1745 (errmsg("tds_fdw: Skipping column %s because it is not present in local table", column->name)
1746 ));
1747
1748 continue;
1749 }
1750
1751 srclen = dbdatlen(festate->dbproc, ncol + 1);
1752
1753 ereport(DEBUG3,
1754 (errmsg("tds_fdw: Data length is %i", srclen)
1755 ));
1756
1757 src = dbdata(festate->dbproc, ncol + 1);
1758
1759 if (srclen == 0 && src == NULL)
1760 {
1761 ereport(DEBUG3,
1762 (errmsg("tds_fdw: Column value is NULL")
1763 ));
1764
1765 festate->isnull[column->local_index] = true;
1766 continue;
1767 }
1768 else if (src == NULL)
1769 {
1770 ereport(DEBUG3,
1771 (errmsg("tds_fdw: Column value pointer is NULL, but probably shouldn't be")
1772 ));
1773 }
1774 else
1775 {
1776 festate->isnull[column->local_index] = false;
1777 }
1778
1779 if (column->useraw)
1780 {
1781 switch (attr_oid)
1782 {
1783 case INT2OID:
1784 festate->datums[column->local_index] = Int16GetDatum(column->value.dbsmallint);
1785 break;
1786 case INT4OID:
1787 festate->datums[column->local_index] = Int32GetDatum(column->value.dbint);
1788 break;
1789 case INT8OID:
1790 festate->datums[column->local_index] = Int64GetDatum(column->value.dbbigint);
1791 break;
1792 case FLOAT4OID:
1793 festate->datums[column->local_index] = Float4GetDatum(column->value.dbreal);
1794 break;
1795 case FLOAT8OID:
1796 festate->datums[column->local_index] = Float8GetDatum(column->value.dbflt8);
1797 break;
1798 case TEXTOID:
1799 festate->datums[column->local_index] = PointerGetDatum(cstring_to_text_with_len((char *)src, srclen));
1800 break;
1801 case BYTEAOID:
1802 bytes = palloc(srclen + VARHDRSZ);
1803 SET_VARSIZE(bytes, srclen + VARHDRSZ);
1804 memcpy(VARDATA(bytes), src, srclen);
1805 festate->datums[column->local_index] = PointerGetDatum(bytes);
1806 break;
1807 #if (PG_VERSION_NUM >= 90400)
1808 case TIMESTAMPOID:
1809 erc = tdsDatetimeToDatum(festate->dbproc, (DBDATETIME *)src, &festate->datums[column->local_index]);
1810 if (erc != SUCCEED)
1811 {
1812 ereport(ERROR,
1813 (errcode(ERRCODE_FDW_INVALID_ATTRIBUTE_VALUE),
1814 errmsg("Possibly invalid date value")));
1815 }
1816 break;
1817 #endif
1818 default:
1819 ereport(ERROR,
1820 (errcode(ERRCODE_FDW_ERROR),
1821 errmsg("%s marked useraw but wrong type (internal tds_fdw error)",
1822 dbcolname(festate->dbproc, ncol+1))));
1823 break;
1824 }
1825 }
1826 else
1827 {
1828 cstring = tdsConvertToCString(festate->dbproc, column->srctype, src, srclen);
1829 festate->datums[column->local_index] = InputFunctionCall(&festate->attinmeta->attinfuncs[column->local_index],
1830 cstring,
1831 festate->attinmeta->attioparams[column->local_index],
1832 festate->attinmeta->atttypmods[column->local_index]);
1833 }
1834 }
1835
1836 if (show_after_row_memory_stats)
1837 {
1838 fprintf(stderr,"Showing memory statistics after row %d.\n", festate->row);
1839
1840 MemoryContextStats(estate->es_query_cxt);
1841 }
1842
1843 tuple = heap_form_tuple(node->ss.ss_currentRelation->rd_att, festate->datums, festate->isnull);
1844 #if PG_VERSION_NUM < 120000
1845 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
1846 #else
1847 ExecStoreHeapTuple(tuple, slot, false);
1848 #endif
1849 break;
1850
1851 case BUF_FULL:
1852 ereport(ERROR,
1853 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1854 errmsg("Buffer filled up during query")
1855 ));
1856 break;
1857
1858 case FAIL:
1859 ereport(ERROR,
1860 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1861 errmsg("Failed to get row during query")
1862 ));
1863 break;
1864
1865 default:
1866 ereport(ERROR,
1867 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
1868 errmsg("Failed to get row during query. Unknown return code.")
1869 ));
1870 }
1871 }
1872
1873 else
1874 {
1875 ereport(DEBUG3,
1876 (errmsg("tds_fdw: No more rows")
1877 ));
1878 }
1879
1880 #ifdef DEBUG
1881 ereport(NOTICE,
1882 (errmsg("----> finishing tdsIterateForeignScan")
1883 ));
1884 #endif
1885
1886 return slot;
1887 }
1888
1889 /* rescan foreign table */
1890
tdsReScanForeignScan(ForeignScanState * node)1891 void tdsReScanForeignScan(ForeignScanState *node)
1892 {
1893 #ifdef DEBUG
1894 ereport(NOTICE,
1895 (errmsg("----> starting tdsReScanForeignScan")
1896 ));
1897 #endif
1898
1899 #ifdef DEBUG
1900 ereport(NOTICE,
1901 (errmsg("----> finishing tdsReScanForeignScan")
1902 ));
1903 #endif
1904 }
1905
1906 /* cleanup objects related to scan */
1907
tdsEndForeignScan(ForeignScanState * node)1908 void tdsEndForeignScan(ForeignScanState *node)
1909 {
1910 MemoryContext old_cxt;
1911 TdsFdwExecutionState *festate = (TdsFdwExecutionState *) node->fdw_state;
1912 EState *estate = node->ss.ps.state;
1913
1914 #ifdef DEBUG
1915 ereport(NOTICE,
1916 (errmsg("----> starting tdsEndForeignScan")
1917 ));
1918 #endif
1919
1920 old_cxt = MemoryContextSwitchTo(festate->mem_cxt);
1921
1922 if (show_finished_memory_stats)
1923 {
1924 fprintf(stderr,"Showing memory statistics after query finished.\n");
1925
1926 MemoryContextStats(estate->es_query_cxt);
1927 }
1928
1929 if (festate->query)
1930 {
1931 pfree(festate->query);
1932 }
1933
1934 ereport(DEBUG3,
1935 (errmsg("tds_fdw: Closing database connection")
1936 ));
1937
1938 dbclose(festate->dbproc);
1939
1940 ereport(DEBUG3,
1941 (errmsg("tds_fdw: Freeing login structure")
1942 ));
1943
1944 dbloginfree(festate->login);
1945
1946 ereport(DEBUG3,
1947 (errmsg("tds_fdw: Closing DB-Library")
1948 ));
1949
1950 dbexit();
1951
1952 #ifdef DEBUG
1953 ereport(NOTICE,
1954 (errmsg("----> finishing tdsEndForeignScan")
1955 ));
1956 #endif
1957
1958 MemoryContextSwitchTo(old_cxt);
1959 MemoryContextReset(festate->mem_cxt);
1960 }
1961
1962 /*
1963 * estimate_path_cost_size
1964 * Get cost and size estimates for a foreign scan
1965 *
1966 * We assume that all the baserestrictinfo clauses will be applied, plus
1967 * any join clauses listed in join_conds.
1968 */
1969 static void
estimate_path_cost_size(PlannerInfo * root,RelOptInfo * baserel,List * join_conds,List * pathkeys,double * p_rows,int * p_width,Cost * p_startup_cost,Cost * p_total_cost,TdsFdwOptionSet * option_set)1970 estimate_path_cost_size(PlannerInfo *root,
1971 RelOptInfo *baserel,
1972 List *join_conds,
1973 List *pathkeys,
1974 double *p_rows, int *p_width,
1975 Cost *p_startup_cost, Cost *p_total_cost, TdsFdwOptionSet *option_set)
1976 {
1977 TdsFdwRelationInfo *fpinfo = (TdsFdwRelationInfo *) baserel->fdw_private;
1978 double rows = 0.0;
1979 double retrieved_rows = 0.0;
1980 int width = 0;
1981 Cost startup_cost = 0;
1982 Cost total_cost = 0;
1983 Cost run_cost;
1984 Cost cpu_per_tuple;
1985
1986 /*
1987 * If the table or the server is configured to use remote estimates,
1988 * connect to the foreign server and execute EXPLAIN to estimate the
1989 * number of rows selected by the restriction+join clauses. Otherwise,
1990 * estimate rows using whatever statistics we have locally, in a way
1991 * similar to ordinary tables.
1992 */
1993 if (fpinfo->use_remote_estimate)
1994 {
1995 LOGINREC *login;
1996 DBPROCESS *dbproc;
1997 Selectivity local_sel;
1998 QualCost local_cost;
1999 List *remote_join_conds;
2000 List *local_join_conds;
2001 List *usable_pathkeys = NIL;
2002 ListCell *lc;
2003 List *retrieved_attrs;
2004
2005 /*
2006 * join_conds might contain both clauses that are safe to send across,
2007 * and clauses that aren't.
2008 */
2009 classifyConditions(root, baserel, baserel->baserestrictinfo,
2010 &remote_join_conds, &local_join_conds);
2011
2012 /*
2013 * Determine whether we can potentially push query pathkeys to the remote
2014 * side, avoiding a local sort.
2015 */
2016 foreach(lc, pathkeys)
2017 {
2018 PathKey *pathkey = (PathKey *) lfirst(lc);
2019 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
2020 Expr *em_expr;
2021
2022 /*
2023 * is_foreign_expr would detect volatile expressions as well, but
2024 * ec_has_volatile saves some cycles.
2025 */
2026 if (!pathkey_ec->ec_has_volatile &&
2027 (em_expr = find_em_expr_for_rel(pathkey_ec, baserel)) &&
2028 is_foreign_expr(root, baserel, em_expr))
2029 usable_pathkeys = lappend(usable_pathkeys, pathkey);
2030 else
2031 {
2032 /*
2033 * The planner and executor don't have any clever strategy for
2034 * taking data sorted by a prefix of the query's pathkeys and
2035 * getting it to be sorted by all of those pathekeys. We'll just
2036 * end up resorting the entire data set. So, unless we can push
2037 * down all of the query pathkeys, forget it.
2038 */
2039 list_free(usable_pathkeys);
2040 usable_pathkeys = NIL;
2041 break;
2042 }
2043 }
2044
2045 tdsBuildForeignQuery(root, baserel, option_set,
2046 fpinfo->attrs_used, &retrieved_attrs,
2047 fpinfo->remote_conds, remote_join_conds, usable_pathkeys);
2048
2049 /* Get the remote estimate */
2050
2051 ereport(DEBUG3,
2052 (errmsg("tds_fdw: Initiating DB-Library")
2053 ));
2054
2055 if (dbinit() == FAIL)
2056 {
2057 ereport(ERROR,
2058 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2059 errmsg("Failed to initialize DB-Library environment")
2060 ));
2061 goto cleanup_before_init;
2062 }
2063
2064 dberrhandle(tds_err_handler);
2065
2066 if (option_set->msg_handler)
2067 {
2068 if (strcmp(option_set->msg_handler, "notice") == 0)
2069 {
2070 dbmsghandle(tds_notice_msg_handler);
2071 }
2072
2073 else if (strcmp(option_set->msg_handler, "blackhole") == 0)
2074 {
2075 dbmsghandle(tds_blackhole_msg_handler);
2076 }
2077
2078 else
2079 {
2080 ereport(ERROR,
2081 (errcode(ERRCODE_SYNTAX_ERROR),
2082 errmsg("Unknown msg handler: %s.", option_set->msg_handler)
2083 ));
2084 }
2085 }
2086
2087 ereport(DEBUG3,
2088 (errmsg("tds_fdw: Getting login structure")
2089 ));
2090
2091 if ((login = dblogin()) == NULL)
2092 {
2093 ereport(ERROR,
2094 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2095 errmsg("Failed to initialize DB-Library login structure")
2096 ));
2097 goto cleanup_before_login;
2098 }
2099
2100 if (tdsSetupConnection(option_set, login, &dbproc) != 0)
2101 {
2102 goto cleanup;
2103 }
2104
2105 rows = tdsGetRowCount(option_set, login, dbproc);
2106 retrieved_rows = rows;
2107
2108 width = option_set->fdw_tuple_cost;
2109 startup_cost = option_set->fdw_startup_cost;
2110 total_cost = 0;
2111
2112 /* Factor in the selectivity of the locally-checked quals */
2113 local_sel = clauselist_selectivity(root,
2114 join_conds,
2115 baserel->relid,
2116 JOIN_INNER,
2117 NULL);
2118 local_sel *= fpinfo->local_conds_sel;
2119
2120 rows = clamp_row_est(rows * local_sel);
2121
2122 /* Add in the eval cost of the locally-checked quals */
2123 startup_cost += fpinfo->local_conds_cost.startup;
2124 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2125 cost_qual_eval(&local_cost, join_conds, root);
2126 startup_cost += local_cost.startup;
2127 total_cost += local_cost.per_tuple * retrieved_rows;
2128
2129 cleanup:
2130 dbclose(dbproc);
2131 dbloginfree(login);
2132
2133 cleanup_before_login:
2134 dbexit();
2135
2136 cleanup_before_init:
2137 ;
2138 }
2139 else
2140 {
2141 /*
2142 * We don't support join conditions in this mode (hence, no
2143 * parameterized paths can be made).
2144 */
2145 Assert(join_conds == NIL);
2146
2147 /* Use rows/width estimates made by set_baserel_size_estimates. */
2148 rows = baserel->rows;
2149 #if (PG_VERSION_NUM < 90600)
2150 width = baserel->width;
2151 #else
2152 width = baserel->reltarget->width;
2153 #endif /* PG_VERSION_NUM < 90600 */
2154
2155 /*
2156 * Back into an estimate of the number of retrieved rows. Just in
2157 * case this is nuts, clamp to at most baserel->tuples.
2158 */
2159 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2160 retrieved_rows = Min(retrieved_rows, baserel->tuples);
2161
2162 /*
2163 * Cost as though this were a seqscan, which is pessimistic. We
2164 * effectively imagine the local_conds are being evaluated remotely,
2165 * too.
2166 */
2167 startup_cost = 0;
2168 run_cost = 0;
2169 run_cost += seq_page_cost * baserel->pages;
2170
2171 startup_cost += baserel->baserestrictcost.startup;
2172 cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple;
2173 run_cost += cpu_per_tuple * baserel->tuples;
2174
2175 /*
2176 * Without remote estimates, we have no real way to estimate the cost
2177 * of generating sorted output. It could be free if the query plan
2178 * the remote side would have chosen generates properly-sorted output
2179 * anyway, but in most cases it will cost something. Estimate a value
2180 * high enough that we won't pick the sorted path when the ordering
2181 * isn't locally useful, but low enough that we'll err on the side of
2182 * pushing down the ORDER BY clause when it's useful to do so.
2183 */
2184 if (pathkeys != NIL)
2185 {
2186 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2187 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2188 }
2189
2190 total_cost = startup_cost + run_cost;
2191 }
2192
2193 /*
2194 * Add some additional cost factors to account for connection overhead
2195 * (fdw_startup_cost), transferring data across the network
2196 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2197 * (cpu_tuple_cost per retrieved row).
2198 */
2199 startup_cost += fpinfo->fdw_startup_cost;
2200 total_cost += fpinfo->fdw_startup_cost;
2201 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2202 total_cost += cpu_tuple_cost * retrieved_rows;
2203
2204 /* Return results. */
2205 *p_rows = rows;
2206 *p_width = width;
2207 *p_startup_cost = startup_cost;
2208 *p_total_cost = total_cost;
2209 }
2210
tdsGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)2211 void tdsGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
2212 {
2213 TdsFdwRelationInfo *fpinfo;
2214 ListCell *lc;
2215 TdsFdwOptionSet option_set;
2216
2217 #ifdef DEBUG
2218 ereport(NOTICE,
2219 (errmsg("----> starting tdsGetForeignRelSize")
2220 ));
2221 #endif
2222
2223 /*
2224 * We use PgFdwRelationInfo to pass various information to subsequent
2225 * functions.
2226 */
2227 fpinfo = (TdsFdwRelationInfo *) palloc0(sizeof(TdsFdwRelationInfo));
2228 baserel->fdw_private = (void *) fpinfo;
2229
2230 /* Look up foreign-table catalog info. */
2231 fpinfo->table = GetForeignTable(foreigntableid);
2232 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
2233
2234 tdsGetForeignTableOptionsFromCatalog(foreigntableid, &option_set);
2235
2236 fpinfo->use_remote_estimate = option_set.use_remote_estimate;
2237 fpinfo->fdw_startup_cost = option_set.fdw_startup_cost;
2238 fpinfo->fdw_tuple_cost = option_set.fdw_tuple_cost;
2239
2240 /*
2241 * Identify which baserestrictinfo clauses can be sent to the remote
2242 * server and which can't.
2243 */
2244 classifyConditions(root, baserel, baserel->baserestrictinfo,
2245 &fpinfo->remote_conds, &fpinfo->local_conds);
2246
2247 /*
2248 * Identify which attributes will need to be retrieved from the remote
2249 * server. These include all attrs needed for joins or final output, plus
2250 * all attrs used in the local_conds. (Note: if we end up using a
2251 * parameterized scan, it's possible that some of the join clauses will be
2252 * sent to the remote and thus we wouldn't really need to retrieve the
2253 * columns used in them. Doesn't seem worth detecting that case though.)
2254 */
2255 fpinfo->attrs_used = NULL;
2256 #if (PG_VERSION_NUM < 90600)
2257 pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
2258 &fpinfo->attrs_used);
2259 #else
2260 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
2261 &fpinfo->attrs_used);
2262 #endif /* PG_VERSION_NUM < 90600 */
2263 foreach(lc, fpinfo->local_conds)
2264 {
2265 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
2266
2267 pull_varattnos((Node *) rinfo->clause, baserel->relid,
2268 &fpinfo->attrs_used);
2269 }
2270
2271 /*
2272 * Compute the selectivity and cost of the local_conds, so we don't have
2273 * to do it over again for each path. The best we can do for these
2274 * conditions is to estimate selectivity on the basis of local statistics.
2275 */
2276 fpinfo->local_conds_sel = clauselist_selectivity(root,
2277 fpinfo->local_conds,
2278 baserel->relid,
2279 JOIN_INNER,
2280 NULL);
2281
2282 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
2283
2284 /*
2285 * If the table or the server is configured to use remote estimates,
2286 * connect to the foreign server and execute EXPLAIN to estimate the
2287 * number of rows selected by the restriction clauses, as well as the
2288 * average row width. Otherwise, estimate using whatever statistics we
2289 * have locally, in a way similar to ordinary tables.
2290 */
2291 if (fpinfo->use_remote_estimate)
2292 {
2293 ereport(DEBUG3,
2294 (errmsg("tds_fdw: Using remote estimate")
2295 ));
2296
2297 /*
2298 * Get cost/size estimates with help of remote server. Save the
2299 * values in fpinfo so we don't need to do it again to generate the
2300 * basic foreign path.
2301 */
2302 estimate_path_cost_size(root, baserel, NIL, NIL,
2303 &fpinfo->rows, &fpinfo->width,
2304 &fpinfo->startup_cost, &fpinfo->total_cost, &option_set);
2305
2306 /* Report estimated baserel size to planner. */
2307 baserel->rows = fpinfo->rows;
2308 #if (PG_VERSION_NUM < 90600)
2309 baserel->width = fpinfo->width;
2310 #else
2311 baserel->reltarget->width = fpinfo->width;
2312 #endif /* PG_VERSION_NUM < 90600 */
2313 }
2314 else
2315 {
2316 ereport(DEBUG3,
2317 (errmsg("tds_fdw: Using local estimate")
2318 ));
2319
2320 /*
2321 * If the foreign table has never been ANALYZEd, it will have relpages
2322 * and reltuples equal to zero, which most likely has nothing to do
2323 * with reality. We can't do a whole lot about that if we're not
2324 * allowed to consult the remote server, but we can use a hack similar
2325 * to plancat.c's treatment of empty relations: use a minimum size
2326 * estimate of 10 pages, and divide by the column-datatype-based width
2327 * estimate to get the corresponding number of tuples.
2328 */
2329 if (baserel->tuples == 0)
2330 {
2331 baserel->tuples = option_set.local_tuple_estimate;
2332 }
2333
2334 /* Estimate baserel size as best we can with local statistics. */
2335 set_baserel_size_estimates(root, baserel);
2336
2337 /* Fill in basically-bogus cost estimates for use later. */
2338 estimate_path_cost_size(root, baserel, NIL, NIL,
2339 &fpinfo->rows, &fpinfo->width,
2340 &fpinfo->startup_cost, &fpinfo->total_cost, &option_set);
2341 }
2342
2343
2344 #if (PG_VERSION_NUM < 90600)
2345 ereport(DEBUG3,
2346 (errmsg("tds_fdw: Estimated rows = %f, estimated width = %d", baserel->rows, baserel->width)
2347 ));
2348 #else
2349 ereport(DEBUG3,
2350 (errmsg("tds_fdw: Estimated rows = %f, estimated width = %d", baserel->rows,
2351 baserel->reltarget->width)
2352 ));
2353 #endif /* PG_VERSION_NUM < 90600 */
2354
2355 #ifdef DEBUG
2356 ereport(NOTICE,
2357 (errmsg("----> finishing tdsGetForeignRelSize")
2358 ));
2359 #endif
2360 }
2361
tdsEstimateCosts(PlannerInfo * root,RelOptInfo * baserel,Cost * startup_cost,Cost * total_cost,Oid foreigntableid)2362 void tdsEstimateCosts(PlannerInfo *root, RelOptInfo *baserel, Cost *startup_cost, Cost *total_cost, Oid foreigntableid)
2363 {
2364 TdsFdwOptionSet option_set;
2365
2366 #ifdef DEBUG
2367 ereport(NOTICE,
2368 (errmsg("----> starting tdsEstimateCosts")
2369 ));
2370 #endif
2371
2372 tdsGetForeignTableOptionsFromCatalog(foreigntableid, &option_set);
2373
2374 *startup_cost = tdsGetStartupCost(&option_set);
2375
2376 *total_cost = baserel->rows + *startup_cost;
2377
2378 #ifdef DEBUG
2379 ereport(NOTICE,
2380 (errmsg("----> finishing tdsEstimateCosts")
2381 ));
2382 #endif
2383 }
2384
tdsGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)2385 void tdsGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
2386 {
2387 TdsFdwOptionSet option_set;
2388 TdsFdwRelationInfo *fpinfo = (TdsFdwRelationInfo *) baserel->fdw_private;
2389 ForeignPath *path;
2390 #if (PG_VERSION_NUM >= 90500)
2391 List *ppi_list;
2392 #endif
2393 ListCell *lc;
2394 List *usable_pathkeys = NIL;
2395
2396 #ifdef DEBUG
2397 ereport(NOTICE,
2398 (errmsg("----> starting tdsGetForeignPaths")
2399 ));
2400 #endif
2401
2402 tdsGetForeignTableOptionsFromCatalog(foreigntableid, &option_set);
2403
2404 /*
2405 * Create simplest ForeignScan path node and add it to baserel. This path
2406 * corresponds to SeqScan path of regular tables (though depending on what
2407 * baserestrict conditions we were able to send to remote, there might
2408 * actually be an indexscan happening there). We already did all the work
2409 * to estimate cost and size of this path.
2410 */
2411 #if PG_VERSION_NUM < 90500
2412 path = create_foreignscan_path(root, baserel,
2413 fpinfo->rows,
2414 fpinfo->startup_cost,
2415 fpinfo->total_cost,
2416 NIL, /* no pathkeys */
2417 NULL, /* no outer rel either */
2418 NIL); /* no fdw_private list */
2419 #elif PG_VERSION_NUM < 90600
2420 path = create_foreignscan_path(root, baserel,
2421 fpinfo->rows,
2422 fpinfo->startup_cost,
2423 fpinfo->total_cost,
2424 NIL, /* no pathkeys */
2425 NULL, /* no outer rel either */
2426 NULL, /* no extra plan */
2427 NIL); /* no fdw_private list */
2428 #else
2429 path = create_foreignscan_path(root, baserel, NULL,
2430 fpinfo->rows,
2431 fpinfo->startup_cost,
2432 fpinfo->total_cost,
2433 NIL, /* no pathkeys */
2434 NULL, /* no outer rel either */
2435 NULL, /* no extra plan */
2436 NIL); /* no fdw_private list */
2437 #endif /* PG_VERSION_NUM < 90500 */
2438
2439 add_path(baserel, (Path *) path);
2440
2441 /*
2442 * Determine whether we can potentially push query pathkeys to the remote
2443 * side, avoiding a local sort.
2444 */
2445 foreach(lc, root->query_pathkeys)
2446 {
2447 PathKey *pathkey = (PathKey *) lfirst(lc);
2448 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
2449 Expr *em_expr;
2450
2451 /*
2452 * is_foreign_expr would detect volatile expressions as well, but
2453 * ec_has_volatile saves some cycles.
2454 */
2455 if (!pathkey_ec->ec_has_volatile &&
2456 (em_expr = find_em_expr_for_rel(pathkey_ec, baserel)) &&
2457 is_foreign_expr(root, baserel, em_expr))
2458 usable_pathkeys = lappend(usable_pathkeys, pathkey);
2459 else
2460 {
2461 /*
2462 * The planner and executor don't have any clever strategy for
2463 * taking data sorted by a prefix of the query's pathkeys and
2464 * getting it to be sorted by all of those pathekeys. We'll just
2465 * end up resorting the entire data set. So, unless we can push
2466 * down all of the query pathkeys, forget it.
2467 */
2468 list_free(usable_pathkeys);
2469 usable_pathkeys = NIL;
2470 break;
2471 }
2472 }
2473
2474 /* Create a path with useful pathkeys, if we found one. */
2475 if (usable_pathkeys != NULL)
2476 {
2477 double rows;
2478 int width;
2479 Cost startup_cost;
2480 Cost total_cost;
2481
2482 estimate_path_cost_size(root, baserel, NIL, usable_pathkeys,
2483 &rows, &width, &startup_cost, &total_cost, &option_set);
2484
2485 add_path(baserel, (Path *)
2486 #if PG_VERSION_NUM < 90500
2487 create_foreignscan_path(root, baserel,
2488 rows,
2489 startup_cost,
2490 total_cost,
2491 usable_pathkeys,
2492 NULL,
2493 NIL));
2494 #elif PG_VERSION_NUM < 90600
2495 create_foreignscan_path(root, baserel,
2496 rows,
2497 startup_cost,
2498 total_cost,
2499 usable_pathkeys,
2500 NULL,
2501 NULL,
2502 NIL));
2503 #else
2504 create_foreignscan_path(root, baserel,
2505 NULL,
2506 rows,
2507 startup_cost,
2508 total_cost,
2509 usable_pathkeys,
2510 NULL,
2511 NULL,
2512 NIL));
2513 #endif /* PG_VERSION_NUM < 90500 */
2514 }
2515
2516 /* Don't worry about join pushdowns unless this is PostgreSQL 9.5+ */
2517 #if (PG_VERSION_NUM >= 90500)
2518
2519 /*
2520 * If we're not using remote estimates, stop here. We have no way to
2521 * estimate whether any join clauses would be worth sending across, so
2522 * don't bother building parameterized paths.
2523 */
2524 if (!fpinfo->use_remote_estimate)
2525 return;
2526
2527 /*
2528 * Thumb through all join clauses for the rel to identify which outer
2529 * relations could supply one or more safe-to-send-to-remote join clauses.
2530 * We'll build a parameterized path for each such outer relation.
2531 *
2532 * It's convenient to manage this by representing each candidate outer
2533 * relation by the ParamPathInfo node for it. We can then use the
2534 * ppi_clauses list in the ParamPathInfo node directly as a list of the
2535 * interesting join clauses for that rel. This takes care of the
2536 * possibility that there are multiple safe join clauses for such a rel,
2537 * and also ensures that we account for unsafe join clauses that we'll
2538 * still have to enforce locally (since the parameterized-path machinery
2539 * insists that we handle all movable clauses).
2540 */
2541 ppi_list = NIL;
2542 foreach(lc, baserel->joininfo)
2543 {
2544 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
2545 Relids required_outer;
2546 ParamPathInfo *param_info;
2547
2548 /* Check if clause can be moved to this rel */
2549 if (!join_clause_is_movable_to(rinfo, baserel))
2550 continue;
2551
2552 /* See if it is safe to send to remote */
2553 if (!is_foreign_expr(root, baserel, rinfo->clause))
2554 continue;
2555
2556 /* Calculate required outer rels for the resulting path */
2557 required_outer = bms_union(rinfo->clause_relids,
2558 baserel->lateral_relids);
2559 /* We do not want the foreign rel itself listed in required_outer */
2560 required_outer = bms_del_member(required_outer, baserel->relid);
2561
2562 /*
2563 * required_outer probably can't be empty here, but if it were, we
2564 * couldn't make a parameterized path.
2565 */
2566 if (bms_is_empty(required_outer))
2567 continue;
2568
2569 /* Get the ParamPathInfo */
2570 param_info = get_baserel_parampathinfo(root, baserel,
2571 required_outer);
2572 Assert(param_info != NULL);
2573
2574 /*
2575 * Add it to list unless we already have it. Testing pointer equality
2576 * is OK since get_baserel_parampathinfo won't make duplicates.
2577 */
2578 ppi_list = list_append_unique_ptr(ppi_list, param_info);
2579 }
2580
2581 /*
2582 * The above scan examined only "generic" join clauses, not those that
2583 * were absorbed into EquivalenceClauses. See if we can make anything out
2584 * of EquivalenceClauses.
2585 */
2586 if (baserel->has_eclass_joins)
2587 {
2588 /*
2589 * We repeatedly scan the eclass list looking for column references
2590 * (or expressions) belonging to the foreign rel. Each time we find
2591 * one, we generate a list of equivalence joinclauses for it, and then
2592 * see if any are safe to send to the remote. Repeat till there are
2593 * no more candidate EC members.
2594 */
2595 ec_member_foreign_arg arg;
2596
2597 arg.already_used = NIL;
2598 for (;;)
2599 {
2600 List *clauses;
2601
2602 /* Make clauses, skipping any that join to lateral_referencers */
2603 arg.current = NULL;
2604 clauses = generate_implied_equalities_for_column(root,
2605 baserel,
2606 ec_member_matches_foreign,
2607 (void *) &arg,
2608 baserel->lateral_referencers);
2609
2610 /* Done if there are no more expressions in the foreign rel */
2611 if (arg.current == NULL)
2612 {
2613 Assert(clauses == NIL);
2614 break;
2615 }
2616
2617 /* Scan the extracted join clauses */
2618 foreach(lc, clauses)
2619 {
2620 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
2621 Relids required_outer;
2622 ParamPathInfo *param_info;
2623
2624 /* Check if clause can be moved to this rel */
2625 if (!join_clause_is_movable_to(rinfo, baserel))
2626 continue;
2627
2628 /* See if it is safe to send to remote */
2629 if (!is_foreign_expr(root, baserel, rinfo->clause))
2630 continue;
2631
2632 /* Calculate required outer rels for the resulting path */
2633 required_outer = bms_union(rinfo->clause_relids,
2634 baserel->lateral_relids);
2635 required_outer = bms_del_member(required_outer, baserel->relid);
2636 if (bms_is_empty(required_outer))
2637 continue;
2638
2639 /* Get the ParamPathInfo */
2640 param_info = get_baserel_parampathinfo(root, baserel,
2641 required_outer);
2642 Assert(param_info != NULL);
2643
2644 /* Add it to list unless we already have it */
2645 ppi_list = list_append_unique_ptr(ppi_list, param_info);
2646 }
2647
2648 /* Try again, now ignoring the expression we found this time */
2649 arg.already_used = lappend(arg.already_used, arg.current);
2650 }
2651 }
2652
2653 /*
2654 * Now build a path for each useful outer relation.
2655 */
2656 foreach(lc, ppi_list)
2657 {
2658 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
2659 double rows;
2660 int width;
2661 Cost startup_cost;
2662 Cost total_cost;
2663
2664 /* Get a cost estimate from the remote */
2665 estimate_path_cost_size(root, baserel,
2666 param_info->ppi_clauses, NIL,
2667 &rows, &width,
2668 &startup_cost, &total_cost, &option_set);
2669
2670 /*
2671 * ppi_rows currently won't get looked at by anything, but still we
2672 * may as well ensure that it matches our idea of the rowcount.
2673 */
2674 param_info->ppi_rows = rows;
2675
2676 /* Make the path */
2677 #if PG_VERSION_NUM < 90500
2678 path = create_foreignscan_path(root, baserel,
2679 rows,
2680 startup_cost,
2681 total_cost,
2682 NIL, /* no pathkeys */
2683 param_info->ppi_req_outer,
2684 NIL); /* no fdw_private list */
2685 #elif PG_VERSION_NUM < 90600
2686 path = create_foreignscan_path(root, baserel,
2687 rows,
2688 startup_cost,
2689 total_cost,
2690 NIL, /* no pathkeys */
2691 param_info->ppi_req_outer,
2692 NULL,
2693 NIL); /* no fdw_private list */
2694 #else
2695 path = create_foreignscan_path(root, baserel,
2696 NULL,
2697 rows,
2698 startup_cost,
2699 total_cost,
2700 NIL, /* no pathkeys */
2701 param_info->ppi_req_outer,
2702 NULL,
2703 NIL); /* no fdw_private list */
2704 #endif /* PG_VERSION_NUM < 90500 */
2705 add_path(baserel, (Path *) path);
2706 }
2707
2708 #endif
2709
2710 #ifdef DEBUG
2711 ereport(NOTICE,
2712 (errmsg("----> finishing tdsGetForeignPaths")
2713 ));
2714 #endif
2715 }
2716
tdsAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)2717 bool tdsAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
2718 {
2719 #ifdef DEBUG
2720 ereport(NOTICE,
2721 (errmsg("----> starting tdsAnalyzeForeignTable")
2722 ));
2723 #endif
2724
2725 #ifdef DEBUG
2726 ereport(NOTICE,
2727 (errmsg("----> finishing tdsAnalyzeForeignTable")
2728 ));
2729 #endif
2730
2731 return false;
2732 }
2733 #if (PG_VERSION_NUM >= 90500)
tdsGetForeignPlan(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)2734 ForeignScan* tdsGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel,
2735 Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan)
2736 #else
2737 ForeignScan* tdsGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel,
2738 Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses)
2739 #endif
2740 {
2741 TdsFdwRelationInfo *fpinfo = (TdsFdwRelationInfo *) baserel->fdw_private;
2742 TdsFdwOptionSet option_set;
2743 Index scan_relid = baserel->relid;
2744 List *fdw_private;
2745 List *remote_conds = NIL;
2746 List *remote_exprs = NIL;
2747 List *local_exprs = NIL;
2748 List *params_list = NIL;
2749 List *retrieved_attrs = NIL;
2750 ListCell *lc;
2751
2752 #ifdef DEBUG
2753 ereport(NOTICE,
2754 (errmsg("----> starting tdsGetForeignPlan")
2755 ));
2756 #endif
2757
2758 tdsGetForeignTableOptionsFromCatalog(foreigntableid, &option_set);
2759
2760 /*
2761 * Separate the scan_clauses into those that can be executed remotely and
2762 * those that can't. baserestrictinfo clauses that were previously
2763 * determined to be safe or unsafe by classifyConditions are shown in
2764 * fpinfo->remote_conds and fpinfo->local_conds. Anything else in the
2765 * scan_clauses list will be a join clause, which we have to check for
2766 * remote-safety.
2767 *
2768 * Note: the join clauses we see here should be the exact same ones
2769 * previously examined by postgresGetForeignPaths. Possibly it'd be worth
2770 * passing forward the classification work done then, rather than
2771 * repeating it here.
2772 *
2773 * This code must match "extract_actual_clauses(scan_clauses, false)"
2774 * except for the additional decision about remote versus local execution.
2775 * Note however that we don't strip the RestrictInfo nodes from the
2776 * remote_conds list, since appendWhereClause expects a list of
2777 * RestrictInfos.
2778 */
2779 foreach(lc, scan_clauses)
2780 {
2781 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
2782
2783 Assert(IsA(rinfo, RestrictInfo));
2784
2785 /* Ignore any pseudoconstants, they're dealt with elsewhere */
2786 if (rinfo->pseudoconstant)
2787 continue;
2788
2789 if (list_member_ptr(fpinfo->remote_conds, rinfo))
2790 {
2791 remote_conds = lappend(remote_conds, rinfo);
2792 remote_exprs = lappend(remote_exprs, rinfo->clause);
2793 }
2794 else if (list_member_ptr(fpinfo->local_conds, rinfo))
2795 local_exprs = lappend(local_exprs, rinfo->clause);
2796 else if (is_foreign_expr(root, baserel, rinfo->clause))
2797 {
2798 remote_conds = lappend(remote_conds, rinfo);
2799 remote_exprs = lappend(remote_exprs, rinfo->clause);
2800 }
2801 else
2802 local_exprs = lappend(local_exprs, rinfo->clause);
2803 }
2804
2805 /*
2806 * Build the query string to be sent for execution, and identify
2807 * expressions to be sent as parameters.
2808 */
2809
2810 tdsBuildForeignQuery(root, baserel, &option_set,
2811 fpinfo->attrs_used, &retrieved_attrs,
2812 remote_conds, NULL, best_path->path.pathkeys);
2813
2814 /*
2815 * Build the fdw_private list that will be available to the executor.
2816 * Items in the list must match enum FdwScanPrivateIndex, above.
2817 */
2818 fdw_private = list_make2(makeString(option_set.query),
2819 retrieved_attrs);
2820
2821 /*
2822 * Create the ForeignScan node from target list, filtering expressions,
2823 * remote parameter expressions, and FDW private information.
2824 *
2825 * Note that the remote parameter expressions are stored in the fdw_exprs
2826 * field of the finished plan node; we can't keep them in private state
2827 * because then they wouldn't be subject to later planner processing.
2828 */
2829
2830
2831 #ifdef DEBUG
2832 ereport(NOTICE,
2833 (errmsg("----> finishing tdsGetForeignPlan")
2834 ));
2835 #endif
2836
2837 #if (PG_VERSION_NUM >= 90500)
2838 return make_foreignscan(tlist,
2839 local_exprs,
2840 scan_relid,
2841 params_list,
2842 fdw_private,
2843 NIL, /* no custom tlist */
2844 remote_exprs,
2845 outer_plan);
2846 #else
2847 return make_foreignscan(tlist, local_exprs, scan_relid, params_list, fdw_private);
2848 #endif
2849 }
2850
2851 static bool
tdsExecuteQuery(char * query,DBPROCESS * dbproc)2852 tdsExecuteQuery(char *query, DBPROCESS *dbproc)
2853 {
2854 RETCODE erc;
2855
2856 #ifdef DEBUG
2857 ereport(NOTICE,
2858 (errmsg("----> starting tdsExecuteQuery")
2859 ));
2860 #endif
2861
2862 ereport(DEBUG3,
2863 (errmsg("tds_fdw: Setting database command to %s", query)
2864 ));
2865
2866 if ((erc = dbcmd(dbproc, query)) == FAIL)
2867 {
2868 ereport(ERROR,
2869 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2870 errmsg("Failed to set current query to %s", query)
2871 ));
2872 }
2873
2874 ereport(DEBUG3,
2875 (errmsg("tds_fdw: Executing the query")
2876 ));
2877
2878 if ((erc = dbsqlexec(dbproc)) == FAIL)
2879 {
2880 ereport(ERROR,
2881 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2882 errmsg("Failed to execute query %s", query)
2883 ));
2884 }
2885
2886 ereport(DEBUG3,
2887 (errmsg("tds_fdw: Query executed correctly")
2888 ));
2889
2890 ereport(DEBUG3,
2891 (errmsg("tds_fdw: Getting results")
2892 ));
2893
2894 erc = dbresults(dbproc);
2895
2896 if (erc == FAIL)
2897 {
2898 ereport(ERROR,
2899 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2900 errmsg("Failed to get results from query %s", query)
2901 ));
2902 }
2903
2904 else if (erc == NO_MORE_RESULTS)
2905 {
2906 ereport(DEBUG3,
2907 (errmsg("tds_fdw: There appears to be no results from query %s", query)
2908 ));
2909
2910 goto cleanup;
2911 }
2912
2913 else if (erc == SUCCEED)
2914 {
2915 ereport(DEBUG3,
2916 (errmsg("tds_fdw: Successfully got results")
2917 ));
2918
2919 goto cleanup;
2920 }
2921
2922 else
2923 {
2924 ereport(ERROR,
2925 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
2926 errmsg("Unknown return code getting results from query %s", query)
2927 ));
2928 }
2929
2930 cleanup:
2931 #ifdef DEBUG
2932 ereport(NOTICE,
2933 (errmsg("----> finishing tdsExecuteQuery")
2934 ));
2935 #endif
2936
2937 return (erc == SUCCEED);
2938 }
2939
2940 #ifdef IMPORT_API
2941
2942 static List *
tdsImportSqlServerSchema(ImportForeignSchemaStmt * stmt,DBPROCESS * dbproc,TdsFdwOptionSet option_set,bool import_default,bool import_not_null)2943 tdsImportSqlServerSchema(ImportForeignSchemaStmt *stmt, DBPROCESS *dbproc,
2944 TdsFdwOptionSet option_set,
2945 bool import_default, bool import_not_null)
2946 {
2947 List *commands = NIL;
2948 ListCell *lc;
2949 StringInfoData buf;
2950
2951 RETCODE erc;
2952 int ret_code;
2953
2954 initStringInfo(&buf);
2955
2956 /* Check that the schema really exists */
2957 appendStringInfoString(&buf, "SELECT schema_name FROM INFORMATION_SCHEMA.SCHEMATA WHERE schema_name = ");
2958 deparseStringLiteral(&buf, stmt->remote_schema);
2959
2960 if (!tdsExecuteQuery(buf.data, dbproc))
2961 ereport(ERROR,
2962 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
2963 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
2964 stmt->remote_schema, option_set.servername)));
2965 else
2966 /* Process results */
2967 while ((ret_code = dbnextrow(dbproc)) != NO_MORE_ROWS)
2968 {
2969 /* Do nothing */
2970 }
2971 resetStringInfo(&buf);
2972
2973 /*
2974 * Fetch all table data from this schema, possibly restricted by
2975 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
2976 * to EXCEPT/LIMIT TO here, because the core code will filter the
2977 * statements we return according to those lists anyway. But it
2978 * should save a few cycles to not process excluded tables in the
2979 * first place.)
2980 */
2981 appendStringInfoString(&buf,
2982 "SELECT t.table_name,"
2983 " c.column_name, "
2984 " c.data_type, "
2985 " c.column_default, "
2986 " c.is_nullable, "
2987 " c.character_maximum_length, "
2988 " c.numeric_precision, "
2989 " c.numeric_precision_radix, "
2990 " c.numeric_scale, "
2991 " c.datetime_precision "
2992 "FROM INFORMATION_SCHEMA.TABLES t "
2993 " LEFT JOIN INFORMATION_SCHEMA.COLUMNS c ON "
2994 " t.table_schema = c.table_schema "
2995 " AND t.table_name = c.table_name "
2996 "WHERE t.table_type IN ('BASE TABLE','VIEW') "
2997 " AND t.table_schema = ");
2998 deparseStringLiteral(&buf, stmt->remote_schema);
2999
3000 /* Apply restrictions for LIMIT TO and EXCEPT */
3001 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3002 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3003 {
3004 bool first_item = true;
3005
3006 appendStringInfoString(&buf, " AND t.table_name ");
3007 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3008 appendStringInfoString(&buf, "NOT ");
3009 appendStringInfoString(&buf, "IN (");
3010
3011 /* Append list of table names within IN clause */
3012 foreach(lc, stmt->table_list)
3013 {
3014 RangeVar *rv = (RangeVar *) lfirst(lc);
3015
3016 if (first_item)
3017 first_item = false;
3018 else
3019 appendStringInfoString(&buf, ", ");
3020 deparseStringLiteral(&buf, rv->relname);
3021 }
3022 appendStringInfoChar(&buf, ')');
3023 }
3024
3025 /* Append ORDER BY at the end of query to ensure output ordering */
3026 appendStringInfoString(&buf, " ORDER BY t.table_name, c.ordinal_position");
3027
3028 if (tdsExecuteQuery(buf.data, dbproc))
3029 {
3030 char table_name[255],
3031 prev_table[255],
3032 column_name[255],
3033 data_type[255],
3034 column_default[4000],
3035 is_nullable[10];
3036 int char_len,
3037 numeric_precision,
3038 numeric_precision_radix,
3039 numeric_scale,
3040 datetime_precision;
3041 bool first_column = true;
3042 bool first_table = true;
3043
3044 /* Check if there's rows in resultset and if not do not execute the rest */
3045 erc = dbrows(dbproc);
3046
3047 if (erc == FAIL)
3048 {
3049 ereport(NOTICE,
3050 (errmsg("tds_fdw: No table were found in schema %s", stmt->remote_schema))
3051 );
3052 return commands;
3053 }
3054
3055 prev_table[0] = '\0';
3056
3057 erc = dbbind(dbproc, 1, NTBSTRINGBIND, sizeof(table_name), (BYTE *) table_name);
3058 if (erc == FAIL)
3059 {
3060 ereport(ERROR,
3061 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3062 errmsg("Failed to bind results for column \"table_name\" to a variable.")
3063 ));
3064 }
3065
3066 erc = dbbind(dbproc, 2, NTBSTRINGBIND, sizeof(column_name), (BYTE *) column_name);
3067 if (erc == FAIL)
3068 {
3069 ereport(ERROR,
3070 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3071 errmsg("Failed to bind results for column \"column_name\" to a variable.")
3072 ));
3073 }
3074
3075 erc = dbbind(dbproc, 3, NTBSTRINGBIND, sizeof(data_type), (BYTE *) data_type);
3076 if (erc == FAIL)
3077 {
3078 ereport(ERROR,
3079 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3080 errmsg("Failed to bind results for column \"data_type\" to a variable.")
3081 ));
3082 }
3083
3084 erc = dbbind(dbproc, 4, NTBSTRINGBIND, sizeof(column_default), (BYTE *) column_default);
3085 if (erc == FAIL)
3086 {
3087 ereport(ERROR,
3088 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3089 errmsg("Failed to bind results for column \"column_default\" to a variable.")
3090 ));
3091 }
3092
3093 erc = dbbind(dbproc, 5, NTBSTRINGBIND, sizeof(is_nullable), (BYTE *) is_nullable);
3094 if (erc == FAIL)
3095 {
3096 ereport(ERROR,
3097 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3098 errmsg("Failed to bind results for column \"is_nullable\" to a variable.")
3099 ));
3100 }
3101
3102 erc = dbbind(dbproc, 6, INTBIND, sizeof(int), (BYTE *) &char_len);
3103 if (erc == FAIL)
3104 {
3105 ereport(ERROR,
3106 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3107 errmsg("Failed to bind results for column \"character_maximum_length\" to a variable.")
3108 ));
3109 }
3110
3111 erc = dbbind(dbproc, 7, INTBIND, sizeof(int), (BYTE *) &numeric_precision);
3112 if (erc == FAIL)
3113 {
3114 ereport(ERROR,
3115 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3116 errmsg("Failed to bind results for column \"numeric_precision\" to a variable.")
3117 ));
3118 }
3119
3120 erc = dbbind(dbproc, 8, INTBIND, sizeof(int), (BYTE *) &numeric_precision_radix);
3121 if (erc == FAIL)
3122 {
3123 ereport(ERROR,
3124 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3125 errmsg("Failed to bind results for column \"numeric_precision_radix\" to a variable.")
3126 ));
3127 }
3128
3129 erc = dbbind(dbproc, 9, INTBIND, sizeof(int), (BYTE *) &numeric_scale);
3130 if (erc == FAIL)
3131 {
3132 ereport(ERROR,
3133 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3134 errmsg("Failed to bind results for column \"numeric_scale\" to a variable.")
3135 ));
3136 }
3137
3138 erc = dbbind(dbproc, 10, INTBIND, sizeof(int), (BYTE *) &datetime_precision);
3139 if (erc == FAIL)
3140 {
3141 ereport(ERROR,
3142 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3143 errmsg("Failed to bind results for column \"datetime_precision\" to a variable.")
3144 ));
3145 }
3146
3147 /* Process results */
3148 while ((ret_code = dbnextrow(dbproc)) != NO_MORE_ROWS)
3149 {
3150 switch (ret_code)
3151 {
3152 case REG_ROW:
3153 ereport(DEBUG3,
3154 (errmsg("tds_fdw: column \"%s.%s\"", table_name, column_name)
3155 ));
3156
3157 /* Build query for the new table */
3158 if (first_table || strcmp(prev_table, table_name) != 0)
3159 {
3160 if (!first_table)
3161 {
3162 /*
3163 * Add server name and table-level options. We specify remote
3164 * schema and table name as options (the latter to ensure that
3165 * renaming the foreign table doesn't break the association).
3166 */
3167 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
3168 quote_identifier(stmt->server_name));
3169
3170 appendStringInfoString(&buf, "schema_name ");
3171 deparseStringLiteral(&buf, stmt->remote_schema);
3172 appendStringInfoString(&buf, ", table_name ");
3173 deparseStringLiteral(&buf, prev_table);
3174
3175 appendStringInfoString(&buf, ");");
3176
3177 commands = lappend(commands, pstrdup(buf.data));
3178 }
3179
3180 resetStringInfo(&buf);
3181 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3182 quote_identifier(table_name));
3183 first_column = true;
3184 first_table = false;
3185 }
3186
3187 if (first_column)
3188 first_column = false;
3189 else
3190 appendStringInfoString(&buf, ",\n");
3191
3192 /* Print column name */
3193 appendStringInfo(&buf, " %s",
3194 quote_identifier(column_name));
3195
3196 /* Print column type */
3197
3198 /* Numeric types */
3199 if (strcmp(data_type, "bit") == 0 ||
3200 strcmp(data_type, "smallint") == 0 ||
3201 strcmp(data_type, "tinyint") == 0)
3202 appendStringInfoString(&buf, " smallint");
3203 else if (strcmp(data_type, "int") == 0)
3204 appendStringInfoString(&buf, " integer");
3205 else if (strcmp(data_type, "bigint") == 0)
3206 appendStringInfoString(&buf, " bigint");
3207 else if (strcmp(data_type, "decimal") == 0)
3208 {
3209 if (numeric_scale == 0)
3210 appendStringInfo(&buf, " decimal(%d)", numeric_precision);
3211 else
3212 appendStringInfo(&buf, " decimal(%d, %d)",
3213 numeric_precision, numeric_scale);
3214 }
3215 else if (strcmp(data_type, "numeric") == 0)
3216 {
3217 if (numeric_scale == 0)
3218 appendStringInfo(&buf, " numeric(%d)", numeric_precision);
3219 else
3220 appendStringInfo(&buf, " numeric(%d, %d)",
3221 numeric_precision, numeric_scale);
3222 }
3223 else if (strcmp(data_type, "money") == 0 ||
3224 strcmp(data_type, "smallmoney") == 0)
3225 appendStringInfoString(&buf, " money");
3226
3227 /* Floating-point types */
3228 else if (strcmp(data_type, "float") == 0)
3229 appendStringInfo(&buf, " float(%d)", numeric_precision);
3230 else if (strcmp(data_type, "real") == 0)
3231 appendStringInfoString(&buf, " real");
3232
3233 /* Date/type types */
3234 else if (strcmp(data_type, "date") == 0)
3235 appendStringInfoString(&buf, " date");
3236 else if (strcmp(data_type, "datetime") == 0 ||
3237 strcmp(data_type, "datetime2") == 0 ||
3238 strcmp(data_type, "smalldatetime") == 0)
3239 appendStringInfo(&buf, " timestamp(%d) without time zone", (datetime_precision > 6) ? 6 : datetime_precision);
3240 else if (strcmp(data_type, "datetimeoffset") == 0)
3241 appendStringInfo(&buf, " timestamp(%d) with time zone", (datetime_precision > 6) ? 6 : datetime_precision);
3242 else if (strcmp(data_type, "time") == 0)
3243 appendStringInfoString(&buf, " time");
3244
3245 /* Character types */
3246 else if (strcmp(data_type, "char") == 0 ||
3247 strcmp(data_type, "nchar") == 0)
3248 appendStringInfo(&buf, " char(%d)", char_len);
3249 else if (strcmp(data_type, "varchar") == 0 ||
3250 strcmp(data_type, "nvarchar") == 0)
3251 {
3252 if (char_len == -1)
3253 appendStringInfoString(&buf, " text");
3254 else
3255 appendStringInfo(&buf, " varchar(%d)", char_len);
3256 }
3257 else if (strcmp(data_type, "text") == 0 ||
3258 strcmp(data_type, "ntext") == 0)
3259 appendStringInfoString(&buf, " text");
3260
3261 /* Binary types */
3262 else if (strcmp(data_type, "binary") == 0 ||
3263 strcmp(data_type, "varbinary") == 0 ||
3264 strcmp(data_type, "image") == 0 ||
3265 strcmp(data_type, "rowversion") == 0 ||
3266 strcmp(data_type, "timestamp") == 0 )
3267 appendStringInfoString(&buf, " bytea");
3268
3269 /* Other types */
3270 else if (strcmp(data_type, "xml") == 0)
3271 appendStringInfoString(&buf, " xml");
3272 else if (strcmp(data_type, "uniqueidentifier") == 0)
3273 appendStringInfoString(&buf, " uuid");
3274 else
3275 {
3276 ereport(DEBUG3,
3277 (errmsg("tds_fdw: column \"%s\" of table \"%s\" has an untranslatable data type", column_name, table_name)
3278 ));
3279 appendStringInfoString(&buf, " text");
3280 }
3281
3282 /*
3283 * Add column_name option so that renaming the foreign table's
3284 * column doesn't break the association to the underlying
3285 * column.
3286 */
3287 appendStringInfoString(&buf, " OPTIONS (column_name ");
3288 deparseStringLiteral(&buf, column_name);
3289 appendStringInfoChar(&buf, ')');
3290
3291 /* Add DEFAULT if needed */
3292 if (import_default && column_default[0] != '\0')
3293 appendStringInfo(&buf, " DEFAULT %s", column_default);
3294
3295 /* Add NOT NULL if needed */
3296 if (import_not_null && strcmp(is_nullable, "NO") == 0)
3297 appendStringInfoString(&buf, " NOT NULL");
3298
3299 strcpy(prev_table, table_name);
3300
3301 break;
3302
3303 case BUF_FULL:
3304 ereport(ERROR,
3305 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
3306 errmsg("Buffer filled up while getting plan for query")
3307 ));
3308
3309 case FAIL:
3310 ereport(ERROR,
3311 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3312 errmsg("Failed to get row while getting plan for query")
3313 ));
3314
3315 default:
3316 ereport(ERROR,
3317 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3318 errmsg("Failed to get plan for query. Unknown return code.")
3319 ));
3320 }
3321 }
3322
3323 /*
3324 * Add server name and table-level options. We specify remote
3325 * schema and table name as options (the latter to ensure that
3326 * renaming the foreign table doesn't break the association).
3327 */
3328 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
3329 quote_identifier(stmt->server_name));
3330
3331 appendStringInfoString(&buf, "schema_name ");
3332 deparseStringLiteral(&buf, stmt->remote_schema);
3333 appendStringInfoString(&buf, ", table_name ");
3334 deparseStringLiteral(&buf, prev_table);
3335
3336 appendStringInfoString(&buf, ");");
3337
3338 commands = lappend(commands, pstrdup(buf.data));
3339 }
3340
3341 return commands;
3342 }
3343
3344 static List *
tdsImportSybaseSchema(ImportForeignSchemaStmt * stmt,DBPROCESS * dbproc,TdsFdwOptionSet option_set,bool import_default,bool import_not_null)3345 tdsImportSybaseSchema(ImportForeignSchemaStmt *stmt, DBPROCESS *dbproc,
3346 TdsFdwOptionSet option_set,
3347 bool import_default, bool import_not_null)
3348 {
3349 List *commands = NIL;
3350 ListCell *lc;
3351 StringInfoData buf;
3352
3353 RETCODE erc;
3354 int ret_code;
3355
3356 initStringInfo(&buf);
3357
3358 /* Check that the schema really exists */
3359 appendStringInfoString(&buf, "SELECT name FROM sysusers WHERE name = ");
3360 deparseStringLiteral(&buf, stmt->remote_schema);
3361
3362 if (!tdsExecuteQuery(buf.data, dbproc))
3363 ereport(ERROR,
3364 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
3365 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
3366 stmt->remote_schema, option_set.servername)));
3367 else
3368 /* Process results */
3369 while ((ret_code = dbnextrow(dbproc)) != NO_MORE_ROWS)
3370 {
3371 /* Do nothing */
3372 }
3373 resetStringInfo(&buf);
3374
3375 /*
3376 * Fetch all table data from this schema, possibly restricted by
3377 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
3378 * to EXCEPT/LIMIT TO here, because the core code will filter the
3379 * statements we return according to those lists anyway. But it
3380 * should save a few cycles to not process excluded tables in the
3381 * first place.)
3382 */
3383 appendStringInfoString(&buf,
3384 "SELECT so.name AS table_name,"
3385 " sc.name AS column_name, "
3386 " st.name AS data_type, "
3387 " SUBSTRING(sm.text, 10, 255) AS column_default, "
3388 " CASE (sc.status & 0x08) "
3389 " WHEN 8 THEN 'YES' ELSE 'NO' "
3390 " END AS is_nullable, "
3391 " sc.length, "
3392 " sc.prec, "
3393 " sc.scale "
3394 "FROM sysobjects so "
3395 " INNER JOIN sysusers su ON su.uid = so.uid"
3396 " LEFT JOIN syscolumns sc ON sc.id = so.id "
3397 " LEFT JOIN systypes st ON st.usertype = sc.usertype "
3398 " LEFT JOIN syscomments sm ON sm.id = sc.cdefault "
3399 "WHERE so.type = 'U' AND su.name = ");
3400
3401 deparseStringLiteral(&buf, stmt->remote_schema);
3402
3403 /* Apply restrictions for LIMIT TO and EXCEPT */
3404 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3405 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3406 {
3407 bool first_item = true;
3408
3409 appendStringInfoString(&buf, " AND so.name ");
3410 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3411 appendStringInfoString(&buf, "NOT ");
3412 appendStringInfoString(&buf, "IN (");
3413
3414 /* Append list of table names within IN clause */
3415 foreach(lc, stmt->table_list)
3416 {
3417 RangeVar *rv = (RangeVar *) lfirst(lc);
3418
3419 if (first_item)
3420 first_item = false;
3421 else
3422 appendStringInfoString(&buf, ", ");
3423 deparseStringLiteral(&buf, rv->relname);
3424 }
3425 appendStringInfoChar(&buf, ')');
3426 }
3427
3428 /* Append ORDER BY at the end of query to ensure output ordering */
3429 appendStringInfoString(&buf, " ORDER BY so.name, sc.colid");
3430
3431 if (tdsExecuteQuery(buf.data, dbproc))
3432 {
3433 char table_name[255],
3434 prev_table[255],
3435 column_name[255],
3436 data_type[255],
3437 column_default[4000],
3438 is_nullable[10];
3439 int char_len,
3440 numeric_precision,
3441 numeric_scale;
3442 bool first_column = true;
3443 bool first_table = true;
3444 /* Check if there's rows in resultset and if not do not execute the rest */
3445 erc = dbrows(dbproc);
3446
3447 if (erc == FAIL)
3448 {
3449 ereport(NOTICE,
3450 (errmsg("tds_fdw: No table were found in schema %s", stmt->remote_schema))
3451 );
3452 return commands;
3453 }
3454
3455 prev_table[0] = '\0';
3456
3457 erc = dbbind(dbproc, 1, NTBSTRINGBIND, sizeof(table_name), (BYTE *) table_name);
3458 if (erc == FAIL)
3459 {
3460 ereport(ERROR,
3461 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3462 errmsg("Failed to bind results for column \"table_name\" to a variable.")
3463 ));
3464 }
3465
3466 erc = dbbind(dbproc, 2, NTBSTRINGBIND, sizeof(column_name), (BYTE *) column_name);
3467 if (erc == FAIL)
3468 {
3469 ereport(ERROR,
3470 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3471 errmsg("Failed to bind results for column \"column_name\" to a variable.")
3472 ));
3473 }
3474
3475 erc = dbbind(dbproc, 3, NTBSTRINGBIND, sizeof(data_type), (BYTE *) data_type);
3476 if (erc == FAIL)
3477 {
3478 ereport(ERROR,
3479 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3480 errmsg("Failed to bind results for column \"data_type\" to a variable.")
3481 ));
3482 }
3483
3484 erc = dbbind(dbproc, 4, NTBSTRINGBIND, sizeof(column_default), (BYTE *) column_default);
3485 if (erc == FAIL)
3486 {
3487 ereport(ERROR,
3488 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3489 errmsg("Failed to bind results for column \"column_default\" to a variable.")
3490 ));
3491 }
3492
3493 erc = dbbind(dbproc, 5, NTBSTRINGBIND, sizeof(is_nullable), (BYTE *) is_nullable);
3494 if (erc == FAIL)
3495 {
3496 ereport(ERROR,
3497 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3498 errmsg("Failed to bind results for column \"is_nullable\" to a variable.")
3499 ));
3500 }
3501
3502 erc = dbbind(dbproc, 6, INTBIND, sizeof(int), (BYTE *) &char_len);
3503 if (erc == FAIL)
3504 {
3505 ereport(ERROR,
3506 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3507 errmsg("Failed to bind results for column \"length\" to a variable.")
3508 ));
3509 }
3510
3511 erc = dbbind(dbproc, 7, INTBIND, sizeof(int), (BYTE *) &numeric_precision);
3512 if (erc == FAIL)
3513 {
3514 ereport(ERROR,
3515 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3516 errmsg("Failed to bind results for column \"prec\" to a variable.")
3517 ));
3518 }
3519
3520 erc = dbbind(dbproc, 8, INTBIND, sizeof(int), (BYTE *) &numeric_scale);
3521 if (erc == FAIL)
3522 {
3523 ereport(ERROR,
3524 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3525 errmsg("Failed to bind results for column \"scale\" to a variable.")
3526 ));
3527 }
3528
3529 /* Process results */
3530 while ((ret_code = dbnextrow(dbproc)) != NO_MORE_ROWS)
3531 {
3532 switch (ret_code)
3533 {
3534 case REG_ROW:
3535 ereport(DEBUG3,
3536 (errmsg("tds_fdw: column \"%s.%s\"", table_name, column_name)
3537 ));
3538
3539 /* Build query for the new table */
3540 if (first_table || strcmp(prev_table, table_name) != 0)
3541 {
3542 if (!first_table)
3543 {
3544 /*
3545 * Add server name and table-level options. We specify remote
3546 * schema and table name as options (the latter to ensure that
3547 * renaming the foreign table doesn't break the association).
3548 */
3549 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
3550 quote_identifier(stmt->server_name));
3551
3552 appendStringInfoString(&buf, "schema_name ");
3553 deparseStringLiteral(&buf, stmt->remote_schema);
3554 appendStringInfoString(&buf, ", table_name ");
3555 deparseStringLiteral(&buf, prev_table);
3556
3557 appendStringInfoString(&buf, ");");
3558
3559 commands = lappend(commands, pstrdup(buf.data));
3560 }
3561
3562 resetStringInfo(&buf);
3563 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3564 quote_identifier(table_name));
3565 first_column = true;
3566 first_table = false;
3567 }
3568
3569 if (first_column)
3570 first_column = false;
3571 else
3572 appendStringInfoString(&buf, ",\n");
3573
3574 /* Print column name */
3575 appendStringInfo(&buf, " %s",
3576 quote_identifier(column_name));
3577
3578 /* Print column type */
3579
3580 /* Numeric types */
3581 if (strcmp(data_type, "bit") == 0 ||
3582 strcmp(data_type, "smallint") == 0 ||
3583 strcmp(data_type, "tinyint") == 0)
3584 appendStringInfoString(&buf, " smallint");
3585 else if (strcmp(data_type, "int") == 0)
3586 appendStringInfoString(&buf, " integer");
3587 else if (strcmp(data_type, "bigint") == 0)
3588 appendStringInfoString(&buf, " bigint");
3589 else if (strcmp(data_type, "decimal") == 0)
3590 {
3591 if (numeric_scale == 0)
3592 appendStringInfo(&buf, " decimal(%d)", numeric_precision);
3593 else
3594 appendStringInfo(&buf, " decimal(%d, %d)",
3595 numeric_precision, numeric_scale);
3596 }
3597 else if (strcmp(data_type, "numeric") == 0)
3598 {
3599 if (numeric_scale == 0)
3600 appendStringInfo(&buf, " numeric(%d)", numeric_precision);
3601 else
3602 appendStringInfo(&buf, " numeric(%d, %d)",
3603 numeric_precision, numeric_scale);
3604 }
3605 else if (strcmp(data_type, "money") == 0 ||
3606 strcmp(data_type, "smallmoney") == 0)
3607 appendStringInfoString(&buf, " money");
3608
3609 /* Floating-point types */
3610 else if (strcmp(data_type, "float") == 0)
3611 appendStringInfo(&buf, " float(%d)", numeric_precision);
3612 else if (strcmp(data_type, "real") == 0)
3613 appendStringInfoString(&buf, " real");
3614
3615 /* Date/type types */
3616 else if (strcmp(data_type, "date") == 0)
3617 appendStringInfoString(&buf, " date");
3618 else if (strcmp(data_type, "datetime") == 0 ||
3619 strcmp(data_type, "smalldatetime") == 0 ||
3620 strcmp(data_type, "bigdatetime") == 0)
3621 appendStringInfoString(&buf, " timestamp without time zone");
3622 else if (strcmp(data_type, "time") == 0 ||
3623 strcmp(data_type, "bigtime") == 0)
3624 appendStringInfoString(&buf, " time");
3625
3626 /* Character types */
3627 else if (strcmp(data_type, "char") == 0 ||
3628 strcmp(data_type, "nchar") == 0 ||
3629 strcmp(data_type, "unichar") == 0)
3630 appendStringInfo(&buf, " char(%d)", char_len);
3631 else if (strcmp(data_type, "varchar") == 0 ||
3632 strcmp(data_type, "nvarchar") == 0 ||
3633 strcmp(data_type, "univarchar") == 0)
3634 {
3635 if (char_len == -1)
3636 appendStringInfoString(&buf, " text");
3637 else
3638 appendStringInfo(&buf, " varchar(%d)", char_len);
3639 }
3640 else if (strcmp(data_type, "text") == 0 ||
3641 strcmp(data_type, "unitext") == 0)
3642 appendStringInfoString(&buf, " text");
3643
3644 /* Binary types */
3645 else if (strcmp(data_type, "binary") == 0 ||
3646 strcmp(data_type, "varbinary") == 0 ||
3647 strcmp(data_type, "image") == 0 ||
3648 strcmp(data_type, "timestamp") == 0 )
3649 appendStringInfoString(&buf, " bytea");
3650
3651 /* Other types */
3652 else if (strcmp(data_type, "xml") == 0)
3653 appendStringInfoString(&buf, " xml");
3654 else
3655 {
3656 ereport(DEBUG3,
3657 (errmsg("tds_fdw: column \"%s\" of table \"%s\" has an untranslatable data type", column_name, table_name)
3658 ));
3659 appendStringInfoString(&buf, " text");
3660 }
3661
3662 /*
3663 * Add column_name option so that renaming the foreign table's
3664 * column doesn't break the association to the underlying
3665 * column.
3666 */
3667 appendStringInfoString(&buf, " OPTIONS (column_name ");
3668 deparseStringLiteral(&buf, column_name);
3669 appendStringInfoChar(&buf, ')');
3670
3671 /* Add DEFAULT if needed */
3672 if (import_default && column_default[0] != '\0')
3673 appendStringInfo(&buf, " DEFAULT %s", column_default);
3674
3675 /* Add NOT NULL if needed */
3676 if (import_not_null && strcmp(is_nullable, "NO") == 0)
3677 appendStringInfoString(&buf, " NOT NULL");
3678
3679 strcpy(prev_table, table_name);
3680
3681 break;
3682
3683 case BUF_FULL:
3684 ereport(ERROR,
3685 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
3686 errmsg("Buffer filled up while getting plan for query")
3687 ));
3688
3689 case FAIL:
3690 ereport(ERROR,
3691 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3692 errmsg("Failed to get row while getting plan for query")
3693 ));
3694
3695 default:
3696 ereport(ERROR,
3697 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3698 errmsg("Failed to get plan for query. Unknown return code.")
3699 ));
3700 }
3701 }
3702
3703 /*
3704 * Add server name and table-level options. We specify remote
3705 * schema and table name as options (the latter to ensure that
3706 * renaming the foreign table doesn't break the association).
3707 */
3708 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
3709 quote_identifier(stmt->server_name));
3710
3711 appendStringInfoString(&buf, "schema_name ");
3712 deparseStringLiteral(&buf, stmt->remote_schema);
3713 appendStringInfoString(&buf, ", table_name ");
3714 deparseStringLiteral(&buf, prev_table);
3715
3716 appendStringInfoString(&buf, ");");
3717
3718 commands = lappend(commands, pstrdup(buf.data));
3719 }
3720
3721 return commands;
3722 }
3723
tdsImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)3724 List *tdsImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
3725 {
3726 TdsFdwOptionSet option_set;
3727 List *commands = NIL;
3728 bool import_default = false;
3729 bool import_not_null = true;
3730 bool is_sql_server = true;
3731 StringInfoData buf;
3732 ListCell *lc;
3733
3734 LOGINREC *login;
3735 DBPROCESS *dbproc;
3736
3737 #ifdef DEBUG
3738 ereport(NOTICE,
3739 (errmsg("----> starting tdsImportForeignSchema")
3740 ));
3741 #endif
3742
3743 /* Parse statement options */
3744 foreach(lc, stmt->options)
3745 {
3746 DefElem *def = (DefElem *) lfirst(lc);
3747
3748 if (strcmp(def->defname, "import_default") == 0)
3749 import_default = defGetBoolean(def);
3750 else if (strcmp(def->defname, "import_not_null") == 0)
3751 import_not_null = defGetBoolean(def);
3752 else
3753 ereport(ERROR,
3754 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
3755 errmsg("invalid option \"%s\"", def->defname)));
3756 }
3757
3758 tdsGetForeignServerOptionsFromCatalog(serverOid, &option_set);
3759
3760 ereport(DEBUG3,
3761 (errmsg("tds_fdw: Initiating DB-Library")
3762 ));
3763
3764 if (dbinit() == FAIL)
3765 {
3766 ereport(ERROR,
3767 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
3768 errmsg("Failed to initialize DB-Library environment")
3769 ));
3770 goto cleanup_before_init;
3771 }
3772
3773 dberrhandle(tds_err_handler);
3774
3775 if (option_set.msg_handler)
3776 {
3777 if (strcmp(option_set.msg_handler, "notice") == 0)
3778 {
3779 dbmsghandle(tds_notice_msg_handler);
3780 }
3781
3782 else if (strcmp(option_set.msg_handler, "blackhole") == 0)
3783 {
3784 dbmsghandle(tds_blackhole_msg_handler);
3785 }
3786
3787 else
3788 {
3789 ereport(ERROR,
3790 (errcode(ERRCODE_SYNTAX_ERROR),
3791 errmsg("Unknown msg handler: %s.", option_set.msg_handler)
3792 ));
3793 }
3794 }
3795
3796 ereport(DEBUG3,
3797 (errmsg("tds_fdw: Getting login structure")
3798 ));
3799
3800 if ((login = dblogin()) == NULL)
3801 {
3802 ereport(ERROR,
3803 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
3804 errmsg("Failed to initialize DB-Library login structure")
3805 ));
3806 goto cleanup_before_login;
3807 }
3808
3809 if (tdsSetupConnection(&option_set, login, &dbproc) != 0)
3810 {
3811 goto cleanup;
3812 }
3813
3814 /* Create workspace for strings */
3815 initStringInfo(&buf);
3816
3817 /* Determine server: is MS Sql Server or Sybase */
3818 appendStringInfoString(&buf, "SELECT CHARINDEX('Microsoft', @@version) AS is_sql_server");
3819
3820 if (!tdsExecuteQuery(buf.data, dbproc))
3821 ereport(ERROR,
3822 (errcode(ERRCODE_FDW_ERROR),
3823 errmsg("Failed to check server version")
3824 ));
3825 else
3826 {
3827 RETCODE erc;
3828 int ret_code,
3829 is_sql_server_pos;
3830
3831 erc = dbbind(dbproc, 1, INTBIND, sizeof(int), (BYTE *) &is_sql_server_pos);
3832 if (erc == FAIL)
3833 {
3834 ereport(ERROR,
3835 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3836 errmsg("Failed to bind results for column \"is_sql_server\" to a variable.")
3837 ));
3838 }
3839
3840 /* Process result */
3841 ret_code = dbnextrow(dbproc);
3842 if (ret_code == NO_MORE_ROWS)
3843 ereport(ERROR,
3844 (errcode(ERRCODE_FDW_ERROR),
3845 errmsg("Failed to check server version")
3846 ));
3847
3848 switch (ret_code)
3849 {
3850 case REG_ROW:
3851 ereport(DEBUG3,
3852 (errmsg("tds_fdw: is_sql_server %d", is_sql_server_pos)
3853 ));
3854
3855 if (is_sql_server_pos == 0)
3856 is_sql_server = false;
3857
3858 break;
3859
3860 case BUF_FULL:
3861 ereport(ERROR,
3862 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
3863 errmsg("Buffer filled up while getting plan for query")
3864 ));
3865
3866 case FAIL:
3867 ereport(ERROR,
3868 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3869 errmsg("Failed to get row while getting plan for query")
3870 ));
3871
3872 default:
3873 ereport(ERROR,
3874 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3875 errmsg("Failed to get plan for query. Unknown return code.")
3876 ));
3877 }
3878 }
3879
3880 if (is_sql_server)
3881 commands = tdsImportSqlServerSchema(stmt, dbproc, option_set,
3882 import_default, import_not_null);
3883 else
3884 commands = tdsImportSybaseSchema(stmt, dbproc, option_set,
3885 import_default, import_not_null);
3886
3887 cleanup:
3888 dbclose(dbproc);
3889 dbloginfree(login);
3890
3891 cleanup_before_login:
3892 dbexit();
3893
3894 cleanup_before_init:
3895 ;
3896
3897 #ifdef DEBUG
3898 ereport(NOTICE,
3899 (errmsg("----> finishing tdsImportForeignSchema")
3900 ));
3901 #endif
3902
3903 return commands;
3904 }
3905 #endif /* IMPORT_API */
3906
tds_err_msg(int severity,int dberr,int oserr,char * dberrstr,char * oserrstr)3907 char *tds_err_msg(int severity, int dberr, int oserr, char *dberrstr, char *oserrstr)
3908 {
3909 StringInfoData buf;
3910
3911 initStringInfo(&buf);
3912 appendStringInfo(
3913 &buf,
3914 "DB-Library error: DB #: %i, DB Msg: %s, OS #: %i, OS Msg: %s, Level: %i",
3915 dberr,
3916 dberrstr ? dberrstr : "",
3917 oserr,
3918 oserrstr ? oserrstr : "",
3919 severity
3920 );
3921
3922 return buf.data;
3923 }
3924
tds_err_capture(DBPROCESS * dbproc,int severity,int dberr,int oserr,char * dberrstr,char * oserrstr)3925 int tds_err_capture(DBPROCESS *dbproc, int severity, int dberr, int oserr, char *dberrstr, char *oserrstr)
3926 {
3927 last_error_message = tds_err_msg(severity, dberr, oserr, dberrstr, oserrstr);
3928
3929 return INT_CANCEL;
3930 }
3931
tds_err_handler(DBPROCESS * dbproc,int severity,int dberr,int oserr,char * dberrstr,char * oserrstr)3932 int tds_err_handler(DBPROCESS *dbproc, int severity, int dberr, int oserr, char *dberrstr, char *oserrstr)
3933 {
3934 #ifdef DEBUG
3935 ereport(NOTICE,
3936 (errmsg("----> starting tds_err_handler")
3937 ));
3938 #endif
3939
3940 ereport(ERROR,
3941 (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
3942 errmsg("%s", tds_err_msg(severity, dberr, oserr, dberrstr, oserrstr))
3943 ));
3944
3945 return INT_CANCEL;
3946 }
3947
tds_notice_msg_handler(DBPROCESS * dbproc,DBINT msgno,int msgstate,int severity,char * msgtext,char * svr_name,char * proc_name,int line)3948 int tds_notice_msg_handler(DBPROCESS *dbproc, DBINT msgno, int msgstate, int severity, char *msgtext, char *svr_name, char *proc_name, int line)
3949 {
3950 #ifdef DEBUG
3951 ereport(NOTICE,
3952 (errmsg("----> starting tds_notice_msg_handler")
3953 ));
3954 #endif
3955
3956 ereport(NOTICE,
3957 (errmsg("DB-Library notice: Msg #: %ld, Msg state: %i, Msg: %s, Server: %s, Process: %s, Line: %i, Level: %i",
3958 (long)msgno, msgstate, msgtext, svr_name, proc_name, line, severity)
3959 ));
3960
3961 #ifdef DEBUG
3962 ereport(NOTICE,
3963 (errmsg("----> finishing tds_notice_msg_handler")
3964 ));
3965 #endif
3966
3967 return 0;
3968 }
3969
tds_blackhole_msg_handler(DBPROCESS * dbproc,DBINT msgno,int msgstate,int severity,char * msgtext,char * svr_name,char * proc_name,int line)3970 int tds_blackhole_msg_handler(DBPROCESS *dbproc, DBINT msgno, int msgstate, int severity, char *msgtext, char *svr_name, char *proc_name, int line)
3971 {
3972 #ifdef DEBUG
3973 ereport(NOTICE,
3974 (errmsg("----> starting tds_blackhole_msg_handler")
3975 ));
3976 #endif
3977
3978 #ifdef DEBUG
3979 ereport(NOTICE,
3980 (errmsg("----> finishing tds_blackhole_msg_handler")
3981 ));
3982 #endif
3983
3984 return 0;
3985 }
3986