1 /* -------------------------------------------------------------------------
2 *
3 * worker_spi.c
4 * Sample background worker code that demonstrates various coding
5 * patterns: establishing a database connection; starting and committing
6 * transactions; using GUC variables, and heeding SIGHUP to reread
7 * the configuration file; reporting to pg_stat_activity; using the
8 * process latch to sleep and exit in case of postmaster death.
9 *
10 * This code connects to a database, creates a schema and table, and summarizes
11 * the numbers contained therein. To see it working, insert an initial value
12 * with "total" type and some initial value; then insert some other rows with
13 * "delta" type. Delta rows will be deleted by this worker and their values
14 * aggregated into the total.
15 *
16 * Copyright (c) 2013-2020, PostgreSQL Global Development Group
17 *
18 * IDENTIFICATION
19 * src/test/modules/worker_spi/worker_spi.c
20 *
21 * -------------------------------------------------------------------------
22 */
23 #include "postgres.h"
24
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "storage/ipc.h"
29 #include "storage/latch.h"
30 #include "storage/lwlock.h"
31 #include "storage/proc.h"
32 #include "storage/shmem.h"
33
34 /* these headers are used by this particular worker's code */
35 #include "access/xact.h"
36 #include "executor/spi.h"
37 #include "fmgr.h"
38 #include "lib/stringinfo.h"
39 #include "pgstat.h"
40 #include "utils/builtins.h"
41 #include "utils/snapmgr.h"
42 #include "tcop/utility.h"
43
44 PG_MODULE_MAGIC;
45
46 PG_FUNCTION_INFO_V1(worker_spi_launch);
47
48 void _PG_init(void);
49 void worker_spi_main(Datum) pg_attribute_noreturn();
50
51 /* flags set by signal handlers */
52 static volatile sig_atomic_t got_sighup = false;
53 static volatile sig_atomic_t got_sigterm = false;
54
55 /* GUC variables */
56 static int worker_spi_naptime = 10;
57 static int worker_spi_total_workers = 2;
58 static char *worker_spi_database = NULL;
59
60
61 typedef struct worktable
62 {
63 const char *schema;
64 const char *name;
65 } worktable;
66
67 /*
68 * Signal handler for SIGTERM
69 * Set a flag to let the main loop to terminate, and set our latch to wake
70 * it up.
71 */
72 static void
worker_spi_sigterm(SIGNAL_ARGS)73 worker_spi_sigterm(SIGNAL_ARGS)
74 {
75 int save_errno = errno;
76
77 got_sigterm = true;
78 SetLatch(MyLatch);
79
80 errno = save_errno;
81 }
82
83 /*
84 * Signal handler for SIGHUP
85 * Set a flag to tell the main loop to reread the config file, and set
86 * our latch to wake it up.
87 */
88 static void
worker_spi_sighup(SIGNAL_ARGS)89 worker_spi_sighup(SIGNAL_ARGS)
90 {
91 int save_errno = errno;
92
93 got_sighup = true;
94 SetLatch(MyLatch);
95
96 errno = save_errno;
97 }
98
99 /*
100 * Initialize workspace for a worker process: create the schema if it doesn't
101 * already exist.
102 */
103 static void
initialize_worker_spi(worktable * table)104 initialize_worker_spi(worktable *table)
105 {
106 int ret;
107 int ntup;
108 bool isnull;
109 StringInfoData buf;
110
111 SetCurrentStatementStartTimestamp();
112 StartTransactionCommand();
113 SPI_connect();
114 PushActiveSnapshot(GetTransactionSnapshot());
115 pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
116
117 /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
118 initStringInfo(&buf);
119 appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
120 table->schema);
121
122 ret = SPI_execute(buf.data, true, 0);
123 if (ret != SPI_OK_SELECT)
124 elog(FATAL, "SPI_execute failed: error code %d", ret);
125
126 if (SPI_processed != 1)
127 elog(FATAL, "not a singleton result");
128
129 ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
130 SPI_tuptable->tupdesc,
131 1, &isnull));
132 if (isnull)
133 elog(FATAL, "null result");
134
135 if (ntup == 0)
136 {
137 resetStringInfo(&buf);
138 appendStringInfo(&buf,
139 "CREATE SCHEMA \"%s\" "
140 "CREATE TABLE \"%s\" ("
141 " type text CHECK (type IN ('total', 'delta')), "
142 " value integer)"
143 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
144 "WHERE type = 'total'",
145 table->schema, table->name, table->name, table->name);
146
147 /* set statement start time */
148 SetCurrentStatementStartTimestamp();
149
150 ret = SPI_execute(buf.data, false, 0);
151
152 if (ret != SPI_OK_UTILITY)
153 elog(FATAL, "failed to create my schema");
154 }
155
156 SPI_finish();
157 PopActiveSnapshot();
158 CommitTransactionCommand();
159 pgstat_report_activity(STATE_IDLE, NULL);
160 }
161
162 void
worker_spi_main(Datum main_arg)163 worker_spi_main(Datum main_arg)
164 {
165 int index = DatumGetInt32(main_arg);
166 worktable *table;
167 StringInfoData buf;
168 char name[20];
169
170 table = palloc(sizeof(worktable));
171 sprintf(name, "schema%d", index);
172 table->schema = pstrdup(name);
173 table->name = pstrdup("counted");
174
175 /* Establish signal handlers before unblocking signals. */
176 pqsignal(SIGHUP, worker_spi_sighup);
177 pqsignal(SIGTERM, worker_spi_sigterm);
178
179 /* We're now ready to receive signals */
180 BackgroundWorkerUnblockSignals();
181
182 /* Connect to our database */
183 BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
184
185 elog(LOG, "%s initialized with %s.%s",
186 MyBgworkerEntry->bgw_name, table->schema, table->name);
187 initialize_worker_spi(table);
188
189 /*
190 * Quote identifiers passed to us. Note that this must be done after
191 * initialize_worker_spi, because that routine assumes the names are not
192 * quoted.
193 *
194 * Note some memory might be leaked here.
195 */
196 table->schema = quote_identifier(table->schema);
197 table->name = quote_identifier(table->name);
198
199 initStringInfo(&buf);
200 appendStringInfo(&buf,
201 "WITH deleted AS (DELETE "
202 "FROM %s.%s "
203 "WHERE type = 'delta' RETURNING value), "
204 "total AS (SELECT coalesce(sum(value), 0) as sum "
205 "FROM deleted) "
206 "UPDATE %s.%s "
207 "SET value = %s.value + total.sum "
208 "FROM total WHERE type = 'total' "
209 "RETURNING %s.value",
210 table->schema, table->name,
211 table->schema, table->name,
212 table->name,
213 table->name);
214
215 /*
216 * Main loop: do this until the SIGTERM handler tells us to terminate
217 */
218 while (!got_sigterm)
219 {
220 int ret;
221
222 /*
223 * Background workers mustn't call usleep() or any direct equivalent:
224 * instead, they may wait on their process latch, which sleeps as
225 * necessary, but is awakened if postmaster dies. That way the
226 * background process goes away immediately in an emergency.
227 */
228 (void) WaitLatch(MyLatch,
229 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
230 worker_spi_naptime * 1000L,
231 PG_WAIT_EXTENSION);
232 ResetLatch(MyLatch);
233
234 CHECK_FOR_INTERRUPTS();
235
236 /*
237 * In case of a SIGHUP, just reload the configuration.
238 */
239 if (got_sighup)
240 {
241 got_sighup = false;
242 ProcessConfigFile(PGC_SIGHUP);
243 }
244
245 /*
246 * Start a transaction on which we can run queries. Note that each
247 * StartTransactionCommand() call should be preceded by a
248 * SetCurrentStatementStartTimestamp() call, which sets both the time
249 * for the statement we're about the run, and also the transaction
250 * start time. Also, each other query sent to SPI should probably be
251 * preceded by SetCurrentStatementStartTimestamp(), so that statement
252 * start time is always up to date.
253 *
254 * The SPI_connect() call lets us run queries through the SPI manager,
255 * and the PushActiveSnapshot() call creates an "active" snapshot
256 * which is necessary for queries to have MVCC data to work on.
257 *
258 * The pgstat_report_activity() call makes our activity visible
259 * through the pgstat views.
260 */
261 SetCurrentStatementStartTimestamp();
262 StartTransactionCommand();
263 SPI_connect();
264 PushActiveSnapshot(GetTransactionSnapshot());
265 pgstat_report_activity(STATE_RUNNING, buf.data);
266
267 /* We can now execute queries via SPI */
268 ret = SPI_execute(buf.data, false, 0);
269
270 if (ret != SPI_OK_UPDATE_RETURNING)
271 elog(FATAL, "cannot select from table %s.%s: error code %d",
272 table->schema, table->name, ret);
273
274 if (SPI_processed > 0)
275 {
276 bool isnull;
277 int32 val;
278
279 val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
280 SPI_tuptable->tupdesc,
281 1, &isnull));
282 if (!isnull)
283 elog(LOG, "%s: count in %s.%s is now %d",
284 MyBgworkerEntry->bgw_name,
285 table->schema, table->name, val);
286 }
287
288 /*
289 * And finish our transaction.
290 */
291 SPI_finish();
292 PopActiveSnapshot();
293 CommitTransactionCommand();
294 pgstat_report_stat(false);
295 pgstat_report_activity(STATE_IDLE, NULL);
296 }
297
298 proc_exit(1);
299 }
300
301 /*
302 * Entrypoint of this module.
303 *
304 * We register more than one worker process here, to demonstrate how that can
305 * be done.
306 */
307 void
_PG_init(void)308 _PG_init(void)
309 {
310 BackgroundWorker worker;
311 unsigned int i;
312
313 /* get the configuration */
314 DefineCustomIntVariable("worker_spi.naptime",
315 "Duration between each check (in seconds).",
316 NULL,
317 &worker_spi_naptime,
318 10,
319 1,
320 INT_MAX,
321 PGC_SIGHUP,
322 0,
323 NULL,
324 NULL,
325 NULL);
326
327 if (!process_shared_preload_libraries_in_progress)
328 return;
329
330 DefineCustomIntVariable("worker_spi.total_workers",
331 "Number of workers.",
332 NULL,
333 &worker_spi_total_workers,
334 2,
335 1,
336 100,
337 PGC_POSTMASTER,
338 0,
339 NULL,
340 NULL,
341 NULL);
342
343 DefineCustomStringVariable("worker_spi.database",
344 "Database to connect to.",
345 NULL,
346 &worker_spi_database,
347 "postgres",
348 PGC_POSTMASTER,
349 0,
350 NULL, NULL, NULL);
351
352 /* set up common data for all our workers */
353 memset(&worker, 0, sizeof(worker));
354 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
355 BGWORKER_BACKEND_DATABASE_CONNECTION;
356 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
357 worker.bgw_restart_time = BGW_NEVER_RESTART;
358 sprintf(worker.bgw_library_name, "worker_spi");
359 sprintf(worker.bgw_function_name, "worker_spi_main");
360 worker.bgw_notify_pid = 0;
361
362 /*
363 * Now fill in worker-specific data, and do the actual registrations.
364 */
365 for (i = 1; i <= worker_spi_total_workers; i++)
366 {
367 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
368 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
369 worker.bgw_main_arg = Int32GetDatum(i);
370
371 RegisterBackgroundWorker(&worker);
372 }
373 }
374
375 /*
376 * Dynamically launch an SPI worker.
377 */
378 Datum
worker_spi_launch(PG_FUNCTION_ARGS)379 worker_spi_launch(PG_FUNCTION_ARGS)
380 {
381 int32 i = PG_GETARG_INT32(0);
382 BackgroundWorker worker;
383 BackgroundWorkerHandle *handle;
384 BgwHandleStatus status;
385 pid_t pid;
386
387 memset(&worker, 0, sizeof(worker));
388 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
389 BGWORKER_BACKEND_DATABASE_CONNECTION;
390 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
391 worker.bgw_restart_time = BGW_NEVER_RESTART;
392 sprintf(worker.bgw_library_name, "worker_spi");
393 sprintf(worker.bgw_function_name, "worker_spi_main");
394 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
395 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
396 worker.bgw_main_arg = Int32GetDatum(i);
397 /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
398 worker.bgw_notify_pid = MyProcPid;
399
400 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
401 PG_RETURN_NULL();
402
403 status = WaitForBackgroundWorkerStartup(handle, &pid);
404
405 if (status == BGWH_STOPPED)
406 ereport(ERROR,
407 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
408 errmsg("could not start background process"),
409 errhint("More details may be available in the server log.")));
410 if (status == BGWH_POSTMASTER_DIED)
411 ereport(ERROR,
412 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
413 errmsg("cannot start background processes without postmaster"),
414 errhint("Kill all remaining database processes and restart the database.")));
415 Assert(status == BGWH_STARTED);
416
417 PG_RETURN_INT32(pid);
418 }
419