1 /*
2  * This file and its contents are licensed under the Apache License 2.0.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-APACHE for a copy of the license.
5  */
6 #include <postgres.h>
7 
8 /* BGW includes below */
9 /* These are always necessary for a bgworker */
10 #include <miscadmin.h>
11 #include <postmaster/bgworker.h>
12 #include <storage/ipc.h>
13 #include <storage/latch.h>
14 #include <storage/lwlock.h>
15 #include <storage/proc.h>
16 #include <storage/shmem.h>
17 
18 /* for setting our wait event during waitlatch*/
19 #include <pgstat.h>
20 
21 /* needed for getting database list*/
22 #include <access/heapam.h>
23 #include <access/htup_details.h>
24 #include <catalog/pg_database.h>
25 #include <utils/snapmgr.h>
26 #include <access/xact.h>
27 
28 /* and checking db list for whether we're in a template*/
29 #include <utils/syscache.h>
30 
31 /* for calling external function*/
32 #include <fmgr.h>
33 
34 /* for signal handling (specifically die() function) */
35 #include <tcop/tcopprot.h>
36 
37 /* for looking up sending proc information for message handling */
38 #include <storage/procarray.h>
39 
40 /* for allocating the htab storage */
41 #include <utils/memutils.h>
42 
43 /* for getting settings correct before loading the versioned scheduler */
44 #include "catalog/pg_db_role_setting.h"
45 
46 #include "../compat/compat.h"
47 #include "../extension_constants.h"
48 #include "loader.h"
49 #include "bgw_counter.h"
50 #include "bgw_message_queue.h"
51 #include "bgw_launcher.h"
52 
/*
 * Name of the function looked up in the versioned TimescaleDB library
 * (via load_external_function) that the scheduler entrypoint morphs into.
 */
#define BGW_DB_SCHEDULER_FUNCNAME "ts_bgw_scheduler_main"
/* bgw_function_name used when registering each per-database scheduler worker */
#define BGW_ENTRYPOINT_FUNCNAME "ts_bgw_db_scheduler_entrypoint"

/*
 * Outcome of handling one message from the bgw message queue; passed to
 * ts_bgw_message_send_ack() so the sender learns whether its request took
 * effect.
 */
typedef enum AckResult
{
	ACK_FAILURE = 0,
	ACK_SUCCESS,
} AckResult;
61 
/*
 * State of the scheduler for one database, as tracked by the launcher.
 * See state machine in README.md.
 */
typedef enum SchedulerState
{
	/* Scheduler should be started but has not been allocated or started */
	ENABLED = 0,
	/*
	 * The scheduler has been allocated a spot in TimescaleDB's worker counter
	 * (ts_bgw_total_workers_increment succeeded) but no worker is running yet
	 */
	ALLOCATED,
	/* Scheduler has been started; db_scheduler_handle refers to its worker */
	STARTED,

	/*
	 * Scheduler is stopped and should not be started automatically. START and
	 * RESTART messages can re-enable the scheduler.
	 */
	DISABLED
} SchedulerState;
78 
/*
 * Seconds the postmaster waits before restarting the launcher after it exits
 * in error (used as bgw_restart_time). Zero in TS_DEBUG builds.
 */
#ifdef TS_DEBUG
#define BGW_LAUNCHER_RESTART_TIME_S 0
#else
#define BGW_LAUNCHER_RESTART_TIME_S 60
#endif

/*
 * Maximum time the launcher sleeps on its latch between iterations of its
 * main loop when no message was handled. WaitLatch expects a long.
 */
#ifdef TS_DEBUG
#define BGW_LAUNCHER_POLL_TIME_MS 10L
#else
#define BGW_LAUNCHER_POLL_TIME_MS 60000L
#endif

/* Set by launcher_sighup(); checked and cleared by the launcher main loop. */
static volatile sig_atomic_t got_SIGHUP = false;
93 
launcher_sighup(SIGNAL_ARGS)94 static void launcher_sighup(SIGNAL_ARGS)
95 {
96 	/* based on av_sighup_handler */
97 	int save_errno = errno;
98 
99 	got_SIGHUP = true;
100 	SetLatch(MyLatch);
101 
102 	errno = save_errno;
103 }
104 
105 /*
106  * Main bgw launcher for the cluster.
107  *
108  * Run through the TimescaleDB loader, so needs to have a small footprint as
109  * any interactions it has will need to remain backwards compatible for the
110  * foreseeable future.
111  *
112  * Notes: multiple databases in an instance (PG cluster) can have TimescaleDB
113  * installed. They are not necessarily the same version of TimescaleDB (though
114  * they could be) Shared memory is allocated and background workers are
115  * registered at shared_preload_libraries time We do not know what databases
116  * exist, nor which databases TimescaleDB is installed in (if any) at
117  * shared_preload_libraries time.
118  */
119 
TS_FUNCTION_INFO_V1(ts_bgw_cluster_launcher_main);
TS_FUNCTION_INFO_V1(ts_bgw_db_scheduler_entrypoint);

