/*
 * pg_partman_bgw.c
 *
 * A background worker process for the pg_partman extension to allow
 * partition maintenance to be scheduled and run within the database
 * itself without requiring a third-party scheduler (e.g. cron).
 *
 */
9
10 #include "postgres.h"
11
12 /* These are always necessary for a bgworker */
13 #include "miscadmin.h"
14 #include "postmaster/bgworker.h"
15 #include "storage/ipc.h"
16 #include "storage/latch.h"
17 #include "storage/lwlock.h"
18 #include "storage/proc.h"
19 #include "storage/shmem.h"
20
21 /* these headers are used by this particular worker's code */
22 #include "access/xact.h"
23 #include "executor/spi.h"
24 #include "fmgr.h"
25 #include "lib/stringinfo.h"
26 #include "pgstat.h"
27 #include "utils/builtins.h"
28 #include "utils/snapmgr.h"
29 #include "tcop/utility.h"
30
31 #if (PG_VERSION_NUM >= 100000)
32 #include "utils/varlena.h"
33 #endif
34
35
36 PG_MODULE_MAGIC;
37
38 void _PG_init(void);
39 void pg_partman_bgw_main(Datum);
40 void pg_partman_bgw_run_maint(Datum);
41
42 /* flags set by signal handlers */
43 static volatile sig_atomic_t got_sighup = false;
44 static volatile sig_atomic_t got_sigterm = false;
45
46 /* GUC variables */
47 static int pg_partman_bgw_interval = 3600; // Default hourly
48 static char *pg_partman_bgw_role = "postgres"; // Default to postgres role
49
50 // Do not analyze by default on PG11+
51 #if (PG_VERSION_NUM >= 110000)
52 static char *pg_partman_bgw_analyze = "off";
53 #else
54 static char *pg_partman_bgw_analyze = "on";
55 #endif
56
57 static char *pg_partman_bgw_jobmon = "on";
58 static char *pg_partman_bgw_dbname = NULL;
59
60 #if (PG_VERSION_NUM < 100500)
61 static bool (*split_function_ptr)(char *, char, List **) = &SplitIdentifierString;
62 #else
63 static bool (*split_function_ptr)(char *, char, List **) = &SplitGUCList;
64 #endif
65
66 /*
67 * Signal handler for SIGTERM
68 * Set a flag to let the main loop to terminate, and set our latch to wake
69 * it up.
70 */
71 static void
pg_partman_bgw_sigterm(SIGNAL_ARGS)72 pg_partman_bgw_sigterm(SIGNAL_ARGS)
73 {
74 int save_errno = errno;
75
76 got_sigterm = true;
77
78 if (MyProc)
79 SetLatch(&MyProc->procLatch);
80
81 errno = save_errno;
82 }
83
84 /*
85 * Signal handler for SIGHUP
86 * Set a flag to tell the main loop to reread the config file, and set
87 * our latch to wake it up.
88 */
pg_partman_bgw_sighup(SIGNAL_ARGS)89 static void pg_partman_bgw_sighup(SIGNAL_ARGS) {
90 int save_errno = errno;
91
92 got_sighup = true;
93
94 if (MyProc)
95 SetLatch(&MyProc->procLatch);
96
97 errno = save_errno;
98 }
99
100 /*
101 * Entrypoint of this module.
102 */
103 void
_PG_init(void)104 _PG_init(void)
105 {
106 BackgroundWorker worker;
107
108 DefineCustomIntVariable("pg_partman_bgw.interval",
109 "How often run_maintenance() is called (in seconds).",
110 NULL,
111 &pg_partman_bgw_interval,
112 3600,
113 1,
114 INT_MAX,
115 PGC_SIGHUP,
116 0,
117 NULL,
118 NULL,
119 NULL);
120
121 #if (PG_VERSION_NUM >= 110000)
122 DefineCustomStringVariable("pg_partman_bgw.analyze",
123 "Whether to run an analyze on a partition set whenever a new partition is created during run_maintenance(). Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.",
124 NULL,
125 &pg_partman_bgw_analyze,
126 "off",
127 PGC_SIGHUP,
128 0,
129 NULL,
130 NULL,
131 NULL);
132 #else
133 DefineCustomStringVariable("pg_partman_bgw.analyze",
134 "Whether to run an analyze on a partition set whenever a new partition is created during run_maintenance(). Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.",
135 NULL,
136 &pg_partman_bgw_analyze,
137 "on",
138 PGC_SIGHUP,
139 0,
140 NULL,
141 NULL,
142 NULL);
143 #endif
144
145 DefineCustomStringVariable("pg_partman_bgw.dbname",
146 "CSV list of specific databases in the cluster to run pg_partman BGW on.",
147 NULL,
148 &pg_partman_bgw_dbname,
149 NULL,
150 PGC_SIGHUP,
151 0,
152 NULL,
153 NULL,
154 NULL);
155
156 DefineCustomStringVariable("pg_partman_bgw.jobmon",
157 "Whether to log run_maintenance() calls to pg_jobmon if it is installed. Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.",
158 NULL,
159 &pg_partman_bgw_jobmon,
160 "on",
161 PGC_SIGHUP,
162 0,
163 NULL,
164 NULL,
165 NULL);
166
167 DefineCustomStringVariable("pg_partman_bgw.role",
168 "Role to be used by BGW. Must have execute permissions on run_maintenance()",
169 NULL,
170 &pg_partman_bgw_role,
171 "postgres",
172 PGC_SIGHUP,
173 0,
174 NULL,
175 NULL,
176 NULL);
177
178 /* Kept as comment for reference for future development
179 DefineCustomStringVariable("pg_partman_bgw.maintenance_db",
180 "The BGW requires connecting to a local database for reading system catalogs. By default it uses template1. You can change that with this setting if needed.",
181 NULL,
182 &pg_partman_bgw_maint_db,
183 "template1",
184 PGC_SIGHUP,
185 0,
186 NULL,
187 NULL,
188 NULL);
189 */
190
191 if (!process_shared_preload_libraries_in_progress)
192 return;
193
194 // Start BGW when database starts
195 sprintf(worker.bgw_name, "pg_partman master background worker");
196 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
197 BGWORKER_BACKEND_DATABASE_CONNECTION;
198 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
199 worker.bgw_restart_time = 600;
200 #if (PG_VERSION_NUM < 100000)
201 worker.bgw_main = pg_partman_bgw_main;
202 #endif
203 #if (PG_VERSION_NUM >= 100000)
204 sprintf(worker.bgw_library_name, "pg_partman_bgw");
205 sprintf(worker.bgw_function_name, "pg_partman_bgw_main");
206 #endif
207 worker.bgw_main_arg = CStringGetDatum(pg_partman_bgw_dbname);
208 worker.bgw_notify_pid = 0;
209 RegisterBackgroundWorker(&worker);
210
211 }
212
/*
 * Main function of the master background worker.
 *
 * Loops until SIGTERM is received. On every pass it:
 *   - reloads the configuration if SIGHUP was received,
 *   - splits pg_partman_bgw.dbname into a list and, for each entry, launches
 *     a dynamic background worker running pg_partman_bgw_run_maint(); the
 *     worker is given the entry's position in the list as its argument and
 *     is waited on (startup, and on 9.5+ also shutdown) before the next one
 *     is launched,
 *   - waits on the process latch for up to pg_partman_bgw.interval seconds.
 *
 * main_arg is not used.
 */
