1 /*
2 * This file and its contents are licensed under the Apache License 2.0.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-APACHE for a copy of the license.
5 */
6 #include <postgres.h>
7
8 /* BGW includes below */
9 /* These are always necessary for a bgworker */
10 #include <miscadmin.h>
11 #include <postmaster/bgworker.h>
12 #include <storage/ipc.h>
13 #include <storage/latch.h>
14 #include <storage/lwlock.h>
15 #include <storage/proc.h>
16 #include <storage/shmem.h>
17
18 /* for setting our wait event during waitlatch*/
19 #include <pgstat.h>
20
21 /* needed for getting database list*/
22 #include <access/heapam.h>
23 #include <access/htup_details.h>
24 #include <catalog/pg_database.h>
25 #include <utils/snapmgr.h>
26 #include <access/xact.h>
27
28 /* and checking db list for whether we're in a template*/
29 #include <utils/syscache.h>
30
31 /* for calling external function*/
32 #include <fmgr.h>
33
34 /* for signal handling (specifically die() function) */
35 #include <tcop/tcopprot.h>
36
37 /* for looking up sending proc information for message handling */
38 #include <storage/procarray.h>
39
40 /* for allocating the htab storage */
41 #include <utils/memutils.h>
42
43 /* for getting settings correct before loading the versioned scheduler */
44 #include "catalog/pg_db_role_setting.h"
45
46 #include "../compat/compat.h"
47 #include "../extension_constants.h"
48 #include "loader.h"
49 #include "bgw_counter.h"
50 #include "bgw_message_queue.h"
51 #include "bgw_launcher.h"
52
/* Function the entrypoint looks up in the versioned library to become the scheduler. */
#define BGW_DB_SCHEDULER_FUNCNAME "ts_bgw_scheduler_main"
/* Function registered as bgw_function_name for every per-database scheduler worker. */
#define BGW_ENTRYPOINT_FUNCNAME "ts_bgw_db_scheduler_entrypoint"

/* Result of handling one launcher message; it is sent back to the sender as the ack. */
typedef enum AckResult
{
	ACK_FAILURE = 0,
	ACK_SUCCESS = 1
} AckResult;
61
/*
 * Lifecycle of one per-database scheduler as tracked by the launcher.
 * See the state machine in README.md.
 */
typedef enum SchedulerState
{
	/* Should run, but holds neither a worker-counter slot nor a worker yet. */
	ENABLED = 0,
	/* Holds a slot in TimescaleDB's background worker counter; not started. */
	ALLOCATED = 1,
	/* A scheduler worker has been registered and started. */
	STARTED = 2,

	/*
	 * Stopped and never started automatically; only a START or RESTART
	 * message can re-enable it.
	 */
	DISABLED = 3
} SchedulerState;
78
/*
 * Launcher timing. TS_DEBUG builds restart the launcher with no delay and poll
 * every 10 ms; other builds wait 60 s before a restart and poll every 60 s.
 */
#ifdef TS_DEBUG
#define BGW_LAUNCHER_RESTART_TIME_S 0
/* WaitLatch expects a long */
#define BGW_LAUNCHER_POLL_TIME_MS 10L
#else
#define BGW_LAUNCHER_RESTART_TIME_S 60
/* WaitLatch expects a long */
#define BGW_LAUNCHER_POLL_TIME_MS 60000L
#endif

/* Set by launcher_sighup(); checked and cleared by the launcher main loop. */
static volatile sig_atomic_t got_SIGHUP = false;
93
launcher_sighup(SIGNAL_ARGS)94 static void launcher_sighup(SIGNAL_ARGS)
95 {
96 /* based on av_sighup_handler */
97 int save_errno = errno;
98
99 got_SIGHUP = true;
100 SetLatch(MyLatch);
101
102 errno = save_errno;
103 }
104
/*
 * Main bgw launcher for the cluster.
 *
 * It is run through the TimescaleDB loader, so it needs to have a small
 * footprint: any interactions it has will need to remain backwards compatible
 * for the foreseeable future.
 *
 * Notes: multiple databases in an instance (PG cluster) can have TimescaleDB
 * installed, and they are not necessarily the same version of TimescaleDB
 * (though they could be). Shared memory is allocated and background workers
 * are registered at shared_preload_libraries time, when we know neither which
 * databases exist nor which of them (if any) have TimescaleDB installed.
 */
119
/*
 * Function-info declarations for the two C entry points that background
 * workers reference by name: ts_bgw_cluster_launcher_main is the launcher's
 * bgw_function_name, and ts_bgw_db_scheduler_entrypoint (BGW_ENTRYPOINT_FUNCNAME)
 * is the bgw_function_name of every per-database scheduler worker.
 * NOTE(review): TS_FUNCTION_INFO_V1 is presumably a wrapper around
 * PG_FUNCTION_INFO_V1 - confirm in compat.h.
 */