/*
 * Entry in the launcher's per-database hash table (see init_database_htab);
 * tracks the scheduler of one database.
 */
typedef struct DbHashEntry
{
	Oid db_oid;									 /* key for the hash table, must be first */
	BackgroundWorkerHandle *db_scheduler_handle; /* needed to shut down
												  * properly */
	SchedulerState state;
	/*
	 * Transaction the next started scheduler must wait on; handed to the
	 * worker via bgw_extra and reset to invalid once a worker is started.
	 */
	VirtualTransactionId vxid;
	/*
	 * Number of failed transition attempts since the last successful one;
	 * only the first failure is logged.
	 */
	int state_transition_failures;
} DbHashEntry;

/* Forward declaration: db_hash_entry_create_if_not_exists() calls this. */
static void scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry);
133 
/*
 * Called when the postmaster is detected to have died. Clears the registered
 * exit callbacks with on_exit_reset() so none of them run, then raises FATAL
 * to end this process.
 */
static void
bgw_on_postmaster_death(void)
{
	on_exit_reset(); /* don't call exit hooks because we want to bail
					  * out quickly */
	ereport(FATAL,
			(errcode(ERRCODE_ADMIN_SHUTDOWN),
			 errmsg("postmaster exited while TimescaleDB background worker launcher was working")));
}
143 
144 static void
report_bgw_limit_exceeded(DbHashEntry * entry)145 report_bgw_limit_exceeded(DbHashEntry *entry)
146 {
147 	if (entry->state_transition_failures == 0)
148 		ereport(LOG,
149 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
150 				 errmsg("TimescaleDB background worker limit of %d exceeded",
151 						ts_guc_max_background_workers),
152 				 errhint("Consider increasing timescaledb.max_background_workers.")));
153 	entry->state_transition_failures++;
154 }
155 
156 static void
report_error_on_worker_register_failure(DbHashEntry * entry)157 report_error_on_worker_register_failure(DbHashEntry *entry)
158 {
159 	if (entry->state_transition_failures == 0)
160 		ereport(LOG,
161 				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
162 				 errmsg("no available background worker slots"),
163 				 errhint("Consider increasing max_worker_processes in tandem with "
164 						 "timescaledb.max_background_workers.")));
165 	entry->state_transition_failures++;
166 }
167 
168 /*
169  * Aliasing a few things in bgworker.h so that we exit correctly on postmaster
170  * death so we don't have to duplicate code basically telling it we shouldn't
171  * call exit hooks cause we want to bail out quickly - similar to how the
172  * quickdie function works when we receive a sigquit. This should work
173  * similarly because postmaster death is a similar severity of issue.
174  * Additionally, we're wrapping these calls to make sure we never have a NULL
175  * handle, if we have a null handle, we return normal things.
176  */
177 static BgwHandleStatus
get_background_worker_pid(BackgroundWorkerHandle * handle,pid_t * pidp)178 get_background_worker_pid(BackgroundWorkerHandle *handle, pid_t *pidp)
179 {
180 	BgwHandleStatus status;
181 	pid_t pid;
182 
183 	if (handle == NULL)
184 		status = BGWH_STOPPED;
185 	else
186 	{
187 		status = GetBackgroundWorkerPid(handle, &pid);
188 		if (pidp != NULL)
189 			*pidp = pid;
190 	}
191 
192 	if (status == BGWH_POSTMASTER_DIED)
193 		bgw_on_postmaster_death();
194 	return status;
195 }
196 
197 static void
wait_for_background_worker_startup(BackgroundWorkerHandle * handle,pid_t * pidp)198 wait_for_background_worker_startup(BackgroundWorkerHandle *handle, pid_t *pidp)
199 {
200 	BgwHandleStatus status;
201 
202 	if (handle == NULL)
203 		status = BGWH_STOPPED;
204 	else
205 		status = WaitForBackgroundWorkerStartup(handle, pidp);
206 
207 	/*
208 	 * We don't care whether we get BGWH_STOPPED or BGWH_STARTED here, because
209 	 * the worker could have started and stopped very quickly before we read
210 	 * it. We can't get BGWH_NOT_YET_STARTED as that's what we're waiting for.
211 	 * We do care if the Postmaster died however.
212 	 */
213 
214 	if (status == BGWH_POSTMASTER_DIED)
215 		bgw_on_postmaster_death();
216 
217 	Assert(status == BGWH_STOPPED || status == BGWH_STARTED);
218 	return;
219 }
220 
221 static void
wait_for_background_worker_shutdown(BackgroundWorkerHandle * handle)222 wait_for_background_worker_shutdown(BackgroundWorkerHandle *handle)
223 {
224 	BgwHandleStatus status;
225 
226 	if (handle == NULL)
227 		status = BGWH_STOPPED;
228 	else
229 		status = WaitForBackgroundWorkerShutdown(handle);
230 
231 	/* We can only ever get BGWH_STOPPED stopped unless the Postmaster died. */
232 	if (status == BGWH_POSTMASTER_DIED)
233 		bgw_on_postmaster_death();
234 
235 	Assert(status == BGWH_STOPPED);
236 	return;
237 }
238 
239 static void
terminate_background_worker(BackgroundWorkerHandle * handle)240 terminate_background_worker(BackgroundWorkerHandle *handle)
241 {
242 	if (handle == NULL)
243 		return;
244 	else
245 		TerminateBackgroundWorker(handle);
246 }
247 
248 extern void
ts_bgw_cluster_launcher_register(void)249 ts_bgw_cluster_launcher_register(void)
250 {
251 	BackgroundWorker worker;
252 
253 	memset(&worker, 0, sizeof(worker));
254 	/* set up worker settings for our main worker */
255 	snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Launcher");
256 	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
257 	worker.bgw_restart_time = BGW_LAUNCHER_RESTART_TIME_S;
258 
259 	/*
260 	 * Starting at BgWorkerStart_RecoveryFinished means we won't ever get
261 	 * started on a hot_standby see
262 	 * https://www.postgresql.org/docs/10/static/bgworker.html as it's not
263 	 * documented in bgworker.c.
264 	 */
265 	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
266 	worker.bgw_notify_pid = 0;
267 	snprintf(worker.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
268 	snprintf(worker.bgw_function_name, BGW_MAXLEN, "ts_bgw_cluster_launcher_main");
269 	RegisterBackgroundWorker(&worker);
270 }
271 
272 /*
273  * Register a background worker that calls the main TimescaleDB background
274  * worker launcher library (i.e. loader) and uses the scheduler entrypoint
275  * function.  The scheduler entrypoint will deal with starting a new worker,
276  * and waiting on any txns that it needs to, if we pass along a vxid in the
277  * bgw_extra field of the BgWorker.
278  */
279 static bool
register_entrypoint_for_db(Oid db_id,VirtualTransactionId vxid,BackgroundWorkerHandle ** handle)280 register_entrypoint_for_db(Oid db_id, VirtualTransactionId vxid, BackgroundWorkerHandle **handle)
281 {
282 	BackgroundWorker worker;
283 
284 	memset(&worker, 0, sizeof(worker));
285 	snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Scheduler");
286 	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
287 	worker.bgw_restart_time = BGW_NEVER_RESTART;
288 	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
289 	snprintf(worker.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
290 	snprintf(worker.bgw_function_name, BGW_MAXLEN, BGW_ENTRYPOINT_FUNCNAME);
291 	worker.bgw_notify_pid = MyProcPid;
292 	worker.bgw_main_arg = ObjectIdGetDatum(db_id);
293 	memcpy(worker.bgw_extra, &vxid, sizeof(VirtualTransactionId));
294 
295 	return RegisterDynamicBackgroundWorker(&worker, handle);
296 }
297 
298 /* Initializes the launcher's hash table of schedulers.
299  * Return value is guaranteed to be not-null, because otherwise the function
300  * will have thrown an error.
301  */
302 static HTAB *
init_database_htab(void)303 init_database_htab(void)
304 {
305 	HASHCTL info = { .keysize = sizeof(Oid), .entrysize = sizeof(DbHashEntry) };
306 
307 	return hash_create("launcher_db_htab",
308 					   ts_guc_max_background_workers,
309 					   &info,
310 					   HASH_BLOBS | HASH_ELEM);
311 }
312 
313 /* Insert a scheduler entry into the hash table. Correctly set entry values. */
314 static DbHashEntry *
db_hash_entry_create_if_not_exists(HTAB * db_htab,Oid db_oid)315 db_hash_entry_create_if_not_exists(HTAB *db_htab, Oid db_oid)
316 {
317 	DbHashEntry *db_he;
318 	bool found;
319 
320 	db_he = (DbHashEntry *) hash_search(db_htab, &db_oid, HASH_ENTER, &found);
321 	if (!found)
322 	{
323 		db_he->db_scheduler_handle = NULL;
324 		db_he->state = ENABLED;
325 		SetInvalidVirtualTransactionId(db_he->vxid);
326 		db_he->state_transition_failures = 0;
327 
328 		/*
329 		 * Try to allocate a spot right away to give schedulers priority over
330 		 * other bgws. This is especially important on initial server startup
331 		 * where we want to reserve slots for all schedulers before starting
332 		 * any. This is done so that background workers started by schedulers
333 		 * don't race for open slots with other schedulers on startup.
334 		 */
335 		scheduler_state_trans_enabled_to_allocated(db_he);
336 	}
337 
338 	return db_he;
339 }
340 
341 /*
342  * Model this on autovacuum.c -> get_database_list.
343  *
344  * Note that we are not doing
345  * all the things around memory context that they do, because the hashtable
346  * we're using to store db entries is automatically created in its own memory
347  * context (a child of TopMemoryContext) This can get called at two different
348  * times 1) when the cluster launcher starts and is looking for dbs and 2) if
349  * it restarts due to a postmaster signal.
350  */
351 static void
populate_database_htab(HTAB * db_htab)352 populate_database_htab(HTAB *db_htab)
353 {
354 	Relation rel;
355 	TableScanDesc scan;
356 	HeapTuple tup;
357 
358 	/*
359 	 * by this time we should already be connected to the db, and only have
360 	 * access to shared catalogs
361 	 */
362 	StartTransactionCommand();
363 	(void) GetTransactionSnapshot();
364 
365 	rel = table_open(DatabaseRelationId, AccessShareLock);
366 	scan = table_beginscan_catalog(rel, 0, NULL);
367 
368 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
369 	{
370 		Form_pg_database pgdb = (Form_pg_database) GETSTRUCT(tup);
371 
372 		if (!pgdb->datallowconn || pgdb->datistemplate)
373 			continue; /* don't bother with dbs that don't allow
374 					   * connections or are templates */
375 		db_hash_entry_create_if_not_exists(db_htab, pgdb->oid);
376 	}
377 	heap_endscan(scan);
378 	table_close(rel, AccessShareLock);
379 
380 	CommitTransactionCommand();
381 }
382 
383 static void
scheduler_modify_state(DbHashEntry * entry,SchedulerState new_state)384 scheduler_modify_state(DbHashEntry *entry, SchedulerState new_state)
385 {
386 	Assert(entry->state != new_state);
387 	entry->state_transition_failures = 0;
388 	entry->state = new_state;
389 }
390 
391 /* TRANSITION FUNCTIONS */
392 static void
scheduler_state_trans_disabled_to_enabled(DbHashEntry * entry)393 scheduler_state_trans_disabled_to_enabled(DbHashEntry *entry)
394 {
395 	Assert(entry->state == DISABLED);
396 	Assert(entry->db_scheduler_handle == NULL);
397 	scheduler_modify_state(entry, ENABLED);
398 }
399 
400 static void
scheduler_state_trans_enabled_to_allocated(DbHashEntry * entry)401 scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry)
402 {
403 	Assert(entry->state == ENABLED);
404 	Assert(entry->db_scheduler_handle == NULL);
405 	/* Reserve a spot for this scheduler with BGW counter */
406 	if (!ts_bgw_total_workers_increment())
407 	{
408 		report_bgw_limit_exceeded(entry);
409 		return;
410 	}
411 	scheduler_modify_state(entry, ALLOCATED);
412 }
413 
414 static void
scheduler_state_trans_started_to_allocated(DbHashEntry * entry)415 scheduler_state_trans_started_to_allocated(DbHashEntry *entry)
416 {
417 	Assert(entry->state == STARTED);
418 	Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED);
419 	if (entry->db_scheduler_handle != NULL)
420 	{
421 		pfree(entry->db_scheduler_handle);
422 		entry->db_scheduler_handle = NULL;
423 	}
424 	scheduler_modify_state(entry, ALLOCATED);
425 }
426 
427 static void
scheduler_state_trans_allocated_to_started(DbHashEntry * entry)428 scheduler_state_trans_allocated_to_started(DbHashEntry *entry)
429 {
430 	pid_t worker_pid;
431 	bool worker_registered;
432 
433 	Assert(entry->state == ALLOCATED);
434 	Assert(entry->db_scheduler_handle == NULL);
435 
436 	worker_registered =
437 		register_entrypoint_for_db(entry->db_oid, entry->vxid, &entry->db_scheduler_handle);
438 
439 	if (!worker_registered)
440 	{
441 		report_error_on_worker_register_failure(entry);
442 		return;
443 	}
444 	wait_for_background_worker_startup(entry->db_scheduler_handle, &worker_pid);
445 	SetInvalidVirtualTransactionId(entry->vxid);
446 	scheduler_modify_state(entry, STARTED);
447 }
448 
449 static void
scheduler_state_trans_enabled_to_disabled(DbHashEntry * entry)450 scheduler_state_trans_enabled_to_disabled(DbHashEntry *entry)
451 {
452 	Assert(entry->state == ENABLED);
453 	Assert(entry->db_scheduler_handle == NULL);
454 	scheduler_modify_state(entry, DISABLED);
455 }
456 
457 static void
scheduler_state_trans_allocated_to_disabled(DbHashEntry * entry)458 scheduler_state_trans_allocated_to_disabled(DbHashEntry *entry)
459 {
460 	Assert(entry->state == ALLOCATED);
461 	Assert(entry->db_scheduler_handle == NULL);
462 
463 	ts_bgw_total_workers_decrement();
464 	scheduler_modify_state(entry, DISABLED);
465 }
466 
467 static void
scheduler_state_trans_started_to_disabled(DbHashEntry * entry)468 scheduler_state_trans_started_to_disabled(DbHashEntry *entry)
469 {
470 	Assert(entry->state == STARTED);
471 	Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED);
472 
473 	ts_bgw_total_workers_decrement();
474 	if (entry->db_scheduler_handle != NULL)
475 	{
476 		pfree(entry->db_scheduler_handle);
477 		entry->db_scheduler_handle = NULL;
478 	}
479 	scheduler_modify_state(entry, DISABLED);
480 }
481 
482 static void
scheduler_state_trans_automatic(DbHashEntry * entry)483 scheduler_state_trans_automatic(DbHashEntry *entry)
484 {
485 	switch (entry->state)
486 	{
487 		case ENABLED:
488 			scheduler_state_trans_enabled_to_allocated(entry);
489 			if (entry->state == ALLOCATED)
490 				scheduler_state_trans_allocated_to_started(entry);
491 			break;
492 		case ALLOCATED:
493 			scheduler_state_trans_allocated_to_started(entry);
494 			break;
495 		case STARTED:
496 			if (get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED)
497 				scheduler_state_trans_started_to_disabled(entry);
498 			break;
499 		case DISABLED:
500 			break;
501 	}
502 }
503 
504 static void
scheduler_state_trans_automatic_all(HTAB * db_htab)505 scheduler_state_trans_automatic_all(HTAB *db_htab)
506 {
507 	HASH_SEQ_STATUS hash_seq;
508 	DbHashEntry *current_entry;
509 
510 	hash_seq_init(&hash_seq, db_htab);
511 	while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
512 		scheduler_state_trans_automatic(current_entry);
513 }
514 
515 /* This is called when we're going to shut down so we don't leave things messy*/
516 static void
launcher_pre_shmem_cleanup(int code,Datum arg)517 launcher_pre_shmem_cleanup(int code, Datum arg)
518 {
519 	HTAB *db_htab = *(HTAB **) DatumGetPointer(arg);
520 	HASH_SEQ_STATUS hash_seq;
521 	DbHashEntry *current_entry;
522 
523 	/* db_htab will be NULL if we fail during init_database_htab */
524 	if (db_htab != NULL)
525 	{
526 		hash_seq_init(&hash_seq, db_htab);
527 
528 		/*
529 		 * Stop everyone (or at least tell the Postmaster we don't care about
530 		 * them anymore)
531 		 */
532 		while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
533 		{
534 			if (current_entry->db_scheduler_handle != NULL)
535 			{
536 				terminate_background_worker(current_entry->db_scheduler_handle);
537 				pfree(current_entry->db_scheduler_handle);
538 			}
539 		}
540 
541 		hash_destroy(db_htab);
542 	}
543 
544 	/*
545 	 * Reset our pid in the queue so that others know we've died and don't
546 	 * wait forever
547 	 */
548 	ts_bgw_message_queue_shmem_cleanup();
549 }
550 
551 /*
552  *************
553  * Actions for message types we could receive off of the bgw_message_queue.
554  *************
555  */
556 
557 /*
558  * This should be idempotent. If we find the background worker and it's not
559  * stopped, do nothing. In order to maintain idempotency, a scheduler in the
560  * ENABLED, ALLOCATED or STARTED state cannot get a new vxid to wait on. (We
561  * cannot pass in a new vxid to wait on for an already-started scheduler in any
562  * case). This means that actions like restart, which are not idempotent, will
563  * not have their effects changed by subsequent start actions, no matter the
564  * state they are in when the start action is received.
565  */
566 static AckResult
message_start_action(HTAB * db_htab,BgwMessage * message)567 message_start_action(HTAB *db_htab, BgwMessage *message)
568 {
569 	DbHashEntry *entry;
570 
571 	entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
572 
573 	if (entry->state == DISABLED)
574 		scheduler_state_trans_disabled_to_enabled(entry);
575 
576 	scheduler_state_trans_automatic(entry);
577 
578 	return (entry->state == STARTED ? ACK_SUCCESS : ACK_FAILURE);
579 }
580 
581 static AckResult
message_stop_action(HTAB * db_htab,BgwMessage * message)582 message_stop_action(HTAB *db_htab, BgwMessage *message)
583 {
584 	DbHashEntry *entry;
585 
586 	/*
587 	 * If the entry does not exist try to create it so we can put it in the
588 	 * DISABLED state. Otherwise, it will be created during the next poll and
589 	 * then will end up in the ENABLED state and proceed to being STARTED. But
590 	 * this is not the behavior we want.
591 	 */
592 	entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
593 
594 	switch (entry->state)
595 	{
596 		case ENABLED:
597 			scheduler_state_trans_enabled_to_disabled(entry);
598 			break;
599 		case ALLOCATED:
600 			scheduler_state_trans_allocated_to_disabled(entry);
601 			break;
602 		case STARTED:
603 			terminate_background_worker(entry->db_scheduler_handle);
604 			wait_for_background_worker_shutdown(entry->db_scheduler_handle);
605 			scheduler_state_trans_started_to_disabled(entry);
606 			break;
607 		case DISABLED:
608 			break;
609 	}
610 	return entry->state == DISABLED ? ACK_SUCCESS : ACK_FAILURE;
611 }
612 
613 /*
614  * This function will stop and restart a scheduler in the STARTED state,  ENABLE
615  * a scheduler if it does not exist or is in the DISABLED state and set the vxid
616  * to wait on for a scheduler in any state. It is not idempotent. Additionally,
617  * one might think that this function would simply be a combination of stop and
618  * start above, but it is not as we maintain the worker's "slot" by never
619  * releasing the worker from our "pool" of background workers as stopping and
620  * starting would.  We don't want a race condition where some other db steals
621  * the scheduler of the other by requesting a worker at the wrong time. (This is
622  * accomplished by moving from STARTED to ALLOCATED after shutting down the
623  * worker, never releasing the entry and transitioning all the way back to
624  * ENABLED).
625  */
626 static AckResult
message_restart_action(HTAB * db_htab,BgwMessage * message,VirtualTransactionId vxid)627 message_restart_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId vxid)
628 {
629 	DbHashEntry *entry;
630 
631 	entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
632 
633 	entry->vxid = vxid;
634 
635 	switch (entry->state)
636 	{
637 		case ENABLED:
638 			break;
639 		case ALLOCATED:
640 			break;
641 		case STARTED:
642 			terminate_background_worker(entry->db_scheduler_handle);
643 			wait_for_background_worker_shutdown(entry->db_scheduler_handle);
644 			scheduler_state_trans_started_to_allocated(entry);
645 			break;
646 		case DISABLED:
647 			scheduler_state_trans_disabled_to_enabled(entry);
648 	}
649 
650 	scheduler_state_trans_automatic(entry);
651 	return entry->state == STARTED ? ACK_SUCCESS : ACK_FAILURE;
652 }
653 
654 /*
655  * Handle 1 message.
656  */
657 static bool
launcher_handle_message(HTAB * db_htab)658 launcher_handle_message(HTAB *db_htab)
659 {
660 	BgwMessage *message = ts_bgw_message_receive();
661 	PGPROC *sender;
662 	VirtualTransactionId vxid;
663 	AckResult action_result = ACK_FAILURE;
664 
665 	if (message == NULL)
666 		return false;
667 
668 	sender = BackendPidGetProc(message->sender_pid);
669 	if (sender == NULL)
670 	{
671 		ereport(LOG,
672 				(errmsg("TimescaleDB background worker launcher received message from non-existent "
673 						"backend")));
674 		return true;
675 	}
676 
677 	GET_VXID_FROM_PGPROC(vxid, *sender);
678 
679 	switch (message->message_type)
680 	{
681 		case START:
682 			action_result = message_start_action(db_htab, message);
683 			break;
684 		case STOP:
685 			action_result = message_stop_action(db_htab, message);
686 			break;
687 		case RESTART:
688 			action_result = message_restart_action(db_htab, message, vxid);
689 			break;
690 	}
691 
692 	ts_bgw_message_send_ack(message, action_result);
693 	return true;
694 }
695 
696 /*
697  * Wrapper around normal postgres `die()` function to give more context on
698  * sigterms. The default, `bgworker_die()`, can't be used due to the fact
699  * that it handles signals synchronously, rather than waiting for a
700  * CHECK_FOR_INTERRUPTS(). `die()` (which is arguably misnamed) sets flags
701  * that will cause the backend to exit on the next call to
702  * CHECK_FOR_INTERRUPTS(), which can happen either in our code or in
703  * functions within the Postgres codebase that we call. This means that we
704  * don't need to wait for the next time control is returned to our loop to
705  * exit, which would be necessary if we set our own flag and checked it in
706  * a loop condition. However, because it cannot exit 0, the launcher will be
707  * restarted by the postmaster, even when it has received a SIGTERM, which
708  * we decided was the proper behavior. If users want to disable the launcher,
709  * they can set `timescaledb.max_background_workers = 0` and then we will
710  * `proc_exit(0)` before doing anything else.
711  */
launcher_sigterm(SIGNAL_ARGS)712 static void launcher_sigterm(SIGNAL_ARGS)
713 {
714 	/* Do not use anything that calls malloc() inside a signal handler since
715 	 * malloc() is not signal-safe. This includes ereport() */
716 	write_stderr("terminating TimescaleDB background worker launcher due to administrator command");
717 	die(postgres_signal_arg);
718 }
719 
/*
 * Main function of the cluster-wide launcher worker (registered by
 * ts_bgw_cluster_launcher_register()).
 *
 * It first reserves a spot for itself in TimescaleDB's worker counter and
 * exits with status 0 if none is available. It then connects to the shared
 * catalogs and loops forever. Each iteration scans pg_database for databases,
 * handles at most one message from the bgw message queue, and advances every
 * scheduler's state machine. When no message was handled, it sleeps on its
 * latch for up to BGW_LAUNCHER_POLL_TIME_MS and reloads the configuration if
 * a SIGHUP arrived.
 */
