/*
 * pg_partman_bgw.c
 *
 * A background worker process for the pg_partman extension that allows
 * partition maintenance to be scheduled and run within the database
 * itself without requiring a third-party scheduler (e.g. cron).
 *
 */
9 
10 #include "postgres.h"
11 
12 /* These are always necessary for a bgworker */
13 #include "miscadmin.h"
14 #include "postmaster/bgworker.h"
15 #include "storage/ipc.h"
16 #include "storage/latch.h"
17 #include "storage/lwlock.h"
18 #include "storage/proc.h"
19 #include "storage/shmem.h"
20 
21 /* these headers are used by this particular worker's code */
22 #include "access/xact.h"
23 #include "executor/spi.h"
24 #include "fmgr.h"
25 #include "lib/stringinfo.h"
26 #include "pgstat.h"
27 #include "utils/builtins.h"
28 #include "utils/snapmgr.h"
29 #include "tcop/utility.h"
30 
31 #if (PG_VERSION_NUM >= 100000)
32 #include "utils/varlena.h"
33 #endif
34 
35 
36 PG_MODULE_MAGIC;
37 
38 void        _PG_init(void);
39 void        pg_partman_bgw_main(Datum);
40 void        pg_partman_bgw_run_maint(Datum);
41 
42 /* flags set by signal handlers */
43 static volatile sig_atomic_t got_sighup = false;
44 static volatile sig_atomic_t got_sigterm = false;
45 
46 /* GUC variables */
47 static int pg_partman_bgw_interval = 3600; // Default hourly
48 static char *pg_partman_bgw_role = "postgres"; // Default to postgres role
49 
50 // Do not analyze by default on PG11+
51 #if (PG_VERSION_NUM >= 110000)
52 static char *pg_partman_bgw_analyze = "off";
53 #else
54 static char *pg_partman_bgw_analyze = "on";
55 #endif
56 
57 static char *pg_partman_bgw_jobmon = "on";
58 static char *pg_partman_bgw_dbname = NULL;
59 
60 #if (PG_VERSION_NUM < 100500)
61 static bool (*split_function_ptr)(char *, char, List **) = &SplitIdentifierString;
62 #else
63 static bool (*split_function_ptr)(char *, char, List **) = &SplitGUCList;
64 #endif
65 
66 /*
67  * Signal handler for SIGTERM
68  *      Set a flag to let the main loop to terminate, and set our latch to wake
69  *      it up.
70  */
71 static void
pg_partman_bgw_sigterm(SIGNAL_ARGS)72 pg_partman_bgw_sigterm(SIGNAL_ARGS)
73 {
74     int         save_errno = errno;
75 
76     got_sigterm = true;
77 
78     if (MyProc)
79         SetLatch(&MyProc->procLatch);
80 
81     errno = save_errno;
82 }
83 
84 /*
85  * Signal handler for SIGHUP
86  *      Set a flag to tell the main loop to reread the config file, and set
87  *      our latch to wake it up.
88  */
pg_partman_bgw_sighup(SIGNAL_ARGS)89 static void pg_partman_bgw_sighup(SIGNAL_ARGS) {
90     int         save_errno = errno;
91 
92     got_sighup = true;
93 
94     if (MyProc)
95         SetLatch(&MyProc->procLatch);
96 
97     errno = save_errno;
98 }
99 
100 /*
101  * Entrypoint of this module.
102  */
103 void
_PG_init(void)104 _PG_init(void)
105 {
106     BackgroundWorker worker;
107 
108     DefineCustomIntVariable("pg_partman_bgw.interval",
109                             "How often run_maintenance() is called (in seconds).",
110                             NULL,
111                             &pg_partman_bgw_interval,
112                             3600,
113                             1,
114                             INT_MAX,
115                             PGC_SIGHUP,
116                             0,
117                             NULL,
118                             NULL,
119                             NULL);
120 
121     #if (PG_VERSION_NUM >= 110000)
122     DefineCustomStringVariable("pg_partman_bgw.analyze",
123                             "Whether to run an analyze on a partition set whenever a new partition is created during run_maintenance(). Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.",
124                             NULL,
125                             &pg_partman_bgw_analyze,
126                             "off",
127                             PGC_SIGHUP,
128                             0,
129                             NULL,
130                             NULL,
131                             NULL);
132     #else
133     DefineCustomStringVariable("pg_partman_bgw.analyze",
134                             "Whether to run an analyze on a partition set whenever a new partition is created during run_maintenance(). Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.",
135                             NULL,
136                             &pg_partman_bgw_analyze,
137                             "on",
138                             PGC_SIGHUP,
139                             0,
140                             NULL,
141                             NULL,
142                             NULL);
143     #endif
144 
145     DefineCustomStringVariable("pg_partman_bgw.dbname",
146                             "CSV list of specific databases in the cluster to run pg_partman BGW on.",
147                             NULL,
148                             &pg_partman_bgw_dbname,
149                             NULL,
150                             PGC_SIGHUP,
151                             0,
152                             NULL,
153                             NULL,
154                             NULL);
155 
156     DefineCustomStringVariable("pg_partman_bgw.jobmon",
157                             "Whether to log run_maintenance() calls to pg_jobmon if it is installed. Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.",
158                             NULL,
159                             &pg_partman_bgw_jobmon,
160                             "on",
161                             PGC_SIGHUP,
162                             0,
163                             NULL,
164                             NULL,
165                             NULL);
166 
167     DefineCustomStringVariable("pg_partman_bgw.role",
168                                "Role to be used by BGW. Must have execute permissions on run_maintenance()",
169                                NULL,
170                                &pg_partman_bgw_role,
171                                "postgres",
172                                PGC_SIGHUP,
173                                0,
174                                NULL,
175                                NULL,
176                                NULL);
177 
178 /* Kept as comment for reference for future development
179     DefineCustomStringVariable("pg_partman_bgw.maintenance_db",
180                             "The BGW requires connecting to a local database for reading system catalogs. By default it uses template1. You can change that with this setting if needed.",
181                             NULL,
182                             &pg_partman_bgw_maint_db,
183                             "template1",
184                             PGC_SIGHUP,
185                             0,
186                             NULL,
187                             NULL,
188                             NULL);
189 */
190 
191     if (!process_shared_preload_libraries_in_progress)
192         return;
193 
194     // Start BGW when database starts
195     sprintf(worker.bgw_name, "pg_partman master background worker");
196     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
197         BGWORKER_BACKEND_DATABASE_CONNECTION;
198     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
199     worker.bgw_restart_time = 600;
200     #if (PG_VERSION_NUM < 100000)
201     worker.bgw_main = pg_partman_bgw_main;
202     #endif
203     #if (PG_VERSION_NUM >= 100000)
204     sprintf(worker.bgw_library_name, "pg_partman_bgw");
205     sprintf(worker.bgw_function_name, "pg_partman_bgw_main");
206     #endif
207     worker.bgw_main_arg = CStringGetDatum(pg_partman_bgw_dbname);
208     worker.bgw_notify_pid = 0;
209     RegisterBackgroundWorker(&worker);
210 
211 }
212 
pg_partman_bgw_main(Datum main_arg)213 void pg_partman_bgw_main(Datum main_arg) {
214     StringInfoData buf;
215 
216     /* Establish signal handlers before unblocking signals. */
217     pqsignal(SIGHUP, pg_partman_bgw_sighup);
218     pqsignal(SIGTERM, pg_partman_bgw_sigterm);
219 
220     /* We're now ready to receive signals */
221     BackgroundWorkerUnblockSignals();
222 
223 /* Keep for when master requires persistent connection
224    elog(LOG, "%s master process initialized with role %s on database %s"
225             , MyBgworkerEntry->bgw_name
226             , pg_partman_bgw_role
227             , pg_partman_bgw_dbname);
228 */
229     elog(LOG, "%s master process initialized with role %s"
230             , MyBgworkerEntry->bgw_name
231             , pg_partman_bgw_role);
232 
233     initStringInfo(&buf);
234 
235     /*
236      * Main loop: do this until the SIGTERM handler tells us to terminate
237      */
238     while (!got_sigterm) {
239         BackgroundWorker        worker;
240         BackgroundWorkerHandle  *handle;
241         BgwHandleStatus         status;
242         char                    *rawstring;
243         int                     dbcounter;
244         int                     rc;
245         int                     full_string_length;
246         List                    *elemlist;
247         ListCell                *l;
248         pid_t                   pid;
249 
250         /* Using Latch loop method suggested in latch.h
251          * Uses timeout flag in WaitLatch() further below instead of sleep to allow clean shutdown */
252         ResetLatch(&MyProc->procLatch);
253 
254         CHECK_FOR_INTERRUPTS();
255 
256         /* In case of a SIGHUP, just reload the configuration. */
257         if (got_sighup) {
258             got_sighup = false;
259             ProcessConfigFile(PGC_SIGHUP);
260         }
261         elog(DEBUG1, "After sighup check (got_sighup: %d)", got_sighup);
262 
263         /* In case of a SIGTERM in middle of loop, stop all further processing and return from BGW function to allow it to exit cleanly. */
264         if (got_sigterm) {
265             elog(LOG, "pg_partman master BGW received SIGTERM. Shutting down. (got_sigterm: %d)", got_sigterm);
266             return;
267         }
268 
269         // Use method of shared_preload_libraries to split the pg_partman_bgw_dbname string found in src/backend/utils/init/miscinit.c
270         // Need a modifiable copy of string
271         if (pg_partman_bgw_dbname != NULL) {
272             rawstring = pstrdup(pg_partman_bgw_dbname);
273             // Parse string into list of identifiers
274             if (!(*split_function_ptr)(rawstring, ',', &elemlist)) {
275                 // syntax error in list
276                 pfree(rawstring);
277                 list_free(elemlist);
278                 ereport(LOG,
279                         (errcode(ERRCODE_SYNTAX_ERROR),
280                          errmsg("invalid list syntax in parameter \"pg_partman_bgw.dbname\" in postgresql.conf")));
281                 return;
282             }
283 
284             dbcounter = 0;
285             foreach(l, elemlist) {
286 
287                 char *dbname = (char *) lfirst(l);
288 
289                 elog(DEBUG1, "Dynamic bgw launch begun for %s (%d)", dbname, dbcounter);
290                 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
291                     BGWORKER_BACKEND_DATABASE_CONNECTION;
292                 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
293                 worker.bgw_restart_time = BGW_NEVER_RESTART;
294                 #if (PG_VERSION_NUM < 100000)
295                 worker.bgw_main = NULL;
296                 #endif
297                 sprintf(worker.bgw_library_name, "pg_partman_bgw");
298                 sprintf(worker.bgw_function_name, "pg_partman_bgw_run_maint");
299                 full_string_length = snprintf(worker.bgw_name, sizeof(worker.bgw_name),
300                               "pg_partman dynamic background worker (dbname=%s)", dbname);
301                 if (full_string_length >= sizeof(worker.bgw_name)) {
302                     /* dbname was truncated, add an ellipsis to denote it */
303                     const char truncated_mark[] = "...)";
304                     memcpy(worker.bgw_name + sizeof(worker.bgw_name) - sizeof(truncated_mark),
305                            truncated_mark, sizeof(truncated_mark));
306                 }
307                 worker.bgw_main_arg = Int32GetDatum(dbcounter);
308                 worker.bgw_notify_pid = MyProcPid;
309 
310                 dbcounter++;
311 
312                 elog(DEBUG1, "Registering dynamic background worker...");
313                 if (!RegisterDynamicBackgroundWorker(&worker, &handle)) {
314                     elog(ERROR, "Unable to register dynamic background worker for pg_partman. Consider increasing max_worker_processes if you see this frequently. Main background worker process will try restarting in 10 minutes.");
315                 }
316 
317                 elog(DEBUG1, "Waiting for BGW startup...");
318                 status = WaitForBackgroundWorkerStartup(handle, &pid);
319                 elog(DEBUG1, "BGW startup status: %d", status);
320 
321                 if (status == BGWH_STOPPED) {
322                     ereport(ERROR,
323                             (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
324                              errmsg("Could not start dynamic pg_partman background process"),
325                            errhint("More details may be available in the server log.")));
326                 }
327 
328                 if (status == BGWH_POSTMASTER_DIED) {
329                     ereport(ERROR,
330                             (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
331                           errmsg("Cannot start dynamic pg_partman background processes without postmaster"),
332                              errhint("Kill all remaining database processes and restart the database.")));
333                 }
334                 Assert(status == BGWH_STARTED);
335 
336                 #if (PG_VERSION_NUM >= 90500)
337                 // Shutdown wait function introduced in 9.5. The latch problems this wait fixes are only encountered in
338                 // 9.6 and later. So this shouldn't be a problem for 9.4.
339                 elog(DEBUG1, "Waiting for BGW shutdown...");
340                 status = WaitForBackgroundWorkerShutdown(handle);
341                 elog(DEBUG1, "BGW shutdown status: %d", status);
342                 Assert(status == BGWH_STOPPED);
343                 #endif
344             }
345 
346             pfree(rawstring);
347             list_free(elemlist);
348         } else { // pg_partman_bgw_dbname if null
349             elog(DEBUG1, "pg_partman_bgw.dbname GUC is NULL. Nothing to do in main loop.");
350         }
351 
352 
353         elog(DEBUG1, "Latch status just before waitlatch call: %d", MyProc->procLatch.is_set);
354 
355         #if (PG_VERSION_NUM >= 100000)
356         rc = WaitLatch(&MyProc->procLatch,
357                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
358                        pg_partman_bgw_interval * 1000L,
359                        PG_WAIT_EXTENSION);
360         #endif
361         #if (PG_VERSION_NUM < 100000)
362         rc = WaitLatch(&MyProc->procLatch,
363                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
364                        pg_partman_bgw_interval * 1000L);
365         #endif
366         /* emergency bailout if postmaster has died */
367         if (rc & WL_POSTMASTER_DEATH) {
368             proc_exit(1);
369         }
370 
371         elog(DEBUG1, "Latch status after waitlatch call: %d", MyProc->procLatch.is_set);
372 
373     } // end sigterm while
374 
375 } // end main
376 
377 /*
378  * Unable to pass the database name as a string argument (not sure why yet)
379  * Instead, the GUC is parsed both in the main function and below and a counter integer
380  *  is passed to determine which database the BGW will run in.
381  */
pg_partman_bgw_run_maint(Datum arg)382 void pg_partman_bgw_run_maint(Datum arg) {
383 
384     char                *analyze;
385     char                *dbname = "template1";
386     char                *jobmon;
387     char                *partman_schema;
388     char                *rawstring;
389     int                 db_main_counter = DatumGetInt32(arg);
390     List                *elemlist;
391     int                 ret;
392     StringInfoData      buf;
393 
394     /* Establish signal handlers before unblocking signals. */
395     pqsignal(SIGHUP, pg_partman_bgw_sighup);
396     pqsignal(SIGTERM, pg_partman_bgw_sigterm);
397 
398     /* We're now ready to receive signals */
399     BackgroundWorkerUnblockSignals();
400 
401     elog(DEBUG1, "Before parsing dbname GUC in dynamic main func: %s", pg_partman_bgw_dbname);
402     rawstring = pstrdup(pg_partman_bgw_dbname);
403     elog(DEBUG1, "GUC rawstring copy: %s", rawstring);
404     // Parse string into list of identifiers
405     if (!(*split_function_ptr)(rawstring, ',', &elemlist)) {
406         // syntax error in list
407         pfree(rawstring);
408         list_free(elemlist);
409         ereport(LOG,
410                 (errcode(ERRCODE_SYNTAX_ERROR),
411                  errmsg("invalid list syntax in parameter \"pg_partman_bgw.dbname\" in postgresql.conf")));
412         return;
413     }
414 
415     dbname = list_nth(elemlist, db_main_counter);
416     elog(DEBUG1, "Parsing dbname list: %s (%d)", dbname, db_main_counter);
417 
418     if (strcmp(dbname, "template1") == 0) {
419         elog(DEBUG1, "Default database name found in dbname local variable (\"template1\").");
420     }
421 
422     elog(DEBUG1, "Before run_maint initialize connection for db %s", dbname);
423 
424     #if (PG_VERSION_NUM < 110000)
425     BackgroundWorkerInitializeConnection(dbname, pg_partman_bgw_role);
426     #endif
427     #if (PG_VERSION_NUM >= 110000)
428     BackgroundWorkerInitializeConnection(dbname, pg_partman_bgw_role, 0);
429     #endif
430 
431     elog(DEBUG1, "After run_maint initialize connection for db %s", dbname);
432 
433     initStringInfo(&buf);
434 
435     SetCurrentStatementStartTimestamp();
436     StartTransactionCommand();
437     SPI_connect();
438     PushActiveSnapshot(GetTransactionSnapshot());
439     pgstat_report_appname("pg_partman dynamic background worker");
440 
441     // First determine if pg_partman is even installed in this database
442     appendStringInfo(&buf, "SELECT extname FROM pg_catalog.pg_extension WHERE extname = 'pg_partman'");
443     pgstat_report_activity(STATE_RUNNING, buf.data);
444     elog(DEBUG1, "Checking if pg_partman extension is installed in database: %s" , dbname);
445     ret = SPI_execute(buf.data, true, 1);
446     if (ret != SPI_OK_SELECT) {
447         elog(FATAL, "Cannot determine if pg_partman is installed in database %s: error code %d", dbname, ret);
448     }
449     if (SPI_processed <= 0) {
450         elog(DEBUG1, "pg_partman not installed in database %s. Nothing to do so dynamic worker exiting gracefully.", dbname);
451         // Nothing left to do. Return end the run of BGW function.
452         SPI_finish();
453         PopActiveSnapshot();
454         CommitTransactionCommand();
455         pgstat_report_activity(STATE_IDLE, NULL);
456 
457         pfree(rawstring);
458         list_free(elemlist);
459 
460         return;
461     }
462 
463     // If so then actually log that it's started for that database.
464     elog(LOG, "%s dynamic background worker initialized with role %s on database %s"
465             , MyBgworkerEntry->bgw_name
466             , pg_partman_bgw_role
467             , dbname);
468 
469     resetStringInfo(&buf);
470     appendStringInfo(&buf, "SELECT n.nspname FROM pg_catalog.pg_extension e JOIN pg_catalog.pg_namespace n ON e.extnamespace = n.oid WHERE extname = 'pg_partman'");
471     pgstat_report_activity(STATE_RUNNING, buf.data);
472     ret = SPI_execute(buf.data, true, 1);
473 
474     if (ret != SPI_OK_SELECT) {
475         elog(FATAL, "Cannot determine which schema pg_partman has been installed to: error code %d", ret);
476     }
477 
478     if (SPI_processed > 0) {
479         bool isnull;
480 
481         partman_schema = DatumGetCString(SPI_getbinval(SPI_tuptable->vals[0]
482                 , SPI_tuptable->tupdesc
483                 , 1
484                 , &isnull));
485 
486         if (isnull)
487             elog(FATAL, "Query to determine pg_partman schema returned NULL.");
488 
489     } else {
490         elog(FATAL, "Query to determine pg_partman schema returned zero rows.");
491     }
492 
493     resetStringInfo(&buf);
494     if (strcmp(pg_partman_bgw_analyze, "on") == 0) {
495         analyze = "true";
496     } else {
497         analyze = "false";
498     }
499     if (strcmp(pg_partman_bgw_jobmon, "on") == 0) {
500         jobmon = "true";
501     } else {
502         jobmon = "false";
503     }
504     appendStringInfo(&buf, "SELECT \"%s\".run_maintenance(p_analyze := %s, p_jobmon := %s)", partman_schema, analyze, jobmon);
505 
506     pgstat_report_activity(STATE_RUNNING, buf.data);
507 
508     ret = SPI_execute(buf.data, false, 0);
509 
510     if (ret != SPI_OK_SELECT)
511         elog(FATAL, "Cannot call pg_partman run_maintenance() function: error code %d", ret);
512 
513     elog(LOG, "%s: %s called by role %s on database %s"
514             , MyBgworkerEntry->bgw_name
515             , buf.data
516             , pg_partman_bgw_role
517             , dbname);
518 
519     SPI_finish();
520     PopActiveSnapshot();
521     CommitTransactionCommand();
522     pgstat_report_activity(STATE_IDLE, NULL);
523     elog(DEBUG1, "pg_partman dynamic BGW shutting down gracefully for database %s.", dbname);
524 
525     pfree(rawstring);
526     list_free(elemlist);
527 
528     return;
529 }
530 
531