TS_FUNCTION_INFO_V1(ts_bgw_cluster_launcher_main);
TS_FUNCTION_INFO_V1(ts_bgw_db_scheduler_entrypoint);
/*
 * Per-database bookkeeping kept in the launcher's hash table, keyed by the
 * database OID.
 */
typedef struct DbHashEntry
{
	Oid db_oid; /* key for the hash table, must be first */
	BackgroundWorkerHandle *db_scheduler_handle; /* needed to shut down
												  * properly; NULL while no
												  * scheduler worker is
												  * registered */
	SchedulerState state;
	/* vxid the next scheduler must wait on; invalidated once it is started */
	VirtualTransactionId vxid;
	/* failed transition attempts in a row; only the first one is logged */
	int state_transition_failures;
} DbHashEntry;
131
/* Forward declaration: new hash entries try to reserve a slot as soon as they are created. */
static void scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry);
133
134 static void
bgw_on_postmaster_death(void)135 bgw_on_postmaster_death(void)
136 {
137 on_exit_reset(); /* don't call exit hooks cause we want to bail
138 * out quickly */
139 ereport(FATAL,
140 (errcode(ERRCODE_ADMIN_SHUTDOWN),
141 errmsg("postmaster exited while TimescaleDB background worker launcher was working")));
142 }
143
144 static void
report_bgw_limit_exceeded(DbHashEntry * entry)145 report_bgw_limit_exceeded(DbHashEntry *entry)
146 {
147 if (entry->state_transition_failures == 0)
148 ereport(LOG,
149 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
150 errmsg("TimescaleDB background worker limit of %d exceeded",
151 ts_guc_max_background_workers),
152 errhint("Consider increasing timescaledb.max_background_workers.")));
153 entry->state_transition_failures++;
154 }
155
156 static void
report_error_on_worker_register_failure(DbHashEntry * entry)157 report_error_on_worker_register_failure(DbHashEntry *entry)
158 {
159 if (entry->state_transition_failures == 0)
160 ereport(LOG,
161 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
162 errmsg("no available background worker slots"),
163 errhint("Consider increasing max_worker_processes in tandem with "
164 "timescaledb.max_background_workers.")));
165 entry->state_transition_failures++;
166 }
167
168 /*
169 * Aliasing a few things in bgworker.h so that we exit correctly on postmaster
170 * death so we don't have to duplicate code basically telling it we shouldn't
171 * call exit hooks cause we want to bail out quickly - similar to how the
172 * quickdie function works when we receive a sigquit. This should work
173 * similarly because postmaster death is a similar severity of issue.
174 * Additionally, we're wrapping these calls to make sure we never have a NULL
175 * handle, if we have a null handle, we return normal things.
176 */
177 static BgwHandleStatus
get_background_worker_pid(BackgroundWorkerHandle * handle,pid_t * pidp)178 get_background_worker_pid(BackgroundWorkerHandle *handle, pid_t *pidp)
179 {
180 BgwHandleStatus status;
181 pid_t pid;
182
183 if (handle == NULL)
184 status = BGWH_STOPPED;
185 else
186 {
187 status = GetBackgroundWorkerPid(handle, &pid);
188 if (pidp != NULL)
189 *pidp = pid;
190 }
191
192 if (status == BGWH_POSTMASTER_DIED)
193 bgw_on_postmaster_death();
194 return status;
195 }
196
197 static void
wait_for_background_worker_startup(BackgroundWorkerHandle * handle,pid_t * pidp)198 wait_for_background_worker_startup(BackgroundWorkerHandle *handle, pid_t *pidp)
199 {
200 BgwHandleStatus status;
201
202 if (handle == NULL)
203 status = BGWH_STOPPED;
204 else
205 status = WaitForBackgroundWorkerStartup(handle, pidp);
206
207 /*
208 * We don't care whether we get BGWH_STOPPED or BGWH_STARTED here, because
209 * the worker could have started and stopped very quickly before we read
210 * it. We can't get BGWH_NOT_YET_STARTED as that's what we're waiting for.
211 * We do care if the Postmaster died however.
212 */
213
214 if (status == BGWH_POSTMASTER_DIED)
215 bgw_on_postmaster_death();
216
217 Assert(status == BGWH_STOPPED || status == BGWH_STARTED);
218 return;
219 }
220
221 static void
wait_for_background_worker_shutdown(BackgroundWorkerHandle * handle)222 wait_for_background_worker_shutdown(BackgroundWorkerHandle *handle)
223 {
224 BgwHandleStatus status;
225
226 if (handle == NULL)
227 status = BGWH_STOPPED;
228 else
229 status = WaitForBackgroundWorkerShutdown(handle);
230
231 /* We can only ever get BGWH_STOPPED stopped unless the Postmaster died. */
232 if (status == BGWH_POSTMASTER_DIED)
233 bgw_on_postmaster_death();
234
235 Assert(status == BGWH_STOPPED);
236 return;
237 }
238
239 static void
terminate_background_worker(BackgroundWorkerHandle * handle)240 terminate_background_worker(BackgroundWorkerHandle *handle)
241 {
242 if (handle == NULL)
243 return;
244 else
245 TerminateBackgroundWorker(handle);
246 }
247
248 extern void
ts_bgw_cluster_launcher_register(void)249 ts_bgw_cluster_launcher_register(void)
250 {
251 BackgroundWorker worker;
252
253 memset(&worker, 0, sizeof(worker));
254 /* set up worker settings for our main worker */
255 snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Launcher");
256 worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
257 worker.bgw_restart_time = BGW_LAUNCHER_RESTART_TIME_S;
258
259 /*
260 * Starting at BgWorkerStart_RecoveryFinished means we won't ever get
261 * started on a hot_standby see
262 * https://www.postgresql.org/docs/10/static/bgworker.html as it's not
263 * documented in bgworker.c.
264 */
265 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
266 worker.bgw_notify_pid = 0;
267 snprintf(worker.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
268 snprintf(worker.bgw_function_name, BGW_MAXLEN, "ts_bgw_cluster_launcher_main");
269 RegisterBackgroundWorker(&worker);
270 }
271
272 /*
273 * Register a background worker that calls the main TimescaleDB background
274 * worker launcher library (i.e. loader) and uses the scheduler entrypoint
275 * function. The scheduler entrypoint will deal with starting a new worker,
276 * and waiting on any txns that it needs to, if we pass along a vxid in the
277 * bgw_extra field of the BgWorker.
278 */
279 static bool
register_entrypoint_for_db(Oid db_id,VirtualTransactionId vxid,BackgroundWorkerHandle ** handle)280 register_entrypoint_for_db(Oid db_id, VirtualTransactionId vxid, BackgroundWorkerHandle **handle)
281 {
282 BackgroundWorker worker;
283
284 memset(&worker, 0, sizeof(worker));
285 snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Scheduler");
286 worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
287 worker.bgw_restart_time = BGW_NEVER_RESTART;
288 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
289 snprintf(worker.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
290 snprintf(worker.bgw_function_name, BGW_MAXLEN, BGW_ENTRYPOINT_FUNCNAME);
291 worker.bgw_notify_pid = MyProcPid;
292 worker.bgw_main_arg = ObjectIdGetDatum(db_id);
293 memcpy(worker.bgw_extra, &vxid, sizeof(VirtualTransactionId));
294
295 return RegisterDynamicBackgroundWorker(&worker, handle);
296 }
297
298 /* Initializes the launcher's hash table of schedulers.
299 * Return value is guaranteed to be not-null, because otherwise the function
300 * will have thrown an error.
301 */
302 static HTAB *
init_database_htab(void)303 init_database_htab(void)
304 {
305 HASHCTL info = { .keysize = sizeof(Oid), .entrysize = sizeof(DbHashEntry) };
306
307 return hash_create("launcher_db_htab",
308 ts_guc_max_background_workers,
309 &info,
310 HASH_BLOBS | HASH_ELEM);
311 }
312
313 /* Insert a scheduler entry into the hash table. Correctly set entry values. */
314 static DbHashEntry *
db_hash_entry_create_if_not_exists(HTAB * db_htab,Oid db_oid)315 db_hash_entry_create_if_not_exists(HTAB *db_htab, Oid db_oid)
316 {
317 DbHashEntry *db_he;
318 bool found;
319
320 db_he = (DbHashEntry *) hash_search(db_htab, &db_oid, HASH_ENTER, &found);
321 if (!found)
322 {
323 db_he->db_scheduler_handle = NULL;
324 db_he->state = ENABLED;
325 SetInvalidVirtualTransactionId(db_he->vxid);
326 db_he->state_transition_failures = 0;
327
328 /*
329 * Try to allocate a spot right away to give schedulers priority over
330 * other bgws. This is especially important on initial server startup
331 * where we want to reserve slots for all schedulers before starting
332 * any. This is done so that background workers started by schedulers
333 * don't race for open slots with other schedulers on startup.
334 */
335 scheduler_state_trans_enabled_to_allocated(db_he);
336 }
337
338 return db_he;
339 }
340
341 /*
342 * Model this on autovacuum.c -> get_database_list.
343 *
344 * Note that we are not doing
345 * all the things around memory context that they do, because the hashtable
346 * we're using to store db entries is automatically created in its own memory
347 * context (a child of TopMemoryContext) This can get called at two different
348 * times 1) when the cluster launcher starts and is looking for dbs and 2) if
349 * it restarts due to a postmaster signal.
350 */
351 static void
populate_database_htab(HTAB * db_htab)352 populate_database_htab(HTAB *db_htab)
353 {
354 Relation rel;
355 TableScanDesc scan;
356 HeapTuple tup;
357
358 /*
359 * by this time we should already be connected to the db, and only have
360 * access to shared catalogs
361 */
362 StartTransactionCommand();
363 (void) GetTransactionSnapshot();
364
365 rel = table_open(DatabaseRelationId, AccessShareLock);
366 scan = table_beginscan_catalog(rel, 0, NULL);
367
368 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
369 {
370 Form_pg_database pgdb = (Form_pg_database) GETSTRUCT(tup);
371
372 if (!pgdb->datallowconn || pgdb->datistemplate)
373 continue; /* don't bother with dbs that don't allow
374 * connections or are templates */
375 db_hash_entry_create_if_not_exists(db_htab, pgdb->oid);
376 }
377 heap_endscan(scan);
378 table_close(rel, AccessShareLock);
379
380 CommitTransactionCommand();
381 }
382
383 static void
scheduler_modify_state(DbHashEntry * entry,SchedulerState new_state)384 scheduler_modify_state(DbHashEntry *entry, SchedulerState new_state)
385 {
386 Assert(entry->state != new_state);
387 entry->state_transition_failures = 0;
388 entry->state = new_state;
389 }
390
391 /* TRANSITION FUNCTIONS */
392 static void
scheduler_state_trans_disabled_to_enabled(DbHashEntry * entry)393 scheduler_state_trans_disabled_to_enabled(DbHashEntry *entry)
394 {
395 Assert(entry->state == DISABLED);
396 Assert(entry->db_scheduler_handle == NULL);
397 scheduler_modify_state(entry, ENABLED);
398 }
399
400 static void
scheduler_state_trans_enabled_to_allocated(DbHashEntry * entry)401 scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry)
402 {
403 Assert(entry->state == ENABLED);
404 Assert(entry->db_scheduler_handle == NULL);
405 /* Reserve a spot for this scheduler with BGW counter */
406 if (!ts_bgw_total_workers_increment())
407 {
408 report_bgw_limit_exceeded(entry);
409 return;
410 }
411 scheduler_modify_state(entry, ALLOCATED);
412 }
413
414 static void
scheduler_state_trans_started_to_allocated(DbHashEntry * entry)415 scheduler_state_trans_started_to_allocated(DbHashEntry *entry)
416 {
417 Assert(entry->state == STARTED);
418 Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED);
419 if (entry->db_scheduler_handle != NULL)
420 {
421 pfree(entry->db_scheduler_handle);
422 entry->db_scheduler_handle = NULL;
423 }
424 scheduler_modify_state(entry, ALLOCATED);
425 }
426
427 static void
scheduler_state_trans_allocated_to_started(DbHashEntry * entry)428 scheduler_state_trans_allocated_to_started(DbHashEntry *entry)
429 {
430 pid_t worker_pid;
431 bool worker_registered;
432
433 Assert(entry->state == ALLOCATED);
434 Assert(entry->db_scheduler_handle == NULL);
435
436 worker_registered =
437 register_entrypoint_for_db(entry->db_oid, entry->vxid, &entry->db_scheduler_handle);
438
439 if (!worker_registered)
440 {
441 report_error_on_worker_register_failure(entry);
442 return;
443 }
444 wait_for_background_worker_startup(entry->db_scheduler_handle, &worker_pid);
445 SetInvalidVirtualTransactionId(entry->vxid);
446 scheduler_modify_state(entry, STARTED);
447 }
448
449 static void
scheduler_state_trans_enabled_to_disabled(DbHashEntry * entry)450 scheduler_state_trans_enabled_to_disabled(DbHashEntry *entry)
451 {
452 Assert(entry->state == ENABLED);
453 Assert(entry->db_scheduler_handle == NULL);
454 scheduler_modify_state(entry, DISABLED);
455 }
456
457 static void
scheduler_state_trans_allocated_to_disabled(DbHashEntry * entry)458 scheduler_state_trans_allocated_to_disabled(DbHashEntry *entry)
459 {
460 Assert(entry->state == ALLOCATED);
461 Assert(entry->db_scheduler_handle == NULL);
462
463 ts_bgw_total_workers_decrement();
464 scheduler_modify_state(entry, DISABLED);
465 }
466
467 static void
scheduler_state_trans_started_to_disabled(DbHashEntry * entry)468 scheduler_state_trans_started_to_disabled(DbHashEntry *entry)
469 {
470 Assert(entry->state == STARTED);
471 Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED);
472
473 ts_bgw_total_workers_decrement();
474 if (entry->db_scheduler_handle != NULL)
475 {
476 pfree(entry->db_scheduler_handle);
477 entry->db_scheduler_handle = NULL;
478 }
479 scheduler_modify_state(entry, DISABLED);
480 }
481
482 static void
scheduler_state_trans_automatic(DbHashEntry * entry)483 scheduler_state_trans_automatic(DbHashEntry *entry)
484 {
485 switch (entry->state)
486 {
487 case ENABLED:
488 scheduler_state_trans_enabled_to_allocated(entry);
489 if (entry->state == ALLOCATED)
490 scheduler_state_trans_allocated_to_started(entry);
491 break;
492 case ALLOCATED:
493 scheduler_state_trans_allocated_to_started(entry);
494 break;
495 case STARTED:
496 if (get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED)
497 scheduler_state_trans_started_to_disabled(entry);
498 break;
499 case DISABLED:
500 break;
501 }
502 }
503
504 static void
scheduler_state_trans_automatic_all(HTAB * db_htab)505 scheduler_state_trans_automatic_all(HTAB *db_htab)
506 {
507 HASH_SEQ_STATUS hash_seq;
508 DbHashEntry *current_entry;
509
510 hash_seq_init(&hash_seq, db_htab);
511 while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
512 scheduler_state_trans_automatic(current_entry);
513 }
514
515 /* This is called when we're going to shut down so we don't leave things messy*/
516 static void
launcher_pre_shmem_cleanup(int code,Datum arg)517 launcher_pre_shmem_cleanup(int code, Datum arg)
518 {
519 HTAB *db_htab = *(HTAB **) DatumGetPointer(arg);
520 HASH_SEQ_STATUS hash_seq;
521 DbHashEntry *current_entry;
522
523 /* db_htab will be NULL if we fail during init_database_htab */
524 if (db_htab != NULL)
525 {
526 hash_seq_init(&hash_seq, db_htab);
527
528 /*
529 * Stop everyone (or at least tell the Postmaster we don't care about
530 * them anymore)
531 */
532 while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
533 {
534 if (current_entry->db_scheduler_handle != NULL)
535 {
536 terminate_background_worker(current_entry->db_scheduler_handle);
537 pfree(current_entry->db_scheduler_handle);
538 }
539 }
540
541 hash_destroy(db_htab);
542 }
543
544 /*
545 * Reset our pid in the queue so that others know we've died and don't
546 * wait forever
547 */
548 ts_bgw_message_queue_shmem_cleanup();
549 }
550
551 /*
552 *************
553 * Actions for message types we could receive off of the bgw_message_queue.
554 *************
555 */
556
557 /*
558 * This should be idempotent. If we find the background worker and it's not
559 * stopped, do nothing. In order to maintain idempotency, a scheduler in the
560 * ENABLED, ALLOCATED or STARTED state cannot get a new vxid to wait on. (We
561 * cannot pass in a new vxid to wait on for an already-started scheduler in any
562 * case). This means that actions like restart, which are not idempotent, will
563 * not have their effects changed by subsequent start actions, no matter the
564 * state they are in when the start action is received.
565 */
566 static AckResult
message_start_action(HTAB * db_htab,BgwMessage * message)567 message_start_action(HTAB *db_htab, BgwMessage *message)
568 {
569 DbHashEntry *entry;
570
571 entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
572
573 if (entry->state == DISABLED)
574 scheduler_state_trans_disabled_to_enabled(entry);
575
576 scheduler_state_trans_automatic(entry);
577
578 return (entry->state == STARTED ? ACK_SUCCESS : ACK_FAILURE);
579 }
580
581 static AckResult
message_stop_action(HTAB * db_htab,BgwMessage * message)582 message_stop_action(HTAB *db_htab, BgwMessage *message)
583 {
584 DbHashEntry *entry;
585
586 /*
587 * If the entry does not exist try to create it so we can put it in the
588 * DISABLED state. Otherwise, it will be created during the next poll and
589 * then will end up in the ENABLED state and proceed to being STARTED. But
590 * this is not the behavior we want.
591 */
592 entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
593
594 switch (entry->state)
595 {
596 case ENABLED:
597 scheduler_state_trans_enabled_to_disabled(entry);
598 break;
599 case ALLOCATED:
600 scheduler_state_trans_allocated_to_disabled(entry);
601 break;
602 case STARTED:
603 terminate_background_worker(entry->db_scheduler_handle);
604 wait_for_background_worker_shutdown(entry->db_scheduler_handle);
605 scheduler_state_trans_started_to_disabled(entry);
606 break;
607 case DISABLED:
608 break;
609 }
610 return entry->state == DISABLED ? ACK_SUCCESS : ACK_FAILURE;
611 }
612
613 /*
614 * This function will stop and restart a scheduler in the STARTED state, ENABLE
615 * a scheduler if it does not exist or is in the DISABLED state and set the vxid
616 * to wait on for a scheduler in any state. It is not idempotent. Additionally,
617 * one might think that this function would simply be a combination of stop and
618 * start above, but it is not as we maintain the worker's "slot" by never
619 * releasing the worker from our "pool" of background workers as stopping and
620 * starting would. We don't want a race condition where some other db steals
621 * the scheduler of the other by requesting a worker at the wrong time. (This is
622 * accomplished by moving from STARTED to ALLOCATED after shutting down the
623 * worker, never releasing the entry and transitioning all the way back to
624 * ENABLED).
625 */
626 static AckResult
message_restart_action(HTAB * db_htab,BgwMessage * message,VirtualTransactionId vxid)627 message_restart_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId vxid)
628 {
629 DbHashEntry *entry;
630
631 entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
632
633 entry->vxid = vxid;
634
635 switch (entry->state)
636 {
637 case ENABLED:
638 break;
639 case ALLOCATED:
640 break;
641 case STARTED:
642 terminate_background_worker(entry->db_scheduler_handle);
643 wait_for_background_worker_shutdown(entry->db_scheduler_handle);
644 scheduler_state_trans_started_to_allocated(entry);
645 break;
646 case DISABLED:
647 scheduler_state_trans_disabled_to_enabled(entry);
648 }
649
650 scheduler_state_trans_automatic(entry);
651 return entry->state == STARTED ? ACK_SUCCESS : ACK_FAILURE;
652 }
653
654 /*
655 * Handle 1 message.
656 */
657 static bool
launcher_handle_message(HTAB * db_htab)658 launcher_handle_message(HTAB *db_htab)
659 {
660 BgwMessage *message = ts_bgw_message_receive();
661 PGPROC *sender;
662 VirtualTransactionId vxid;
663 AckResult action_result = ACK_FAILURE;
664
665 if (message == NULL)
666 return false;
667
668 sender = BackendPidGetProc(message->sender_pid);
669 if (sender == NULL)
670 {
671 ereport(LOG,
672 (errmsg("TimescaleDB background worker launcher received message from non-existent "
673 "backend")));
674 return true;
675 }
676
677 GET_VXID_FROM_PGPROC(vxid, *sender);
678
679 switch (message->message_type)
680 {
681 case START:
682 action_result = message_start_action(db_htab, message);
683 break;
684 case STOP:
685 action_result = message_stop_action(db_htab, message);
686 break;
687 case RESTART:
688 action_result = message_restart_action(db_htab, message, vxid);
689 break;
690 }
691
692 ts_bgw_message_send_ack(message, action_result);
693 return true;
694 }
695
696 /*
697 * Wrapper around normal postgres `die()` function to give more context on
698 * sigterms. The default, `bgworker_die()`, can't be used due to the fact
699 * that it handles signals synchronously, rather than waiting for a
700 * CHECK_FOR_INTERRUPTS(). `die()` (which is arguably misnamed) sets flags
701 * that will cause the backend to exit on the next call to
702 * CHECK_FOR_INTERRUPTS(), which can happen either in our code or in
703 * functions within the Postgres codebase that we call. This means that we
704 * don't need to wait for the next time control is returned to our loop to
705 * exit, which would be necessary if we set our own flag and checked it in
706 * a loop condition. However, because it cannot exit 0, the launcher will be
707 * restarted by the postmaster, even when it has received a SIGTERM, which
708 * we decided was the proper behavior. If users want to disable the launcher,
709 * they can set `timescaledb.max_background_workers = 0` and then we will
710 * `proc_exit(0)` before doing anything else.
711 */
launcher_sigterm(SIGNAL_ARGS)712 static void launcher_sigterm(SIGNAL_ARGS)
713 {
714 /* Do not use anything that calls malloc() inside a signal handler since
715 * malloc() is not signal-safe. This includes ereport() */
716 write_stderr("terminating TimescaleDB background worker launcher due to administrator command");
717 die(postgres_signal_arg);
718 }
719
/*
 * Main function of the cluster-wide TimescaleDB background worker launcher
 * (named as bgw_function_name by ts_bgw_cluster_launcher_register()).
 *
 * It connects without a database, so only shared catalogs are visible. It
 * keeps a hash table with one entry per connectable, non-template database
 * and then loops forever. Each pass picks up new databases, handles at most
 * one message from the bgw message queue, and runs the automatic state
 * transitions for every scheduler. When no message was handled, it sleeps on
 * its latch for up to BGW_LAUNCHER_POLL_TIME_MS.
 *
 * If not even the launcher's own worker slot can be reserved, i.e. the
 * background worker limit is 0, it exits with proc_exit(0) so the postmaster
 * does not restart it.
 */