extern Datum
ts_bgw_cluster_launcher_main(PG_FUNCTION_ARGS)
{
	/*
	 * Slot in TopMemoryContext holding db_htab. It is handed to the exit
	 * callback, which sees NULL until init_database_htab() has succeeded.
	 */
	HTAB **htab_storage;

	HTAB *db_htab;

	pqsignal(SIGINT, StatementCancelHandler);
	pqsignal(SIGTERM, launcher_sigterm);
	pqsignal(SIGHUP, launcher_sighup);

	/* Some SIGHUPS may already have been dropped, so we must load the file here */
	got_SIGHUP = false;
	ProcessConfigFile(PGC_SIGHUP);
	BackgroundWorkerUnblockSignals();
	ereport(DEBUG1, (errmsg("TimescaleDB background worker launcher started")));

	/* set counter back to zero on restart */
	ts_bgw_counter_reinit();
	if (!ts_bgw_total_workers_increment())
	{
		/*
		 * This should be the first reservation made, so if we already
		 * exceeded our limits it means we have a limit of 0 and should just
		 * exit. We have to exit(0) because if we exit in error we get
		 * restarted by the postmaster.
		 */
		ereport(LOG,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("TimescaleDB background worker is set to 0"),
				 errhint("TimescaleDB background worker launcher shutting down.")));
		proc_exit(0);
	}
	/* Connect to the db, no db name yet, so can only access shared catalogs */
	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
	pgstat_report_appname(MyBgworkerEntry->bgw_name);
	ereport(LOG, (errmsg("TimescaleDB background worker launcher connected to shared catalogs")));

	/* zeroed, so the cleanup callback sees NULL until the table exists */
	htab_storage = MemoryContextAllocZero(TopMemoryContext, sizeof(*htab_storage));

	/*
	 * We must setup the cleanup function _before_ initializing any state it
	 * touches (specifically the bgw_message_queue and db_htab). Failing to do
	 * this can cause cascading failures when the launcher fails in
	 * init_database_htab (eg. due to running out of shared memory) but
	 * doesn't deregister itself from the shared bgw_message_queue.
	 */
	before_shmem_exit(launcher_pre_shmem_cleanup, PointerGetDatum(htab_storage));

	ts_bgw_message_queue_set_reader();

	db_htab = init_database_htab();
	*htab_storage = db_htab;

	populate_database_htab(db_htab);

	while (true)
	{
		int wl_rc;
		bool handled_msgs = false;

		CHECK_FOR_INTERRUPTS();
		/* pick up databases created since the previous iteration */
		populate_database_htab(db_htab);
		handled_msgs = launcher_handle_message(db_htab);
		scheduler_state_trans_automatic_all(db_htab);
		/* keep draining the queue without sleeping while messages arrive */
		if (handled_msgs)
			continue;

		wl_rc = WaitLatch(MyLatch,
						  WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT,
						  BGW_LAUNCHER_POLL_TIME_MS,
						  PG_WAIT_EXTENSION);
		ResetLatch(MyLatch);
		if (wl_rc & WL_POSTMASTER_DEATH)
			bgw_on_postmaster_death();

		/* reload configuration requested through launcher_sighup() */
		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}
	PG_RETURN_VOID();
}
804 
805 /* Wrapper around `die()`, see note on `launcher_sigterm()` above for more info*/
entrypoint_sigterm(SIGNAL_ARGS)806 static void entrypoint_sigterm(SIGNAL_ARGS)
807 {
808 	/* Do not use anything that calls malloc() inside a signal handler since
809 	 * malloc() is not signal-safe. This includes ereport() */
810 	write_stderr("terminating TimescaleDB scheduler entrypoint due to administrator command");
811 	die(postgres_signal_arg);
812 }
813 
814 /*
815  * Inside the entrypoint, we must check again if we're in a template db
816  * even though we excluded template dbs in populate_database_htab because
817  * we can be called on, say, CREATE EXTENSION in a template db and then
818  * we'll not stop til next server shutdown so if we hit this point and are
819  * in a template db, we throw an error and shut down Check in the syscache
820  * rather than searching through the entire database catalog again.
821  * Modelled on autovacuum.c -> do_autovacuum.
822  */
823 static void
database_is_template_check(void)824 database_is_template_check(void)
825 {
826 	Form_pg_database pgdb;
827 	HeapTuple tuple;
828 
829 	tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
830 	if (!HeapTupleIsValid(tuple))
831 		ereport(ERROR,
832 				(errmsg("TimescaleDB background worker failed to find entry for database in "
833 						"syscache")));
834 
835 	pgdb = (Form_pg_database) GETSTRUCT(tuple);
836 	if (pgdb->datistemplate)
837 		ereport(ERROR,
838 				(errmsg("TimescaleDB background worker connected to template database, exiting")));
839 
840 	ReleaseSysCache(tuple);
841 }
842 
843 /*
844  * Before we morph into the scheduler, we also need to reload configs from their
845  * defaults if the database default has changed. Defaults are changed in the
846  * post_restore function where we change the db default for the restoring guc
847  * wait until the txn commits and then must see if the txn made the change.
848  * Checks for changes are normally run at connection startup, but because we
849  * have to connect in order to wait on the txn we have to re-run after the wait.
850  * This function is based on the postgres function in postinit.c by the same
851  * name.
852  */
853 
854 static void
process_settings(Oid databaseid)855 process_settings(Oid databaseid)
856 {
857 	Relation relsetting;
858 	Snapshot snapshot;
859 
860 	if (!IsUnderPostmaster)
861 		return;
862 
863 	relsetting = table_open(DbRoleSettingRelationId, AccessShareLock);
864 
865 	/* read all the settings under the same snapshot for efficiency */
866 	snapshot = RegisterSnapshot(GetCatalogSnapshot(DbRoleSettingRelationId));
867 
868 	/* Later settings are ignored if set earlier. */
869 	ApplySetting(snapshot, databaseid, InvalidOid, relsetting, PGC_S_DATABASE);
870 	ApplySetting(snapshot, InvalidOid, InvalidOid, relsetting, PGC_S_GLOBAL);
871 
872 	UnregisterSnapshot(snapshot);
873 	table_close(relsetting, AccessShareLock);
874 }
875 
876 /*
877  * This can be run either from the cluster launcher at db_startup time, or
878  * in the case of an install/uninstall/update of the extension, in the
879  * first case, we have no vxid that we're waiting on. In the second case,
880  * we do, because we have to wait so that we see the effects of said txn.
881  * So we wait for it to finish, then we  morph into the new db_scheduler
882  * worker using whatever version is now installed (or exit gracefully if
883  * no version is now installed).
884  */
885 extern Datum
ts_bgw_db_scheduler_entrypoint(PG_FUNCTION_ARGS)886 ts_bgw_db_scheduler_entrypoint(PG_FUNCTION_ARGS)
887 {
888 	Oid db_id = DatumGetObjectId(MyBgworkerEntry->bgw_main_arg);
889 	bool ts_installed = false;
890 	char version[MAX_VERSION_LEN];
891 	VirtualTransactionId vxid;
892 
893 	pqsignal(SIGINT, StatementCancelHandler);
894 	pqsignal(SIGTERM, entrypoint_sigterm);
895 	BackgroundWorkerUnblockSignals();
896 	BackgroundWorkerInitializeConnectionByOid(db_id, InvalidOid, 0);
897 	pgstat_report_appname(MyBgworkerEntry->bgw_name);
898 
899 	/*
900 	 * Wait until whatever vxid that potentially called us finishes before we
901 	 * happens in a txn so it's cleaned up correctly if we get a sigkill in
902 	 * the meantime, but we will need stop after and take a new txn so we can
903 	 * see the correct state after its effects
904 	 */
905 	StartTransactionCommand();
906 	(void) GetTransactionSnapshot();
907 	memcpy(&vxid, MyBgworkerEntry->bgw_extra, sizeof(VirtualTransactionId));
908 	if (VirtualTransactionIdIsValid(vxid))
909 		VirtualXactLock(vxid, true);
910 	CommitTransactionCommand();
911 
912 	/*
913 	 * now we can start our transaction and get the version currently
914 	 * installed
915 	 */
916 	StartTransactionCommand();
917 	(void) GetTransactionSnapshot();
918 
919 	/*
920 	 * Check whether a database is a template database and raise an error if
921 	 * so, as we don't want to run in template dbs.
922 	 */
923 	database_is_template_check();
924 	/*  Process any config changes caused by an ALTER DATABASE */
925 	process_settings(MyDatabaseId);
926 	ts_installed = ts_loader_extension_exists();
927 	if (ts_installed)
928 		strlcpy(version, ts_loader_extension_version(), MAX_VERSION_LEN);
929 
930 	ts_loader_extension_check();
931 	CommitTransactionCommand();
932 	if (ts_installed)
933 	{
934 		char soname[MAX_SO_NAME_LEN];
935 		PGFunction versioned_scheduler_main;
936 
937 		snprintf(soname, MAX_SO_NAME_LEN, "%s-%s", EXTENSION_SO, version);
938 		versioned_scheduler_main =
939 			load_external_function(soname, BGW_DB_SCHEDULER_FUNCNAME, false, NULL);
940 		if (versioned_scheduler_main == NULL)
941 			ereport(LOG,
942 					(errmsg("TimescaleDB version %s does not have a background worker, exiting",
943 							soname)));
944 		else /* essentially we morph into the versioned
945 			  * worker here */
946 			DirectFunctionCall1(versioned_scheduler_main, ObjectIdGetDatum(InvalidOid));
947 	}
948 	PG_RETURN_VOID();
949 }
950