void
pg_partman_bgw_main(Datum main_arg)
{
    /* Establish signal handlers before unblocking signals. */
    pqsignal(SIGHUP, pg_partman_bgw_sighup);
    pqsignal(SIGTERM, pg_partman_bgw_sigterm);

    /* We're now ready to receive signals */
    BackgroundWorkerUnblockSignals();

    /* Keep for when master requires persistent connection
    elog(LOG, "%s master process initialized with role %s on database %s"
            , MyBgworkerEntry->bgw_name
            , pg_partman_bgw_role
            , pg_partman_bgw_dbname);
    */
    elog(LOG, "%s master process initialized with role %s"
            , MyBgworkerEntry->bgw_name
            , pg_partman_bgw_role);

    /*
     * Main loop: do this until the SIGTERM handler tells us to terminate
     */
    while (!got_sigterm) {
        BackgroundWorker        worker;
        BackgroundWorkerHandle  *handle;
        BgwHandleStatus         status;
        char                    *rawstring;
        int                     dbcounter;
        int                     rc;
        int                     full_string_length;
        List                    *elemlist;
        ListCell                *l;
        pid_t                   pid;

        /* Using Latch loop method suggested in latch.h
         * Uses timeout flag in WaitLatch() further below instead of sleep to allow clean shutdown */
        ResetLatch(&MyProc->procLatch);

        CHECK_FOR_INTERRUPTS();

        /* In case of a SIGHUP, just reload the configuration. */
        if (got_sighup) {
            got_sighup = false;
            ProcessConfigFile(PGC_SIGHUP);
        }
        elog(DEBUG1, "After sighup check (got_sighup: %d)", (int) got_sighup);

        /* In case of a SIGTERM in middle of loop, stop all further processing and return from BGW function to allow it to exit cleanly. */
        if (got_sigterm) {
            elog(LOG, "pg_partman master BGW received SIGTERM. Shutting down. (got_sigterm: %d)", (int) got_sigterm);
            return;
        }

        // Use method of shared_preload_libraries to split the pg_partman_bgw_dbname string found in src/backend/utils/init/miscinit.c
        if (pg_partman_bgw_dbname != NULL) {
            // Need a modifiable copy of string
            rawstring = pstrdup(pg_partman_bgw_dbname);
            // Parse string into list of identifiers
            if (!(*split_function_ptr)(rawstring, ',', &elemlist)) {
                // syntax error in list
                pfree(rawstring);
                list_free(elemlist);
                ereport(LOG,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("invalid list syntax in parameter \"pg_partman_bgw.dbname\" in postgresql.conf")));
                return;
            }

            dbcounter = 0;
            foreach(l, elemlist) {
                char *dbname = (char *) lfirst(l);

                /* Do not launch any further workers once shutdown has been requested */
                if (got_sigterm)
                    break;

                elog(DEBUG1, "Dynamic bgw launch begun for %s (%d)", dbname, dbcounter);

                /* Zero the struct so fields not assigned below are well defined rather than stack garbage */
                memset(&worker, 0, sizeof(worker));
                worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
                    BGWORKER_BACKEND_DATABASE_CONNECTION;
                worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
                worker.bgw_restart_time = BGW_NEVER_RESTART;
#if (PG_VERSION_NUM < 100000)
                worker.bgw_main = NULL;
#endif
                snprintf(worker.bgw_library_name, sizeof(worker.bgw_library_name), "pg_partman_bgw");
                snprintf(worker.bgw_function_name, sizeof(worker.bgw_function_name), "pg_partman_bgw_run_maint");
                full_string_length = snprintf(worker.bgw_name, sizeof(worker.bgw_name),
                              "pg_partman dynamic background worker (dbname=%s)", dbname);
                if (full_string_length < 0 ||
                    (size_t) full_string_length >= sizeof(worker.bgw_name)) {
                    /* dbname was truncated, add an ellipsis to denote it */
                    const char truncated_mark[] = "...)";

                    /* sizeof includes the terminating NUL, so the name stays terminated */
                    memcpy(worker.bgw_name + sizeof(worker.bgw_name) - sizeof(truncated_mark),
                           truncated_mark, sizeof(truncated_mark));
                }
                /* The worker receives the position of its database in the list, not the name itself */
                worker.bgw_main_arg = Int32GetDatum(dbcounter);
                worker.bgw_notify_pid = MyProcPid;

                dbcounter++;

                elog(DEBUG1, "Registering dynamic background worker...");
                if (!RegisterDynamicBackgroundWorker(&worker, &handle)) {
                    elog(ERROR, "Unable to register dynamic background worker for pg_partman. Consider increasing max_worker_processes if you see this frequently. Main background worker process will try restarting in 10 minutes.");
                }

                elog(DEBUG1, "Waiting for BGW startup...");
                status = WaitForBackgroundWorkerStartup(handle, &pid);
                elog(DEBUG1, "BGW startup status: %d", status);

                if (status == BGWH_STOPPED) {
                    ereport(ERROR,
                            (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                             errmsg("Could not start dynamic pg_partman background process"),
                             errhint("More details may be available in the server log.")));
                }

                if (status == BGWH_POSTMASTER_DIED) {
                    ereport(ERROR,
                            (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                             errmsg("Cannot start dynamic pg_partman background processes without postmaster"),
                             errhint("Kill all remaining database processes and restart the database.")));
                }
                Assert(status == BGWH_STARTED);

#if (PG_VERSION_NUM >= 90500)
                // Shutdown wait function introduced in 9.5. The latch problems this wait fixes are only encountered in
                // 9.6 and later. So this shouldn't be a problem for 9.4.
                elog(DEBUG1, "Waiting for BGW shutdown...");
                status = WaitForBackgroundWorkerShutdown(handle);
                elog(DEBUG1, "BGW shutdown status: %d", status);
                Assert(status == BGWH_STOPPED);
#endif
            }

            pfree(rawstring);
            list_free(elemlist);
        } else { // pg_partman_bgw_dbname if null
            elog(DEBUG1, "pg_partman_bgw.dbname GUC is NULL. Nothing to do in main loop.");
        }

        /*
         * Re-check for SIGTERM before sleeping. The handler only sets a flag
         * and the latch; the waits on the dynamic workers above also use this
         * process's latch, so the wake-up may already have been consumed and
         * the WaitLatch() below could otherwise sleep for the whole interval.
         */
        if (got_sigterm) {
            elog(LOG, "pg_partman master BGW received SIGTERM. Shutting down. (got_sigterm: %d)", (int) got_sigterm);
            return;
        }

        elog(DEBUG1, "Latch status just before waitlatch call: %d", MyProc->procLatch.is_set);

#if (PG_VERSION_NUM >= 100000)
        rc = WaitLatch(&MyProc->procLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       pg_partman_bgw_interval * 1000L,
                       PG_WAIT_EXTENSION);
#else
        rc = WaitLatch(&MyProc->procLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       pg_partman_bgw_interval * 1000L);
#endif
        /* emergency bailout if postmaster has died */
        if (rc & WL_POSTMASTER_DEATH) {
            proc_exit(1);
        }

        elog(DEBUG1, "Latch status after waitlatch call: %d", MyProc->procLatch.is_set);

    } // end sigterm while

} // end main
376
377 /*
378 * Unable to pass the database name as a string argument (not sure why yet)
379 * Instead, the GUC is parsed both in the main function and below and a counter integer
380 * is passed to determine which database the BGW will run in.
381 */
pg_partman_bgw_run_maint(Datum arg)382 void pg_partman_bgw_run_maint(Datum arg) {
383
384 char *analyze;
385 char *dbname = "template1";
386 char *jobmon;
387 char *partman_schema;
388 char *rawstring;
389 int db_main_counter = DatumGetInt32(arg);
390 List *elemlist;
391 int ret;
392 StringInfoData buf;
393
394 /* Establish signal handlers before unblocking signals. */
395 pqsignal(SIGHUP, pg_partman_bgw_sighup);
396 pqsignal(SIGTERM, pg_partman_bgw_sigterm);
397
398 /* We're now ready to receive signals */
399 BackgroundWorkerUnblockSignals();
400
401 elog(DEBUG1, "Before parsing dbname GUC in dynamic main func: %s", pg_partman_bgw_dbname);
402 rawstring = pstrdup(pg_partman_bgw_dbname);
403 elog(DEBUG1, "GUC rawstring copy: %s", rawstring);
404 // Parse string into list of identifiers
405 if (!(*split_function_ptr)(rawstring, ',', &elemlist)) {
406 // syntax error in list
407 pfree(rawstring);
408 list_free(elemlist);
409 ereport(LOG,
410 (errcode(ERRCODE_SYNTAX_ERROR),
411 errmsg("invalid list syntax in parameter \"pg_partman_bgw.dbname\" in postgresql.conf")));
412 return;
413 }
414
415 dbname = list_nth(elemlist, db_main_counter);
416 elog(DEBUG1, "Parsing dbname list: %s (%d)", dbname, db_main_counter);
417
418 if (strcmp(dbname, "template1") == 0) {
419 elog(DEBUG1, "Default database name found in dbname local variable (\"template1\").");
420 }
421
422 elog(DEBUG1, "Before run_maint initialize connection for db %s", dbname);
423
424 #if (PG_VERSION_NUM < 110000)
425 BackgroundWorkerInitializeConnection(dbname, pg_partman_bgw_role);
426 #endif
427 #if (PG_VERSION_NUM >= 110000)
428 BackgroundWorkerInitializeConnection(dbname, pg_partman_bgw_role, 0);
429 #endif
430
431 elog(DEBUG1, "After run_maint initialize connection for db %s", dbname);
432
433 initStringInfo(&buf);
434
435 SetCurrentStatementStartTimestamp();
436 StartTransactionCommand();
437 SPI_connect();
438 PushActiveSnapshot(GetTransactionSnapshot());
439 pgstat_report_appname("pg_partman dynamic background worker");
440
441 // First determine if pg_partman is even installed in this database
442 appendStringInfo(&buf, "SELECT extname FROM pg_catalog.pg_extension WHERE extname = 'pg_partman'");
443 pgstat_report_activity(STATE_RUNNING, buf.data);
444 elog(DEBUG1, "Checking if pg_partman extension is installed in database: %s" , dbname);
445 ret = SPI_execute(buf.data, true, 1);
446 if (ret != SPI_OK_SELECT) {
447 elog(FATAL, "Cannot determine if pg_partman is installed in database %s: error code %d", dbname, ret);
448 }
449 if (SPI_processed <= 0) {
450 elog(DEBUG1, "pg_partman not installed in database %s. Nothing to do so dynamic worker exiting gracefully.", dbname);
451 // Nothing left to do. Return end the run of BGW function.
452 SPI_finish();
453 PopActiveSnapshot();
454 CommitTransactionCommand();
455 pgstat_report_activity(STATE_IDLE, NULL);
456
457 pfree(rawstring);
458 list_free(elemlist);
459
460 return;
461 }
462
463 // If so then actually log that it's started for that database.
464 elog(LOG, "%s dynamic background worker initialized with role %s on database %s"
465 , MyBgworkerEntry->bgw_name
466 , pg_partman_bgw_role
467 , dbname);
468
469 resetStringInfo(&buf);
470 appendStringInfo(&buf, "SELECT n.nspname FROM pg_catalog.pg_extension e JOIN pg_catalog.pg_namespace n ON e.extnamespace = n.oid WHERE extname = 'pg_partman'");
471 pgstat_report_activity(STATE_RUNNING, buf.data);
472 ret = SPI_execute(buf.data, true, 1);
473
474 if (ret != SPI_OK_SELECT) {
475 elog(FATAL, "Cannot determine which schema pg_partman has been installed to: error code %d", ret);
476 }
477
478 if (SPI_processed > 0) {
479 bool isnull;
480
481 partman_schema = DatumGetCString(SPI_getbinval(SPI_tuptable->vals[0]
482 , SPI_tuptable->tupdesc
483 , 1
484 , &isnull));
485
486 if (isnull)
487 elog(FATAL, "Query to determine pg_partman schema returned NULL.");
488
489 } else {
490 elog(FATAL, "Query to determine pg_partman schema returned zero rows.");
491 }
492
493 resetStringInfo(&buf);
494 if (strcmp(pg_partman_bgw_analyze, "on") == 0) {
495 analyze = "true";
496 } else {
497 analyze = "false";
498 }
499 if (strcmp(pg_partman_bgw_jobmon, "on") == 0) {
500 jobmon = "true";
501 } else {
502 jobmon = "false";
503 }
504 appendStringInfo(&buf, "SELECT \"%s\".run_maintenance(p_analyze := %s, p_jobmon := %s)", partman_schema, analyze, jobmon);
505
506 pgstat_report_activity(STATE_RUNNING, buf.data);
507
508 ret = SPI_execute(buf.data, false, 0);
509
510 if (ret != SPI_OK_SELECT)
511 elog(FATAL, "Cannot call pg_partman run_maintenance() function: error code %d", ret);
512
513 elog(LOG, "%s: %s called by role %s on database %s"
514 , MyBgworkerEntry->bgw_name
515 , buf.data
516 , pg_partman_bgw_role
517 , dbname);
518
519 SPI_finish();
520 PopActiveSnapshot();
521 CommitTransactionCommand();
522 pgstat_report_activity(STATE_IDLE, NULL);
523 elog(DEBUG1, "pg_partman dynamic BGW shutting down gracefully for database %s.", dbname);
524
525 pfree(rawstring);
526 list_free(elemlist);
527
528 return;
529 }
530
531