extern Datum
ts_bgw_cluster_launcher_main(PG_FUNCTION_ARGS)
{
	/*
	 * Allocated zeroed in TopMemoryContext so the before_shmem_exit callback
	 * can reach the hash table; stays NULL until init_database_htab() returns.
	 */
	HTAB **htab_storage;

	HTAB *db_htab;

	pqsignal(SIGINT, StatementCancelHandler);
	pqsignal(SIGTERM, launcher_sigterm);
	pqsignal(SIGHUP, launcher_sighup);

	/* Some SIGHUPs may already have been dropped, so we must load the file here */
	got_SIGHUP = false;
	ProcessConfigFile(PGC_SIGHUP);
	BackgroundWorkerUnblockSignals();
	ereport(DEBUG1, (errmsg("TimescaleDB background worker launcher started")));

	/* set counter back to zero on restart */
	ts_bgw_counter_reinit();
	if (!ts_bgw_total_workers_increment())
	{
		/*
		 * This should be the first thing to happen, so if we have already
		 * exceeded our limit, the limit is 0 and we should just exit. We have
		 * to exit(0), because if we exit with an error the postmaster restarts
		 * us.
		 */
		ereport(LOG,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("TimescaleDB background worker is set to 0"),
				 errhint("TimescaleDB background worker launcher shutting down.")));
		proc_exit(0);
	}
	/* Connect to the db, no db name yet, so can only access shared catalogs */
	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
	pgstat_report_appname(MyBgworkerEntry->bgw_name);
	ereport(LOG, (errmsg("TimescaleDB background worker launcher connected to shared catalogs")));

	htab_storage = MemoryContextAllocZero(TopMemoryContext, sizeof(*htab_storage));

	/*
	 * We must set up the cleanup function _before_ initializing any state it
	 * touches (specifically the bgw_message_queue and db_htab). Otherwise, if
	 * the launcher fails in init_database_htab (e.g. because it runs out of
	 * shared memory), it never deregisters itself from the shared
	 * bgw_message_queue, which can cause cascading failures.
	 */
	before_shmem_exit(launcher_pre_shmem_cleanup, PointerGetDatum(htab_storage));

	ts_bgw_message_queue_set_reader();

	db_htab = init_database_htab();
	*htab_storage = db_htab;

	/*
	 * NOTE(review): the first loop pass calls populate_database_htab() again
	 * before doing anything else, so this initial call looks redundant.
	 */
	populate_database_htab(db_htab);

	while (true)
	{
		int wl_rc;
		bool handled_msgs = false;

		CHECK_FOR_INTERRUPTS();
		populate_database_htab(db_htab);
		handled_msgs = launcher_handle_message(db_htab);
		scheduler_state_trans_automatic_all(db_htab);
		/* keep draining the message queue before going to sleep */
		if (handled_msgs)
			continue;

		wl_rc = WaitLatch(MyLatch,
						  WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT,
						  BGW_LAUNCHER_POLL_TIME_MS,
						  PG_WAIT_EXTENSION);
		ResetLatch(MyLatch);
		if (wl_rc & WL_POSTMASTER_DEATH)
			bgw_on_postmaster_death();

		/* apply a configuration reload requested by launcher_sighup() */
		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}
	/* not reached: the loop has no exit and the process ends from inside it */
	PG_RETURN_VOID();
}
804
805 /* Wrapper around `die()`, see note on `launcher_sigterm()` above for more info*/
entrypoint_sigterm(SIGNAL_ARGS)806 static void entrypoint_sigterm(SIGNAL_ARGS)
807 {
808 /* Do not use anything that calls malloc() inside a signal handler since
809 * malloc() is not signal-safe. This includes ereport() */
810 write_stderr("terminating TimescaleDB scheduler entrypoint due to administrator command");
811 die(postgres_signal_arg);
812 }
813
814 /*
815 * Inside the entrypoint, we must check again if we're in a template db
816 * even though we excluded template dbs in populate_database_htab because
817 * we can be called on, say, CREATE EXTENSION in a template db and then
818 * we'll not stop til next server shutdown so if we hit this point and are
819 * in a template db, we throw an error and shut down Check in the syscache
820 * rather than searching through the entire database catalog again.
821 * Modelled on autovacuum.c -> do_autovacuum.
822 */
823 static void
database_is_template_check(void)824 database_is_template_check(void)
825 {
826 Form_pg_database pgdb;
827 HeapTuple tuple;
828
829 tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
830 if (!HeapTupleIsValid(tuple))
831 ereport(ERROR,
832 (errmsg("TimescaleDB background worker failed to find entry for database in "
833 "syscache")));
834
835 pgdb = (Form_pg_database) GETSTRUCT(tuple);
836 if (pgdb->datistemplate)
837 ereport(ERROR,
838 (errmsg("TimescaleDB background worker connected to template database, exiting")));
839
840 ReleaseSysCache(tuple);
841 }
842
843 /*
844 * Before we morph into the scheduler, we also need to reload configs from their
845 * defaults if the database default has changed. Defaults are changed in the
846 * post_restore function where we change the db default for the restoring guc
847 * wait until the txn commits and then must see if the txn made the change.
848 * Checks for changes are normally run at connection startup, but because we
849 * have to connect in order to wait on the txn we have to re-run after the wait.
850 * This function is based on the postgres function in postinit.c by the same
851 * name.
852 */
853
854 static void
process_settings(Oid databaseid)855 process_settings(Oid databaseid)
856 {
857 Relation relsetting;
858 Snapshot snapshot;
859
860 if (!IsUnderPostmaster)
861 return;
862
863 relsetting = table_open(DbRoleSettingRelationId, AccessShareLock);
864
865 /* read all the settings under the same snapshot for efficiency */
866 snapshot = RegisterSnapshot(GetCatalogSnapshot(DbRoleSettingRelationId));
867
868 /* Later settings are ignored if set earlier. */
869 ApplySetting(snapshot, databaseid, InvalidOid, relsetting, PGC_S_DATABASE);
870 ApplySetting(snapshot, InvalidOid, InvalidOid, relsetting, PGC_S_GLOBAL);
871
872 UnregisterSnapshot(snapshot);
873 table_close(relsetting, AccessShareLock);
874 }
875
/*
 * Entry point of every per-database scheduler worker (registered by
 * register_entrypoint_for_db(); bgw_main_arg is the database OID and
 * bgw_extra carries a VirtualTransactionId).
 *
 * This can run either when the cluster launcher starts schedulers for the
 * databases, in which case there is no vxid to wait on, or after an
 * install/uninstall/update of the extension, in which case there is, because
 * we have to see the effects of that transaction. We wait for it to finish,
 * then morph into the new db scheduler by calling BGW_DB_SCHEDULER_FUNCNAME
 * from whatever TimescaleDB version is now installed. If no version is
 * installed, or that version has no scheduler function, we simply return.
 */
