1 /*
2  * This file and its contents are licensed under the Apache License 2.0.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-APACHE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <access/xact.h>
8 #include <utils/fmgrprotos.h>
9 
10 #include <math.h>
11 
12 #include "job_stat.h"
13 #include "scanner.h"
14 #include "timer.h"
15 #include "utils.h"
16 
/* Ceiling for the failure backoff, as a multiple of the job's schedule_interval. */
#define MAX_INTERVALS_BACKOFF 5
/* Upper bound on the consecutive-failure count used as the retry_period multiplier. */
#define MAX_FAILURES_MULTIPLIER 20
/* Minimum delay before a crashed job is started again: 5 minutes, in milliseconds. */
#define MIN_WAIT_AFTER_CRASH_MS (5 * 60 * 1000)
20 
21 static bool
bgw_job_stat_next_start_was_set(FormData_bgw_job_stat * fd)22 bgw_job_stat_next_start_was_set(FormData_bgw_job_stat *fd)
23 {
24 	return fd->next_start != DT_NOBEGIN;
25 }
26 
27 static ScanTupleResult
bgw_job_stat_tuple_found(TupleInfo * ti,void * const data)28 bgw_job_stat_tuple_found(TupleInfo *ti, void *const data)
29 {
30 	BgwJobStat **job_stat_pp = data;
31 
32 	*job_stat_pp = STRUCT_FROM_SLOT(ti->slot, ti->mctx, BgwJobStat, FormData_bgw_job_stat);
33 
34 	/*
35 	 * Return SCAN_CONTINUE because we check for multiple tuples as an error
36 	 * condition.
37 	 */
38 	return SCAN_CONTINUE;
39 }
40 
41 static bool
bgw_job_stat_scan_one(int indexid,ScanKeyData scankey[],int nkeys,tuple_found_func tuple_found,tuple_filter_func tuple_filter,void * data,LOCKMODE lockmode)42 bgw_job_stat_scan_one(int indexid, ScanKeyData scankey[], int nkeys, tuple_found_func tuple_found,
43 					  tuple_filter_func tuple_filter, void *data, LOCKMODE lockmode)
44 {
45 	Catalog *catalog = ts_catalog_get();
46 	ScannerCtx scanctx = {
47 		.table = catalog_get_table_id(catalog, BGW_JOB_STAT),
48 		.index = catalog_get_index(catalog, BGW_JOB_STAT, indexid),
49 		.nkeys = nkeys,
50 		.scankey = scankey,
51 		.tuple_found = tuple_found,
52 		.filter = tuple_filter,
53 		.data = data,
54 		.lockmode = lockmode,
55 		.scandirection = ForwardScanDirection,
56 	};
57 
58 	return ts_scanner_scan_one(&scanctx, false, "bgw job stat");
59 }
60 
61 static inline bool
bgw_job_stat_scan_job_id(int32 bgw_job_id,tuple_found_func tuple_found,tuple_filter_func tuple_filter,void * data,LOCKMODE lockmode)62 bgw_job_stat_scan_job_id(int32 bgw_job_id, tuple_found_func tuple_found,
63 						 tuple_filter_func tuple_filter, void *data, LOCKMODE lockmode)
64 {
65 	ScanKeyData scankey[1];
66 
67 	ScanKeyInit(&scankey[0],
68 				Anum_bgw_job_stat_pkey_idx_job_id,
69 				BTEqualStrategyNumber,
70 				F_INT4EQ,
71 				Int32GetDatum(bgw_job_id));
72 	return bgw_job_stat_scan_one(BGW_JOB_STAT_PKEY_IDX,
73 								 scankey,
74 								 1,
75 								 tuple_found,
76 								 tuple_filter,
77 								 data,
78 								 lockmode);
79 }
80 
81 TSDLLEXPORT BgwJobStat *
ts_bgw_job_stat_find(int32 bgw_job_id)82 ts_bgw_job_stat_find(int32 bgw_job_id)
83 {
84 	BgwJobStat *job_stat = NULL;
85 
86 	bgw_job_stat_scan_job_id(bgw_job_id,
87 							 bgw_job_stat_tuple_found,
88 							 NULL,
89 							 &job_stat,
90 							 AccessShareLock);
91 
92 	return job_stat;
93 }
94 
95 static ScanTupleResult
bgw_job_stat_tuple_delete(TupleInfo * ti,void * const data)96 bgw_job_stat_tuple_delete(TupleInfo *ti, void *const data)
97 {
98 	ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
99 
100 	return SCAN_CONTINUE;
101 }
102 
103 void
ts_bgw_job_stat_delete(int32 bgw_job_id)104 ts_bgw_job_stat_delete(int32 bgw_job_id)
105 {
106 	bgw_job_stat_scan_job_id(bgw_job_id, bgw_job_stat_tuple_delete, NULL, NULL, RowExclusiveLock);
107 }
108 
109 /* Mark the start of a job. This should be done in a separate transaction by the scheduler
110  *  before the bgw for a job is launched. This ensures that the job is counted as started
111  * before /any/ job specific code is executed. A job that has been started but never ended
112  * is assumed to have crashed. We use this conservative design since no process in the database
113  * instance can write once a crash happened in any job. Therefore our only choice is to deduce
114  * a crash from the lack of a write (the marked end write in this case).
115  */
116 static ScanTupleResult
bgw_job_stat_tuple_mark_start(TupleInfo * ti,void * const data)117 bgw_job_stat_tuple_mark_start(TupleInfo *ti, void *const data)
118 {
119 	bool should_free;
120 	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
121 	HeapTuple new_tuple = heap_copytuple(tuple);
122 	FormData_bgw_job_stat *fd = (FormData_bgw_job_stat *) GETSTRUCT(new_tuple);
123 
124 	if (should_free)
125 		heap_freetuple(tuple);
126 
127 	fd->last_start = ts_timer_get_current_timestamp();
128 	fd->last_finish = DT_NOBEGIN;
129 	fd->next_start = DT_NOBEGIN;
130 
131 	fd->total_runs++;
132 
133 	/*
134 	 * This is undone by any of the end marks. This is so that we count
135 	 * crashes conservatively. Pretty much the crash is incremented in the
136 	 * beginning and then decremented during `bgw_job_stat_tuple_mark_end`.
137 	 * Thus, it only remains incremented if the job is never marked as having
138 	 * ended. This happens when: 1) the job crashes 2) another process crashes
139 	 * while the job is running 3) the scheduler gets a SIGTERM while the job
140 	 * is running
141 	 *
142 	 * Unfortunately, 3 cannot be helped because when a scheduler gets a
143 	 * SIGTERM it sends SIGTERMS to it's any running jobs as well. Since you
144 	 * aren't supposed to write to the DB Once you get a sigterm, neither the
145 	 * job nor the scheduler can mark the end of a job.
146 	 */
147 	fd->last_run_success = false;
148 	fd->total_crashes++;
149 	fd->consecutive_crashes++;
150 
151 	ts_catalog_update(ti->scanrel, new_tuple);
152 	heap_freetuple(new_tuple);
153 
154 	return SCAN_DONE;
155 }
156 
157 typedef struct
158 {
159 	JobResult result;
160 	BgwJob *job;
161 } JobResultCtx;
162 
163 static TimestampTz
calculate_next_start_on_success(TimestampTz finish_time,BgwJob * job)164 calculate_next_start_on_success(TimestampTz finish_time, BgwJob *job)
165 {
166 	TimestampTz ts;
167 	TimestampTz last_finish = finish_time;
168 	if (!IS_VALID_TIMESTAMP(finish_time))
169 	{
170 		last_finish = ts_timer_get_current_timestamp();
171 	}
172 	ts = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
173 												 TimestampTzGetDatum(last_finish),
174 												 IntervalPGetDatum(&job->fd.schedule_interval)));
175 	return ts;
176 }
177 
178 static float8
calculate_jitter_percent()179 calculate_jitter_percent()
180 {
181 	/* returns a number in the range [-0.125, 0.125] */
182 	/* right now we use the postgres user-space RNG. if we become worried about
183 	 * correlated schedulers we can switch to
184 	 *     pg_strong_random(&percent, sizeof(percent));
185 	 * though we would need to figure out a way to make our tests pass
186 	 */
187 	uint8 percent = pg_lrand48();
188 	return ldexp((double) (16 - (int) (percent % 32)), -7);
189 }
190 
191 /* For failures we have additive backoff based on consecutive failures
192  * along with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF
193  * We also limit the additive backoff in case of consecutive failures as we don't
194  * want to pass in input that leads to out of range timestamps and don't want to
195  * put off the next start time for the job indefinitely
196  */
197 static TimestampTz
calculate_next_start_on_failure(TimestampTz finish_time,int consecutive_failures,BgwJob * job)198 calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failures, BgwJob *job)
199 {
200 	float8 jitter = calculate_jitter_percent();
201 	/* consecutive failures includes this failure */
202 	TimestampTz res = 0;
203 	volatile bool res_set = false;
204 	TimestampTz last_finish = finish_time;
205 	float8 multiplier = (consecutive_failures > MAX_FAILURES_MULTIPLIER ? MAX_FAILURES_MULTIPLIER :
206 																		  consecutive_failures);
207 	MemoryContext oldctx;
208 	if (!IS_VALID_TIMESTAMP(finish_time))
209 	{
210 		elog(LOG, "%s: invalid finish time", __func__);
211 		last_finish = ts_timer_get_current_timestamp();
212 	}
213 	oldctx = CurrentMemoryContext;
214 	BeginInternalSubTransaction("next start on failure");
215 	PG_TRY();
216 	{
217 		/* ival = retry_period * (consecutive_failures)  */
218 		Datum ival = DirectFunctionCall2(interval_mul,
219 										 IntervalPGetDatum(&job->fd.retry_period),
220 										 Float8GetDatum(multiplier));
221 
222 		/* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */
223 		Datum ival_max = DirectFunctionCall2(interval_mul,
224 											 IntervalPGetDatum(&job->fd.schedule_interval),
225 											 Float8GetDatum(MAX_INTERVALS_BACKOFF));
226 
227 		if (DatumGetInt32(DirectFunctionCall2(interval_cmp, ival, ival_max)) > 0)
228 			ival = ival_max;
229 
230 		/* Add some random jitter to prevent stampeding-herds, interval will be within about +-13%
231 		 */
232 		ival = DirectFunctionCall2(interval_mul, ival, Float8GetDatum(1.0 + jitter));
233 
234 		res = DatumGetTimestampTz(
235 			DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(last_finish), ival));
236 		res_set = true;
237 		ReleaseCurrentSubTransaction();
238 	}
239 	PG_CATCH();
240 	{
241 		MemoryContextSwitchTo(oldctx);
242 		ErrorData *errdata = CopyErrorData();
243 		ereport(LOG,
244 				(errcode(ERRCODE_INTERNAL_ERROR),
245 				 errmsg("could not calculate next start on failure: resetting value"),
246 				 errdetail("Error: %s.", errdata->message)));
247 		FlushErrorState();
248 		RollbackAndReleaseCurrentSubTransaction();
249 	}
250 	PG_END_TRY();
251 	Assert(CurrentMemoryContext == oldctx);
252 	if (!res_set)
253 	{
254 		TimestampTz nowt;
255 		/* job->fd_retry_period is a valid non-null value */
256 		nowt = ts_timer_get_current_timestamp();
257 		res = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
258 													  TimestampTzGetDatum(nowt),
259 													  IntervalPGetDatum(&job->fd.retry_period)));
260 	}
261 	return res;
262 }
263 
264 /* For crashes, the logic is the similar as for failures except we also have
265  *  a minimum wait after a crash that we wait, so that if an operator needs to disable the job,
266  *  there will be enough time before another crash.
267  */
268 static TimestampTz
calculate_next_start_on_crash(int consecutive_crashes,BgwJob * job)269 calculate_next_start_on_crash(int consecutive_crashes, BgwJob *job)
270 {
271 	TimestampTz now = ts_timer_get_current_timestamp();
272 	TimestampTz failure_calc = calculate_next_start_on_failure(now, consecutive_crashes, job);
273 	TimestampTz min_time = TimestampTzPlusMilliseconds(now, MIN_WAIT_AFTER_CRASH_MS);
274 
275 	if (min_time > failure_calc)
276 		return min_time;
277 	return failure_calc;
278 }
279 
280 static ScanTupleResult
bgw_job_stat_tuple_mark_end(TupleInfo * ti,void * const data)281 bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
282 {
283 	JobResultCtx *result_ctx = data;
284 	bool should_free;
285 	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
286 	HeapTuple new_tuple = heap_copytuple(tuple);
287 	FormData_bgw_job_stat *fd = (FormData_bgw_job_stat *) GETSTRUCT(new_tuple);
288 	Interval *duration;
289 
290 	if (should_free)
291 		heap_freetuple(tuple);
292 
293 	fd->last_finish = ts_timer_get_current_timestamp();
294 
295 	duration = DatumGetIntervalP(DirectFunctionCall2(timestamp_mi,
296 													 TimestampTzGetDatum(fd->last_finish),
297 													 TimestampTzGetDatum(fd->last_start)));
298 	fd->total_duration =
299 		*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
300 											   IntervalPGetDatum(&fd->total_duration),
301 											   IntervalPGetDatum(duration)));
302 
303 	/* undo marking created by start marks */
304 	fd->last_run_success = result_ctx->result == JOB_SUCCESS ? true : false;
305 	fd->total_crashes--;
306 	fd->consecutive_crashes = 0;
307 
308 	if (result_ctx->result == JOB_SUCCESS)
309 	{
310 		fd->total_success++;
311 		fd->consecutive_failures = 0;
312 		fd->last_successful_finish = fd->last_finish;
313 		/* Mark the next start at the end if the job itself hasn't */
314 		if (!bgw_job_stat_next_start_was_set(fd))
315 			fd->next_start = calculate_next_start_on_success(fd->last_finish, result_ctx->job);
316 	}
317 	else
318 	{
319 		fd->total_failures++;
320 		fd->consecutive_failures++;
321 
322 		/*
323 		 * Mark the next start at the end if the job itself hasn't (this may
324 		 * have happened before failure) and the failure was not in starting.
325 		 * If the failure was in starting, then next_start should have been
326 		 * restored in `on_failure_to_start_job` and thus we don't change it here.
327 		 * Even if it wasn't restored, then keep it as DT_NOBEGIN to mark it as highest priority.
328 		 */
329 		if (!bgw_job_stat_next_start_was_set(fd) && result_ctx->result != JOB_FAILURE_TO_START)
330 			fd->next_start = calculate_next_start_on_failure(fd->last_finish,
331 															 fd->consecutive_failures,
332 															 result_ctx->job);
333 	}
334 
335 	ts_catalog_update(ti->scanrel, new_tuple);
336 	heap_freetuple(new_tuple);
337 
338 	return SCAN_DONE;
339 }
340 
341 static ScanTupleResult
bgw_job_stat_tuple_set_next_start(TupleInfo * ti,void * const data)342 bgw_job_stat_tuple_set_next_start(TupleInfo *ti, void *const data)
343 {
344 	TimestampTz *next_start = data;
345 	bool should_free;
346 	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
347 	HeapTuple new_tuple = heap_copytuple(tuple);
348 	FormData_bgw_job_stat *fd = (FormData_bgw_job_stat *) GETSTRUCT(new_tuple);
349 
350 	if (should_free)
351 		heap_freetuple(tuple);
352 
353 	fd->next_start = *next_start;
354 	ts_catalog_update(ti->scanrel, new_tuple);
355 	heap_freetuple(new_tuple);
356 
357 	return SCAN_DONE;
358 }
359 
360 static bool
bgw_job_stat_insert_relation(Relation rel,int32 bgw_job_id,bool mark_start,TimestampTz next_start)361 bgw_job_stat_insert_relation(Relation rel, int32 bgw_job_id, bool mark_start,
362 							 TimestampTz next_start)
363 {
364 	TupleDesc desc = RelationGetDescr(rel);
365 	Datum values[Natts_bgw_job_stat];
366 	bool nulls[Natts_bgw_job_stat] = { false };
367 	CatalogSecurityContext sec_ctx;
368 	Interval zero_ival = {
369 		.time = 0,
370 	};
371 
372 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_job_id)] = Int32GetDatum(bgw_job_id);
373 	if (mark_start)
374 		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_start)] =
375 			TimestampGetDatum(ts_timer_get_current_timestamp());
376 	else
377 		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_start)] =
378 			TimestampGetDatum(DT_NOBEGIN);
379 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_finish)] = TimestampGetDatum(DT_NOBEGIN);
380 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_next_start)] = TimestampGetDatum(next_start);
381 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_successful_finish)] =
382 		TimestampGetDatum(DT_NOBEGIN);
383 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_runs)] =
384 		Int64GetDatum((mark_start ? 1 : 0));
385 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_duration)] =
386 		IntervalPGetDatum(&zero_ival);
387 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_success)] = Int64GetDatum(0);
388 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_failures)] = Int64GetDatum(0);
389 	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_failures)] = Int32GetDatum(0);
390 
391 	if (mark_start)
392 	{
393 		/* This is udone by any of the end marks */
394 		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_run_success)] = BoolGetDatum(false);
395 		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_crashes)] = Int64GetDatum(1);
396 		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_crashes)] = Int32GetDatum(1);
397 	}
398 	else
399 	{
400 		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_run_success)] = BoolGetDatum(true);
401 		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_crashes)] = Int64GetDatum(0);
402 		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_crashes)] = Int32GetDatum(0);
403 	}
404 
405 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
406 	ts_catalog_insert_values(rel, desc, values, nulls);
407 	ts_catalog_restore_user(&sec_ctx);
408 
409 	return true;
410 }
411 
412 void
ts_bgw_job_stat_mark_start(int32 bgw_job_id)413 ts_bgw_job_stat_mark_start(int32 bgw_job_id)
414 {
415 	/* Use double-check locking */
416 	if (!bgw_job_stat_scan_job_id(bgw_job_id,
417 								  bgw_job_stat_tuple_mark_start,
418 								  NULL,
419 								  NULL,
420 								  RowExclusiveLock))
421 	{
422 		Relation rel =
423 			table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock);
424 		/* Recheck while having a self-exclusive lock */
425 		if (!bgw_job_stat_scan_job_id(bgw_job_id,
426 									  bgw_job_stat_tuple_mark_start,
427 									  NULL,
428 									  NULL,
429 									  RowExclusiveLock))
430 			bgw_job_stat_insert_relation(rel, bgw_job_id, true, DT_NOBEGIN);
431 		table_close(rel, ShareRowExclusiveLock);
432 		pgstat_report_activity(STATE_IDLE, NULL);
433 	}
434 }
435 
436 void
ts_bgw_job_stat_mark_end(BgwJob * job,JobResult result)437 ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result)
438 {
439 	JobResultCtx res = {
440 		.job = job,
441 		.result = result,
442 	};
443 
444 	if (!bgw_job_stat_scan_job_id(job->fd.id,
445 								  bgw_job_stat_tuple_mark_end,
446 								  NULL,
447 								  &res,
448 								  RowExclusiveLock))
449 		elog(ERROR, "unable to find job statistics for job %d", job->fd.id);
450 	pgstat_report_activity(STATE_IDLE, NULL);
451 }
452 
453 bool
ts_bgw_job_stat_end_was_marked(BgwJobStat * jobstat)454 ts_bgw_job_stat_end_was_marked(BgwJobStat *jobstat)
455 {
456 	return !TIMESTAMP_IS_NOBEGIN(jobstat->fd.last_finish);
457 }
458 
459 TSDLLEXPORT void
ts_bgw_job_stat_set_next_start(int32 job_id,TimestampTz next_start)460 ts_bgw_job_stat_set_next_start(int32 job_id, TimestampTz next_start)
461 {
462 	/* Cannot use DT_NOBEGIN as that's the value used to indicate "not set" */
463 	if (next_start == DT_NOBEGIN)
464 		elog(ERROR, "cannot set next start to -infinity");
465 
466 	if (!bgw_job_stat_scan_job_id(job_id,
467 								  bgw_job_stat_tuple_set_next_start,
468 								  NULL,
469 								  &next_start,
470 								  RowExclusiveLock))
471 		elog(ERROR, "unable to find job statistics for job %d", job_id);
472 }
473 
474 /* update next_start if job stat exists */
475 TSDLLEXPORT bool
ts_bgw_job_stat_update_next_start(int32 job_id,TimestampTz next_start,bool allow_unset)476 ts_bgw_job_stat_update_next_start(int32 job_id, TimestampTz next_start, bool allow_unset)
477 {
478 	bool found = false;
479 	/* Cannot use DT_NOBEGIN as that's the value used to indicate "not set" */
480 	if (!allow_unset && next_start == DT_NOBEGIN)
481 		elog(ERROR, "cannot set next start to -infinity");
482 
483 	found = bgw_job_stat_scan_job_id(job_id,
484 									 bgw_job_stat_tuple_set_next_start,
485 									 NULL,
486 									 &next_start,
487 									 RowExclusiveLock);
488 	return found;
489 }
490 
491 TSDLLEXPORT void
ts_bgw_job_stat_upsert_next_start(int32 bgw_job_id,TimestampTz next_start)492 ts_bgw_job_stat_upsert_next_start(int32 bgw_job_id, TimestampTz next_start)
493 {
494 	/* Cannot use DT_NOBEGIN as that's the value used to indicate "not set" */
495 	if (next_start == DT_NOBEGIN)
496 		elog(ERROR, "cannot set next start to -infinity");
497 
498 	/* Use double-check locking */
499 	if (!bgw_job_stat_scan_job_id(bgw_job_id,
500 								  bgw_job_stat_tuple_set_next_start,
501 								  NULL,
502 								  &next_start,
503 								  RowExclusiveLock))
504 	{
505 		Relation rel =
506 			table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock);
507 		/* Recheck while having a self-exclusive lock */
508 		if (!bgw_job_stat_scan_job_id(bgw_job_id,
509 									  bgw_job_stat_tuple_set_next_start,
510 									  NULL,
511 									  &next_start,
512 									  RowExclusiveLock))
513 			bgw_job_stat_insert_relation(rel, bgw_job_id, false, next_start);
514 		table_close(rel, ShareRowExclusiveLock);
515 	}
516 }
517 
518 bool
ts_bgw_job_stat_should_execute(BgwJobStat * jobstat,BgwJob * job)519 ts_bgw_job_stat_should_execute(BgwJobStat *jobstat, BgwJob *job)
520 {
521 	/*
522 	 * Stub to allow the system to disable jobs based on the number of crashes
523 	 * or failures.
524 	 */
525 	return true;
526 }
527 
528 TimestampTz
ts_bgw_job_stat_next_start(BgwJobStat * jobstat,BgwJob * job)529 ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job)
530 {
531 	if (jobstat == NULL)
532 		/* Never previously run - run right away */
533 		return DT_NOBEGIN;
534 
535 	if (jobstat->fd.consecutive_crashes > 0)
536 		return calculate_next_start_on_crash(jobstat->fd.consecutive_crashes, job);
537 
538 	return jobstat->fd.next_start;
539 }
540