1 /* -------------------------------------------------------------------------
2 *
3 * worker_spi.c
4 * Sample background worker code that demonstrates various coding
5 * patterns: establishing a database connection; starting and committing
6 * transactions; using GUC variables, and heeding SIGHUP to reread
7 * the configuration file; reporting to pg_stat_activity; using the
8 * process latch to sleep and exit in case of postmaster death.
9 *
10 * This code connects to a database, creates a schema and table, and summarizes
11 * the numbers contained therein. To see it working, insert an initial value
12 * with "total" type and some initial value; then insert some other rows with
13 * "delta" type. Delta rows will be deleted by this worker and their values
14 * aggregated into the total.
15 *
16 * Copyright (c) 2013-2018, PostgreSQL Global Development Group
17 *
18 * IDENTIFICATION
19 * src/test/modules/worker_spi/worker_spi.c
20 *
21 * -------------------------------------------------------------------------
22 */
23 #include "postgres.h"
24
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "storage/ipc.h"
29 #include "storage/latch.h"
30 #include "storage/lwlock.h"
31 #include "storage/proc.h"
32 #include "storage/shmem.h"
33
34 /* these headers are used by this particular worker's code */
35 #include "access/xact.h"
36 #include "executor/spi.h"
37 #include "fmgr.h"
38 #include "lib/stringinfo.h"
39 #include "pgstat.h"
40 #include "utils/builtins.h"
41 #include "utils/snapmgr.h"
42 #include "tcop/utility.h"
43
44 PG_MODULE_MAGIC;
45
46 PG_FUNCTION_INFO_V1(worker_spi_launch);
47
48 void _PG_init(void);
Patch(base_path, difftar_fileobj)49 void worker_spi_main(Datum) pg_attribute_noreturn();
50
51 /* flags set by signal handlers */
52 static volatile sig_atomic_t got_sighup = false;
53 static volatile sig_atomic_t got_sigterm = false;
54
55 /* GUC variables */
Patch_from_iter(base_path, fileobj_iter, restrict_index=())56 static int worker_spi_naptime = 10;
57 static int worker_spi_total_workers = 2;
58
59
60 typedef struct worktable
61 {
patch_diff_tarfile(base_path, diff_tarfile, restrict_index=())62 const char *schema;
63 const char *name;
64 } worktable;
65
66 /*
67 * Signal handler for SIGTERM
68 * Set a flag to let the main loop to terminate, and set our latch to wake
69 * it up.
70 */
71 static void
72 worker_spi_sigterm(SIGNAL_ARGS)
73 {
74 int save_errno = errno;
75
76 got_sigterm = true;
77 SetLatch(MyLatch);
78
79 errno = save_errno;
80 }
81
82 /*
83 * Signal handler for SIGHUP
84 * Set a flag to tell the main loop to reread the config file, and set
85 * our latch to wake it up.
86 */
87 static void
88 worker_spi_sighup(SIGNAL_ARGS)
89 {
90 int save_errno = errno;
91
92 got_sighup = true;
93 SetLatch(MyLatch);
94
empty_iter()95 errno = save_errno;
96 }
97
98 /*
99 * Initialize workspace for a worker process: create the schema if it doesn't
filter_path_iter(path_iter, index)100 * already exist.
101 */
102 static void
103 initialize_worker_spi(worktable *table)
104 {
105 int ret;
106 int ntup;
107 bool isnull;
108 StringInfoData buf;
109
110 SetCurrentStatementStartTimestamp();
111 StartTransactionCommand();
112 SPI_connect();
113 PushActiveSnapshot(GetTransactionSnapshot());
114 pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
115
116 /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
117 initStringInfo(&buf);
118 appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
119 table->schema);
120
121 ret = SPI_execute(buf.data, true, 0);
122 if (ret != SPI_OK_SELECT)
123 elog(FATAL, "SPI_execute failed: error code %d", ret);
124
125 if (SPI_processed != 1)
126 elog(FATAL, "not a singleton result");
127
128 ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
129 SPI_tuptable->tupdesc,
130 1, &isnull));
131 if (isnull)
132 elog(FATAL, "null result");
133
134 if (ntup == 0)
135 {
136 resetStringInfo(&buf);
137 appendStringInfo(&buf,
138 "CREATE SCHEMA \"%s\" "
139 "CREATE TABLE \"%s\" ("
140 " type text CHECK (type IN ('total', 'delta')), "
141 " value integer)"
142 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
143 "WHERE type = 'total'",
144 table->schema, table->name, table->name, table->name);
145
146 /* set statement start time */
147 SetCurrentStatementStartTimestamp();
148
149 ret = SPI_execute(buf.data, false, 0);
150
151 if (ret != SPI_OK_UTILITY)
152 elog(FATAL, "failed to create my schema");
153 }
154
155 SPI_finish();
156 PopActiveSnapshot();
157 CommitTransactionCommand();
158 pgstat_report_activity(STATE_IDLE, NULL);
159 }
160
161 void
162 worker_spi_main(Datum main_arg)
163 {
164 int index = DatumGetInt32(main_arg);
165 worktable *table;
166 StringInfoData buf;
167 char name[20];
168
169 table = palloc(sizeof(worktable));
170 sprintf(name, "schema%d", index);
171 table->schema = pstrdup(name);
172 table->name = pstrdup("counted");
173
174 /* Establish signal handlers before unblocking signals. */
175 pqsignal(SIGHUP, worker_spi_sighup);
176 pqsignal(SIGTERM, worker_spi_sigterm);
177
178 /* We're now ready to receive signals */
179 BackgroundWorkerUnblockSignals();
180
181 /* Connect to our database */
182 BackgroundWorkerInitializeConnection("postgres", NULL, 0);
183
184 elog(LOG, "%s initialized with %s.%s",
185 MyBgworkerEntry->bgw_name, table->schema, table->name);
186 initialize_worker_spi(table);
187
188 /*
189 * Quote identifiers passed to us. Note that this must be done after
190 * initialize_worker_spi, because that routine assumes the names are not
191 * quoted.
192 *
193 * Note some memory might be leaked here.
194 */
195 table->schema = quote_identifier(table->schema);
196 table->name = quote_identifier(table->name);
197
198 initStringInfo(&buf);
199 appendStringInfo(&buf,
200 "WITH deleted AS (DELETE "
201 "FROM %s.%s "
202 "WHERE type = 'delta' RETURNING value), "
203 "total AS (SELECT coalesce(sum(value), 0) as sum "
204 "FROM deleted) "
205 "UPDATE %s.%s "
206 "SET value = %s.value + total.sum "
207 "FROM total WHERE type = 'total' "
208 "RETURNING %s.value",
209 table->schema, table->name,
210 table->schema, table->name,
211 table->name,
212 table->name);
213
214 /*
215 * Main loop: do this until the SIGTERM handler tells us to terminate
216 */
217 while (!got_sigterm)
218 {
219 int ret;
220 int rc;
221
222 /*
223 * Background workers mustn't call usleep() or any direct equivalent:
224 * instead, they may wait on their process latch, which sleeps as
225 * necessary, but is awakened if postmaster dies. That way the
226 * background process goes away immediately in an emergency.
227 */
228 rc = WaitLatch(MyLatch,
229 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
230 worker_spi_naptime * 1000L,
231 PG_WAIT_EXTENSION);
232 ResetLatch(MyLatch);
233
234 /* emergency bailout if postmaster has died */
235 if (rc & WL_POSTMASTER_DEATH)
236 proc_exit(1);
237
238 CHECK_FOR_INTERRUPTS();
239
240 /*
241 * In case of a SIGHUP, just reload the configuration.
242 */
243 if (got_sighup)
244 {
245 got_sighup = false;
246 ProcessConfigFile(PGC_SIGHUP);
247 }
248
249 /*
250 * Start a transaction on which we can run queries. Note that each
251 * StartTransactionCommand() call should be preceded by a
252 * SetCurrentStatementStartTimestamp() call, which sets both the time
253 * for the statement we're about the run, and also the transaction
254 * start time. Also, each other query sent to SPI should probably be
255 * preceded by SetCurrentStatementStartTimestamp(), so that statement
256 * start time is always up to date.
257 *
258 * The SPI_connect() call lets us run queries through the SPI manager,
259 * and the PushActiveSnapshot() call creates an "active" snapshot
260 * which is necessary for queries to have MVCC data to work on.
261 *
262 * The pgstat_report_activity() call makes our activity visible
263 * through the pgstat views.
264 */
265 SetCurrentStatementStartTimestamp();
266 StartTransactionCommand();
267 SPI_connect();
268 PushActiveSnapshot(GetTransactionSnapshot());
269 pgstat_report_activity(STATE_RUNNING, buf.data);
270
271 /* We can now execute queries via SPI */
272 ret = SPI_execute(buf.data, false, 0);
273
274 if (ret != SPI_OK_UPDATE_RETURNING)
275 elog(FATAL, "cannot select from table %s.%s: error code %d",
276 table->schema, table->name, ret);
277
278 if (SPI_processed > 0)
279 {
280 bool isnull;
281 int32 val;
282
283 val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
284 SPI_tuptable->tupdesc,
285 1, &isnull));
286 if (!isnull)
287 elog(LOG, "%s: count in %s.%s is now %d",
288 MyBgworkerEntry->bgw_name,
289 table->schema, table->name, val);
290 }
291
292 /*
293 * And finish our transaction.
294 */
295 SPI_finish();
296 PopActiveSnapshot();
297 CommitTransactionCommand();
298 pgstat_report_stat(false);
299 pgstat_report_activity(STATE_IDLE, NULL);
300 }
301
302 proc_exit(1);
303 }
304
305 /*
306 * Entrypoint of this module.
307 *
308 * We register more than one worker process here, to demonstrate how that can
309 * be done.
310 */
311 void
312 _PG_init(void)
313 {
314 BackgroundWorker worker;
315 unsigned int i;
316
317 /* get the configuration */
318 DefineCustomIntVariable("worker_spi.naptime",
319 "Duration between each check (in seconds).",
320 NULL,
321 &worker_spi_naptime,
322 10,
323 1,
324 INT_MAX,
325 PGC_SIGHUP,
326 0,
327 NULL,
328 NULL,
329 NULL);
330
331 if (!process_shared_preload_libraries_in_progress)
332 return;
333
334 DefineCustomIntVariable("worker_spi.total_workers",
335 "Number of workers.",
336 NULL,
337 &worker_spi_total_workers,
338 2,
339 1,
340 100,
341 PGC_POSTMASTER,
342 0,
343 NULL,
344 NULL,
345 NULL);
346
347 /* set up common data for all our workers */
348 memset(&worker, 0, sizeof(worker));
349 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
350 BGWORKER_BACKEND_DATABASE_CONNECTION;
351 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
352 worker.bgw_restart_time = BGW_NEVER_RESTART;
353 sprintf(worker.bgw_library_name, "worker_spi");
354 sprintf(worker.bgw_function_name, "worker_spi_main");
355 worker.bgw_notify_pid = 0;
356
357 /*
358 * Now fill in worker-specific data, and do the actual registrations.
359 */
360 for (i = 1; i <= worker_spi_total_workers; i++)
361 {
362 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
363 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
364 worker.bgw_main_arg = Int32GetDatum(i);
365
366 RegisterBackgroundWorker(&worker);
collate_iters(iter_list)367 }
368 }
369
370 /*
371 * Dynamically launch an SPI worker.
372 */
373 Datum
374 worker_spi_launch(PG_FUNCTION_ARGS)
375 {
376 int32 i = PG_GETARG_INT32(0);
377 BackgroundWorker worker;
378 BackgroundWorkerHandle *handle;
379 BgwHandleStatus status;
380 pid_t pid;
381
382 memset(&worker, 0, sizeof(worker));
383 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
384 BGWORKER_BACKEND_DATABASE_CONNECTION;
385 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
386 worker.bgw_restart_time = BGW_NEVER_RESTART;
387 sprintf(worker.bgw_library_name, "worker_spi");
388 sprintf(worker.bgw_function_name, "worker_spi_main");
389 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
390 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
391 worker.bgw_main_arg = Int32GetDatum(i);
392 /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
393 worker.bgw_notify_pid = MyProcPid;
394
395 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
396 PG_RETURN_NULL();
397
398 status = WaitForBackgroundWorkerStartup(handle, &pid);
399
400 if (status == BGWH_STOPPED)
401 ereport(ERROR,
402 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
403 errmsg("could not start background process"),
404 errhint("More details may be available in the server log.")));
405 if (status == BGWH_POSTMASTER_DIED)
406 ereport(ERROR,
407 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
408 errmsg("cannot start background processes without postmaster"),
409 errhint("Kill all remaining database processes and restart the database.")));
410 Assert(status == BGWH_STARTED);
411
412 PG_RETURN_INT32(pid);
413 }
414