1 /* -------------------------------------------------------------------------
2  *
3  * worker_spi.c
4  *		Sample background worker code that demonstrates various coding
5  *		patterns: establishing a database connection; starting and committing
6  *		transactions; using GUC variables, and heeding SIGHUP to reread
7  *		the configuration file; reporting to pg_stat_activity; using the
8  *		process latch to sleep and exit in case of postmaster death.
9  *
10  * This code connects to a database, creates a schema and table, and summarizes
11  * the numbers contained therein.  To see it working, insert an initial value
12  * with "total" type and some initial value; then insert some other rows with
13  * "delta" type.  Delta rows will be deleted by this worker and their values
14  * aggregated into the total.
15  *
16  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
17  *
18  * IDENTIFICATION
19  *		src/test/modules/worker_spi/worker_spi.c
20  *
21  * -------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24 
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "storage/ipc.h"
29 #include "storage/latch.h"
30 #include "storage/lwlock.h"
31 #include "storage/proc.h"
32 #include "storage/shmem.h"
33 
34 /* these headers are used by this particular worker's code */
35 #include "access/xact.h"
36 #include "executor/spi.h"
37 #include "fmgr.h"
38 #include "lib/stringinfo.h"
39 #include "pgstat.h"
40 #include "utils/builtins.h"
41 #include "utils/snapmgr.h"
42 #include "tcop/utility.h"
43 
44 PG_MODULE_MAGIC;
45 
46 PG_FUNCTION_INFO_V1(worker_spi_launch);
47 
48 void		_PG_init(void);
49 void		worker_spi_main(Datum) pg_attribute_noreturn();
50 
51 /* flags set by signal handlers */
52 static volatile sig_atomic_t got_sighup = false;
53 static volatile sig_atomic_t got_sigterm = false;
54 
55 /* GUC variables */
56 static int	worker_spi_naptime = 10;
57 static int	worker_spi_total_workers = 2;
58 static char *worker_spi_database = NULL;
59 
60 
61 typedef struct worktable
62 {
63 	const char *schema;
64 	const char *name;
65 } worktable;
66 
67 /*
68  * Signal handler for SIGTERM
69  *		Set a flag to let the main loop to terminate, and set our latch to wake
70  *		it up.
71  */
72 static void
worker_spi_sigterm(SIGNAL_ARGS)73 worker_spi_sigterm(SIGNAL_ARGS)
74 {
75 	int			save_errno = errno;
76 
77 	got_sigterm = true;
78 	SetLatch(MyLatch);
79 
80 	errno = save_errno;
81 }
82 
83 /*
84  * Signal handler for SIGHUP
85  *		Set a flag to tell the main loop to reread the config file, and set
86  *		our latch to wake it up.
87  */
88 static void
worker_spi_sighup(SIGNAL_ARGS)89 worker_spi_sighup(SIGNAL_ARGS)
90 {
91 	int			save_errno = errno;
92 
93 	got_sighup = true;
94 	SetLatch(MyLatch);
95 
96 	errno = save_errno;
97 }
98 
99 /*
100  * Initialize workspace for a worker process: create the schema if it doesn't
101  * already exist.
102  */
103 static void
initialize_worker_spi(worktable * table)104 initialize_worker_spi(worktable *table)
105 {
106 	int			ret;
107 	int			ntup;
108 	bool		isnull;
109 	StringInfoData buf;
110 
111 	SetCurrentStatementStartTimestamp();
112 	StartTransactionCommand();
113 	SPI_connect();
114 	PushActiveSnapshot(GetTransactionSnapshot());
115 	pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
116 
117 	/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
118 	initStringInfo(&buf);
119 	appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
120 					 table->schema);
121 
122 	ret = SPI_execute(buf.data, true, 0);
123 	if (ret != SPI_OK_SELECT)
124 		elog(FATAL, "SPI_execute failed: error code %d", ret);
125 
126 	if (SPI_processed != 1)
127 		elog(FATAL, "not a singleton result");
128 
129 	ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
130 									   SPI_tuptable->tupdesc,
131 									   1, &isnull));
132 	if (isnull)
133 		elog(FATAL, "null result");
134 
135 	if (ntup == 0)
136 	{
137 		resetStringInfo(&buf);
138 		appendStringInfo(&buf,
139 						 "CREATE SCHEMA \"%s\" "
140 						 "CREATE TABLE \"%s\" ("
141 						 "		type text CHECK (type IN ('total', 'delta')), "
142 						 "		value	integer)"
143 						 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
144 						 "WHERE type = 'total'",
145 						 table->schema, table->name, table->name, table->name);
146 
147 		/* set statement start time */
148 		SetCurrentStatementStartTimestamp();
149 
150 		ret = SPI_execute(buf.data, false, 0);
151 
152 		if (ret != SPI_OK_UTILITY)
153 			elog(FATAL, "failed to create my schema");
154 	}
155 
156 	SPI_finish();
157 	PopActiveSnapshot();
158 	CommitTransactionCommand();
159 	pgstat_report_activity(STATE_IDLE, NULL);
160 }
161 
162 void
worker_spi_main(Datum main_arg)163 worker_spi_main(Datum main_arg)
164 {
165 	int			index = DatumGetInt32(main_arg);
166 	worktable  *table;
167 	StringInfoData buf;
168 	char		name[20];
169 
170 	table = palloc(sizeof(worktable));
171 	sprintf(name, "schema%d", index);
172 	table->schema = pstrdup(name);
173 	table->name = pstrdup("counted");
174 
175 	/* Establish signal handlers before unblocking signals. */
176 	pqsignal(SIGHUP, worker_spi_sighup);
177 	pqsignal(SIGTERM, worker_spi_sigterm);
178 
179 	/* We're now ready to receive signals */
180 	BackgroundWorkerUnblockSignals();
181 
182 	/* Connect to our database */
183 	BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
184 
185 	elog(LOG, "%s initialized with %s.%s",
186 		 MyBgworkerEntry->bgw_name, table->schema, table->name);
187 	initialize_worker_spi(table);
188 
189 	/*
190 	 * Quote identifiers passed to us.  Note that this must be done after
191 	 * initialize_worker_spi, because that routine assumes the names are not
192 	 * quoted.
193 	 *
194 	 * Note some memory might be leaked here.
195 	 */
196 	table->schema = quote_identifier(table->schema);
197 	table->name = quote_identifier(table->name);
198 
199 	initStringInfo(&buf);
200 	appendStringInfo(&buf,
201 					 "WITH deleted AS (DELETE "
202 					 "FROM %s.%s "
203 					 "WHERE type = 'delta' RETURNING value), "
204 					 "total AS (SELECT coalesce(sum(value), 0) as sum "
205 					 "FROM deleted) "
206 					 "UPDATE %s.%s "
207 					 "SET value = %s.value + total.sum "
208 					 "FROM total WHERE type = 'total' "
209 					 "RETURNING %s.value",
210 					 table->schema, table->name,
211 					 table->schema, table->name,
212 					 table->name,
213 					 table->name);
214 
215 	/*
216 	 * Main loop: do this until the SIGTERM handler tells us to terminate
217 	 */
218 	while (!got_sigterm)
219 	{
220 		int			ret;
221 
222 		/*
223 		 * Background workers mustn't call usleep() or any direct equivalent:
224 		 * instead, they may wait on their process latch, which sleeps as
225 		 * necessary, but is awakened if postmaster dies.  That way the
226 		 * background process goes away immediately in an emergency.
227 		 */
228 		(void) WaitLatch(MyLatch,
229 						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
230 						 worker_spi_naptime * 1000L,
231 						 PG_WAIT_EXTENSION);
232 		ResetLatch(MyLatch);
233 
234 		CHECK_FOR_INTERRUPTS();
235 
236 		/*
237 		 * In case of a SIGHUP, just reload the configuration.
238 		 */
239 		if (got_sighup)
240 		{
241 			got_sighup = false;
242 			ProcessConfigFile(PGC_SIGHUP);
243 		}
244 
245 		/*
246 		 * Start a transaction on which we can run queries.  Note that each
247 		 * StartTransactionCommand() call should be preceded by a
248 		 * SetCurrentStatementStartTimestamp() call, which sets both the time
249 		 * for the statement we're about the run, and also the transaction
250 		 * start time.  Also, each other query sent to SPI should probably be
251 		 * preceded by SetCurrentStatementStartTimestamp(), so that statement
252 		 * start time is always up to date.
253 		 *
254 		 * The SPI_connect() call lets us run queries through the SPI manager,
255 		 * and the PushActiveSnapshot() call creates an "active" snapshot
256 		 * which is necessary for queries to have MVCC data to work on.
257 		 *
258 		 * The pgstat_report_activity() call makes our activity visible
259 		 * through the pgstat views.
260 		 */
261 		SetCurrentStatementStartTimestamp();
262 		StartTransactionCommand();
263 		SPI_connect();
264 		PushActiveSnapshot(GetTransactionSnapshot());
265 		pgstat_report_activity(STATE_RUNNING, buf.data);
266 
267 		/* We can now execute queries via SPI */
268 		ret = SPI_execute(buf.data, false, 0);
269 
270 		if (ret != SPI_OK_UPDATE_RETURNING)
271 			elog(FATAL, "cannot select from table %s.%s: error code %d",
272 				 table->schema, table->name, ret);
273 
274 		if (SPI_processed > 0)
275 		{
276 			bool		isnull;
277 			int32		val;
278 
279 			val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
280 											  SPI_tuptable->tupdesc,
281 											  1, &isnull));
282 			if (!isnull)
283 				elog(LOG, "%s: count in %s.%s is now %d",
284 					 MyBgworkerEntry->bgw_name,
285 					 table->schema, table->name, val);
286 		}
287 
288 		/*
289 		 * And finish our transaction.
290 		 */
291 		SPI_finish();
292 		PopActiveSnapshot();
293 		CommitTransactionCommand();
294 		pgstat_report_stat(false);
295 		pgstat_report_activity(STATE_IDLE, NULL);
296 	}
297 
298 	proc_exit(1);
299 }
300 
301 /*
302  * Entrypoint of this module.
303  *
304  * We register more than one worker process here, to demonstrate how that can
305  * be done.
306  */
307 void
_PG_init(void)308 _PG_init(void)
309 {
310 	BackgroundWorker worker;
311 	unsigned int i;
312 
313 	/* get the configuration */
314 	DefineCustomIntVariable("worker_spi.naptime",
315 							"Duration between each check (in seconds).",
316 							NULL,
317 							&worker_spi_naptime,
318 							10,
319 							1,
320 							INT_MAX,
321 							PGC_SIGHUP,
322 							0,
323 							NULL,
324 							NULL,
325 							NULL);
326 
327 	if (!process_shared_preload_libraries_in_progress)
328 		return;
329 
330 	DefineCustomIntVariable("worker_spi.total_workers",
331 							"Number of workers.",
332 							NULL,
333 							&worker_spi_total_workers,
334 							2,
335 							1,
336 							100,
337 							PGC_POSTMASTER,
338 							0,
339 							NULL,
340 							NULL,
341 							NULL);
342 
343 	DefineCustomStringVariable("worker_spi.database",
344 							   "Database to connect to.",
345 							   NULL,
346 							   &worker_spi_database,
347 							   "postgres",
348 							   PGC_POSTMASTER,
349 							   0,
350 							   NULL, NULL, NULL);
351 
352 	/* set up common data for all our workers */
353 	memset(&worker, 0, sizeof(worker));
354 	worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
355 		BGWORKER_BACKEND_DATABASE_CONNECTION;
356 	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
357 	worker.bgw_restart_time = BGW_NEVER_RESTART;
358 	sprintf(worker.bgw_library_name, "worker_spi");
359 	sprintf(worker.bgw_function_name, "worker_spi_main");
360 	worker.bgw_notify_pid = 0;
361 
362 	/*
363 	 * Now fill in worker-specific data, and do the actual registrations.
364 	 */
365 	for (i = 1; i <= worker_spi_total_workers; i++)
366 	{
367 		snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
368 		snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
369 		worker.bgw_main_arg = Int32GetDatum(i);
370 
371 		RegisterBackgroundWorker(&worker);
372 	}
373 }
374 
375 /*
376  * Dynamically launch an SPI worker.
377  */
378 Datum
worker_spi_launch(PG_FUNCTION_ARGS)379 worker_spi_launch(PG_FUNCTION_ARGS)
380 {
381 	int32		i = PG_GETARG_INT32(0);
382 	BackgroundWorker worker;
383 	BackgroundWorkerHandle *handle;
384 	BgwHandleStatus status;
385 	pid_t		pid;
386 
387 	memset(&worker, 0, sizeof(worker));
388 	worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
389 		BGWORKER_BACKEND_DATABASE_CONNECTION;
390 	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
391 	worker.bgw_restart_time = BGW_NEVER_RESTART;
392 	sprintf(worker.bgw_library_name, "worker_spi");
393 	sprintf(worker.bgw_function_name, "worker_spi_main");
394 	snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
395 	snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
396 	worker.bgw_main_arg = Int32GetDatum(i);
397 	/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
398 	worker.bgw_notify_pid = MyProcPid;
399 
400 	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
401 		PG_RETURN_NULL();
402 
403 	status = WaitForBackgroundWorkerStartup(handle, &pid);
404 
405 	if (status == BGWH_STOPPED)
406 		ereport(ERROR,
407 				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
408 				 errmsg("could not start background process"),
409 				 errhint("More details may be available in the server log.")));
410 	if (status == BGWH_POSTMASTER_DIED)
411 		ereport(ERROR,
412 				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
413 				 errmsg("cannot start background processes without postmaster"),
414 				 errhint("Kill all remaining database processes and restart the database.")));
415 	Assert(status == BGWH_STARTED);
416 
417 	PG_RETURN_INT32(pid);
418 }
419