extern Datum
ts_bgw_db_scheduler_entrypoint(PG_FUNCTION_ARGS)
{
	Oid db_id = DatumGetObjectId(MyBgworkerEntry->bgw_main_arg);
	bool ts_installed = false;
	/* only filled in (and only read) when ts_installed is true */
	char version[MAX_VERSION_LEN];
	VirtualTransactionId vxid;

	pqsignal(SIGINT, StatementCancelHandler);
	pqsignal(SIGTERM, entrypoint_sigterm);
	BackgroundWorkerUnblockSignals();
	BackgroundWorkerInitializeConnectionByOid(db_id, InvalidOid, 0);
	pgstat_report_appname(MyBgworkerEntry->bgw_name);

	/*
	 * Wait until whatever vxid potentially started us has finished before we
	 * look at the extension state. The wait happens inside a transaction so
	 * that it is cleaned up correctly if we are terminated in the meantime.
	 * We then commit and start a new transaction so we see the state after
	 * that transaction's effects.
	 */
	StartTransactionCommand();
	(void) GetTransactionSnapshot();
	memcpy(&vxid, MyBgworkerEntry->bgw_extra, sizeof(VirtualTransactionId));
	if (VirtualTransactionIdIsValid(vxid))
		VirtualXactLock(vxid, true);
	CommitTransactionCommand();

	/*
	 * now we can start our transaction and get the version currently
	 * installed
	 */
	StartTransactionCommand();
	(void) GetTransactionSnapshot();

	/*
	 * Check whether a database is a template database and raise an error if
	 * so, as we don't want to run in template dbs.
	 */
	database_is_template_check();
	/* Process any config changes caused by an ALTER DATABASE */
	process_settings(MyDatabaseId);
	ts_installed = ts_loader_extension_exists();
	if (ts_installed)
		strlcpy(version, ts_loader_extension_version(), MAX_VERSION_LEN);

	ts_loader_extension_check();
	CommitTransactionCommand();
	if (ts_installed)
	{
		char soname[MAX_SO_NAME_LEN];
		PGFunction versioned_scheduler_main;

		/* versioned library name: "<EXTENSION_SO>-<version>" */
		snprintf(soname, MAX_SO_NAME_LEN, "%s-%s", EXTENSION_SO, version);
		/* signalNotFound = false: a missing function yields NULL instead of an error */
		versioned_scheduler_main =
			load_external_function(soname, BGW_DB_SCHEDULER_FUNCNAME, false, NULL);
		if (versioned_scheduler_main == NULL)
			ereport(LOG,
					(errmsg("TimescaleDB version %s does not have a background worker, exiting",
							soname)));
		else /* essentially we morph into the versioned
			  * worker here */
			DirectFunctionCall1(versioned_scheduler_main, ObjectIdGetDatum(InvalidOid));
	}
	PG_RETURN_VOID();
}
950