/*
 * This file and its contents are licensed under the Apache License 2.0.
 * Please see the included NOTICE for copyright information and
 * LICENSE-APACHE for a copy of the license.
 */
#include <postgres.h>
#include <access/xact.h>
#include <utils/fmgrprotos.h>

#include <math.h>

#include "job_stat.h"
#include "scanner.h"
#include "timer.h"
#include "utils.h"

#define MAX_INTERVALS_BACKOFF 5
#define MAX_FAILURES_MULTIPLIER 20
#define MIN_WAIT_AFTER_CRASH_MS (5 * 60 * 1000)

static bool
bgw_job_stat_next_start_was_set(FormData_bgw_job_stat *fd)
{
	return fd->next_start != DT_NOBEGIN;
}

static ScanTupleResult
bgw_job_stat_tuple_found(TupleInfo *ti, void *const data)
{
	BgwJobStat **job_stat_pp = data;

	*job_stat_pp = STRUCT_FROM_SLOT(ti->slot, ti->mctx, BgwJobStat, FormData_bgw_job_stat);

	/*
	 * Return SCAN_CONTINUE because we check for multiple tuples as an error
	 * condition.
	 */
	return SCAN_CONTINUE;
}

static bool
bgw_job_stat_scan_one(int indexid, ScanKeyData scankey[], int nkeys, tuple_found_func tuple_found,
					  tuple_filter_func tuple_filter, void *data, LOCKMODE lockmode)
{
	Catalog *catalog = ts_catalog_get();
	ScannerCtx scanctx = {
		.table = catalog_get_table_id(catalog, BGW_JOB_STAT),
		.index = catalog_get_index(catalog, BGW_JOB_STAT, indexid),
		.nkeys = nkeys,
		.scankey = scankey,
		.tuple_found = tuple_found,
		.filter = tuple_filter,
		.data = data,
		.lockmode = lockmode,
		.scandirection = ForwardScanDirection,
	};

	return ts_scanner_scan_one(&scanctx, false, "bgw job stat");
}

static inline bool
bgw_job_stat_scan_job_id(int32 bgw_job_id, tuple_found_func tuple_found,
						 tuple_filter_func tuple_filter, void *data, LOCKMODE lockmode)
{
	ScanKeyData scankey[1];

	ScanKeyInit(&scankey[0],
				Anum_bgw_job_stat_pkey_idx_job_id,
				BTEqualStrategyNumber,
				F_INT4EQ,
				Int32GetDatum(bgw_job_id));
	return bgw_job_stat_scan_one(BGW_JOB_STAT_PKEY_IDX,
								 scankey,
								 1,
								 tuple_found,
								 tuple_filter,
								 data,
								 lockmode);
}

TSDLLEXPORT BgwJobStat *
ts_bgw_job_stat_find(int32 bgw_job_id)
{
	BgwJobStat *job_stat = NULL;

	bgw_job_stat_scan_job_id(bgw_job_id,
							 bgw_job_stat_tuple_found,
							 NULL,
							 &job_stat,
							 AccessShareLock);

	return job_stat;
}
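
/* Usage sketch (hypothetical caller, not part of this file): the returned
 * struct is copied into the scanner's memory context (ti->mctx) and is NULL
 * when no stats row exists for the job, so callers must check before
 * dereferencing.
 *
 *     BgwJobStat *stat = ts_bgw_job_stat_find(job_id);
 *
 *     if (stat == NULL)
 *         elog(DEBUG1, "job %d has no statistics row yet", job_id);
 *     else if (stat->fd.consecutive_crashes > 0)
 *         elog(DEBUG1, "job %d crashed %d time(s) in a row",
 *              job_id, stat->fd.consecutive_crashes);
 */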

static ScanTupleResult
bgw_job_stat_tuple_delete(TupleInfo *ti, void *const data)
{
	ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));

	return SCAN_CONTINUE;
}

void
ts_bgw_job_stat_delete(int32 bgw_job_id)
{
	bgw_job_stat_scan_job_id(bgw_job_id, bgw_job_stat_tuple_delete, NULL, NULL, RowExclusiveLock);
}

/* Mark the start of a job. This should be done in a separate transaction by the scheduler
 * before the bgw for a job is launched. This ensures that the job is counted as started
 * before /any/ job-specific code is executed. A job that has been started but never ended
 * is assumed to have crashed. We use this conservative design because, once any job has
 * crashed, no process in the database instance can write. Our only way to detect a crash
 * is therefore the absence of a write (the mark-end write in this case).
 */
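
/* Illustrative timeline of the crash-accounting protocol described above (a
 * sketch, not code in this file):
 *
 *   txn 1 (scheduler):          mark start -> total_crashes++, consecutive_crashes++
 *   ... background worker executes the job ...
 *   txn 2 (job or scheduler):   mark end   -> total_crashes--, consecutive_crashes = 0
 *
 * If the second transaction never commits (job crash, crash of another
 * process, or a SIGTERM mid-run), the increments from txn 1 survive, and
 * that surviving increment is exactly how a crash is detected.
 */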
static ScanTupleResult
bgw_job_stat_tuple_mark_start(TupleInfo *ti, void *const data)
{
	bool should_free;
	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
	HeapTuple new_tuple = heap_copytuple(tuple);
	FormData_bgw_job_stat *fd = (FormData_bgw_job_stat *) GETSTRUCT(new_tuple);

	if (should_free)
		heap_freetuple(tuple);

	fd->last_start = ts_timer_get_current_timestamp();
	fd->last_finish = DT_NOBEGIN;
	fd->next_start = DT_NOBEGIN;

	fd->total_runs++;

	/*
	 * This is undone by any of the end marks, so that we count crashes
	 * conservatively: the crash counters are incremented here at the start
	 * and decremented in `bgw_job_stat_tuple_mark_end`. They therefore stay
	 * incremented only if the job is never marked as having ended. This
	 * happens when: 1) the job crashes 2) another process crashes while the
	 * job is running 3) the scheduler gets a SIGTERM while the job is
	 * running
	 *
	 * Unfortunately, case 3 cannot be helped because when a scheduler gets a
	 * SIGTERM it sends SIGTERMs to any of its running jobs as well. Since
	 * you aren't supposed to write to the DB once you get a SIGTERM, neither
	 * the job nor the scheduler can mark the end of a job.
	 */
	fd->last_run_success = false;
	fd->total_crashes++;
	fd->consecutive_crashes++;

	ts_catalog_update(ti->scanrel, new_tuple);
	heap_freetuple(new_tuple);

	return SCAN_DONE;
}

typedef struct
{
	JobResult result;
	BgwJob *job;
} JobResultCtx;

static TimestampTz
calculate_next_start_on_success(TimestampTz finish_time, BgwJob *job)
{
	TimestampTz ts;
	TimestampTz last_finish = finish_time;
	if (!IS_VALID_TIMESTAMP(finish_time))
	{
		last_finish = ts_timer_get_current_timestamp();
	}
	ts = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
												 TimestampTzGetDatum(last_finish),
												 IntervalPGetDatum(&job->fd.schedule_interval)));
	return ts;
}
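
/* Worked example for the success path above (interval values assumed purely
 * for illustration): with schedule_interval = '1 hour' and a run finishing
 * at 10:17, the next start is 11:17. The schedule thus drifts with the
 * actual finish time rather than staying pinned to fixed wall-clock slots.
 */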

static float8
calculate_jitter_percent()
{
	/* returns a number in the range [-15/128, 16/128] ~= [-0.117, 0.125] */
	/* right now we use the postgres user-space RNG. if we become worried about
	 * correlated schedulers we can switch to
	 * pg_strong_random(&percent, sizeof(percent));
	 * though we would need to figure out a way to make our tests pass
	 */
	uint8 percent = pg_lrand48();
	return ldexp((double) (16 - (int) (percent % 32)), -7);
}
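
/* Range derivation for the expression above: percent % 32 lies in [0, 31],
 * so 16 - (percent % 32) lies in [-15, 16], and ldexp(x, -7) divides by
 * 2^7 = 128, giving [-15/128, 16/128] ~= [-0.117, 0.125].
 */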

/* For failures we back off linearly with the number of consecutive failures,
 * with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF. We also cap
 * the failure multiplier (MAX_FAILURES_MULTIPLIER): we don't want to feed
 * input that leads to out-of-range timestamps into the calculation, and we
 * don't want to put off the next start time for the job indefinitely.
 */
static TimestampTz
calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failures, BgwJob *job)
{
	float8 jitter = calculate_jitter_percent();
	/* consecutive failures includes this failure */
	TimestampTz res = 0;
	volatile bool res_set = false;
	TimestampTz last_finish = finish_time;
	float8 multiplier = (consecutive_failures > MAX_FAILURES_MULTIPLIER ? MAX_FAILURES_MULTIPLIER :
																		  consecutive_failures);
	MemoryContext oldctx;
	if (!IS_VALID_TIMESTAMP(finish_time))
	{
		elog(LOG, "%s: invalid finish time", __func__);
		last_finish = ts_timer_get_current_timestamp();
	}
	oldctx = CurrentMemoryContext;
	BeginInternalSubTransaction("next start on failure");
	PG_TRY();
	{
		/* ival = retry_period * multiplier (capped consecutive failures) */
		Datum ival = DirectFunctionCall2(interval_mul,
										 IntervalPGetDatum(&job->fd.retry_period),
										 Float8GetDatum(multiplier));

		/* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */
		Datum ival_max = DirectFunctionCall2(interval_mul,
											 IntervalPGetDatum(&job->fd.schedule_interval),
											 Float8GetDatum(MAX_INTERVALS_BACKOFF));

		if (DatumGetInt32(DirectFunctionCall2(interval_cmp, ival, ival_max)) > 0)
			ival = ival_max;

		/* Add some random jitter to prevent a stampeding herd; the interval
		 * will be within about +-13% */
		ival = DirectFunctionCall2(interval_mul, ival, Float8GetDatum(1.0 + jitter));

		res = DatumGetTimestampTz(
			DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(last_finish), ival));
		res_set = true;
		ReleaseCurrentSubTransaction();
	}
	PG_CATCH();
	{
		MemoryContextSwitchTo(oldctx);
		ErrorData *errdata = CopyErrorData();
		ereport(LOG,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("could not calculate next start on failure: resetting value"),
				 errdetail("Error: %s.", errdata->message)));
		FlushErrorState();
		RollbackAndReleaseCurrentSubTransaction();
	}
	PG_END_TRY();
	Assert(CurrentMemoryContext == oldctx);
	if (!res_set)
	{
		TimestampTz nowt;
		/* job->fd.retry_period is a valid non-null value */
		nowt = ts_timer_get_current_timestamp();
		res = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
													  TimestampTzGetDatum(nowt),
													  IntervalPGetDatum(&job->fd.retry_period)));
	}
	return res;
}
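
/* Worked example for the failure path above (interval values assumed purely
 * for illustration): with retry_period = '5 min' on the 3rd consecutive
 * failure, ival = 15 min; with schedule_interval = '1 hour' the ceiling is
 * 5 hours, so the cap does not bite; after jitter (at most +-12.5%) the
 * next start lands roughly 13 to 17 minutes after last_finish.
 */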

/* For crashes, the logic is similar to the failure case, except there is also
 * a minimum wait after a crash, so that if an operator needs to disable the
 * job, there is enough time to do so before another crash.
 */
static TimestampTz
calculate_next_start_on_crash(int consecutive_crashes, BgwJob *job)
{
	TimestampTz now = ts_timer_get_current_timestamp();
	TimestampTz failure_calc = calculate_next_start_on_failure(now, consecutive_crashes, job);
	TimestampTz min_time = TimestampTzPlusMilliseconds(now, MIN_WAIT_AFTER_CRASH_MS);

	if (min_time > failure_calc)
		return min_time;
	return failure_calc;
}
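
/* Worked example for the crash path above (interval values assumed purely
 * for illustration): with retry_period = '1 min' and consecutive_crashes = 2,
 * the failure calculation yields only ~2 minutes, but the 5-minute
 * MIN_WAIT_AFTER_CRASH_MS floor wins, giving an operator time to disable a
 * crash-looping job before it runs again.
 */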

static ScanTupleResult
bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
{
	JobResultCtx *result_ctx = data;
	bool should_free;
	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
	HeapTuple new_tuple = heap_copytuple(tuple);
	FormData_bgw_job_stat *fd = (FormData_bgw_job_stat *) GETSTRUCT(new_tuple);
	Interval *duration;

	if (should_free)
		heap_freetuple(tuple);

	fd->last_finish = ts_timer_get_current_timestamp();

	duration = DatumGetIntervalP(DirectFunctionCall2(timestamp_mi,
													 TimestampTzGetDatum(fd->last_finish),
													 TimestampTzGetDatum(fd->last_start)));
	fd->total_duration =
		*DatumGetIntervalP(DirectFunctionCall2(interval_pl,
											   IntervalPGetDatum(&fd->total_duration),
											   IntervalPGetDatum(duration)));

	/* undo marking created by start marks */
	fd->last_run_success = result_ctx->result == JOB_SUCCESS ? true : false;
	fd->total_crashes--;
	fd->consecutive_crashes = 0;

	if (result_ctx->result == JOB_SUCCESS)
	{
		fd->total_success++;
		fd->consecutive_failures = 0;
		fd->last_successful_finish = fd->last_finish;
		/* Mark the next start at the end if the job itself hasn't */
		if (!bgw_job_stat_next_start_was_set(fd))
			fd->next_start = calculate_next_start_on_success(fd->last_finish, result_ctx->job);
	}
	else
	{
		fd->total_failures++;
		fd->consecutive_failures++;

		/*
		 * Mark the next start at the end if the job itself hasn't (this may
		 * have happened before the failure) and the failure was not in
		 * starting. If the failure was in starting, then next_start should
		 * have been restored in `on_failure_to_start_job` and we don't
		 * change it here. Even if it wasn't restored, leaving it as
		 * DT_NOBEGIN marks the job as highest priority.
		 */
		if (!bgw_job_stat_next_start_was_set(fd) && result_ctx->result != JOB_FAILURE_TO_START)
			fd->next_start = calculate_next_start_on_failure(fd->last_finish,
															 fd->consecutive_failures,
															 result_ctx->job);
	}

	ts_catalog_update(ti->scanrel, new_tuple);
	heap_freetuple(new_tuple);

	return SCAN_DONE;
}

static ScanTupleResult
bgw_job_stat_tuple_set_next_start(TupleInfo *ti, void *const data)
{
	TimestampTz *next_start = data;
	bool should_free;
	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
	HeapTuple new_tuple = heap_copytuple(tuple);
	FormData_bgw_job_stat *fd = (FormData_bgw_job_stat *) GETSTRUCT(new_tuple);

	if (should_free)
		heap_freetuple(tuple);

	fd->next_start = *next_start;
	ts_catalog_update(ti->scanrel, new_tuple);
	heap_freetuple(new_tuple);

	return SCAN_DONE;
}

static bool
bgw_job_stat_insert_relation(Relation rel, int32 bgw_job_id, bool mark_start,
							 TimestampTz next_start)
{
	TupleDesc desc = RelationGetDescr(rel);
	Datum values[Natts_bgw_job_stat];
	bool nulls[Natts_bgw_job_stat] = { false };
	CatalogSecurityContext sec_ctx;
	Interval zero_ival = {
		.time = 0,
	};

	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_job_id)] = Int32GetDatum(bgw_job_id);
	if (mark_start)
		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_start)] =
			TimestampGetDatum(ts_timer_get_current_timestamp());
	else
		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_start)] =
			TimestampGetDatum(DT_NOBEGIN);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_finish)] = TimestampGetDatum(DT_NOBEGIN);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_next_start)] = TimestampGetDatum(next_start);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_successful_finish)] =
		TimestampGetDatum(DT_NOBEGIN);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_runs)] =
		Int64GetDatum((mark_start ? 1 : 0));
	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_duration)] =
		IntervalPGetDatum(&zero_ival);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_success)] = Int64GetDatum(0);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_failures)] = Int64GetDatum(0);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_failures)] = Int32GetDatum(0);

	if (mark_start)
	{
		/* This is undone by any of the end marks */
		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_run_success)] = BoolGetDatum(false);
		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_crashes)] = Int64GetDatum(1);
		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_crashes)] = Int32GetDatum(1);
	}
	else
	{
		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_last_run_success)] = BoolGetDatum(true);
		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_crashes)] = Int64GetDatum(0);
		values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_crashes)] = Int32GetDatum(0);
	}

	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_insert_values(rel, desc, values, nulls);
	ts_catalog_restore_user(&sec_ctx);

	return true;
}

void
ts_bgw_job_stat_mark_start(int32 bgw_job_id)
{
	/* Use double-check locking */
	if (!bgw_job_stat_scan_job_id(bgw_job_id,
								  bgw_job_stat_tuple_mark_start,
								  NULL,
								  NULL,
								  RowExclusiveLock))
	{
		Relation rel =
			table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock);
		/* Recheck while holding a self-exclusive lock */
		if (!bgw_job_stat_scan_job_id(bgw_job_id,
									  bgw_job_stat_tuple_mark_start,
									  NULL,
									  NULL,
									  RowExclusiveLock))
			bgw_job_stat_insert_relation(rel, bgw_job_id, true, DT_NOBEGIN);
		table_close(rel, ShareRowExclusiveLock);
		pgstat_report_activity(STATE_IDLE, NULL);
	}
}
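
/* A note on the double-check locking above: the first probe runs under
 * RowExclusiveLock, which does not conflict with itself, so it stays cheap
 * in the common case where the stats row already exists. Only on a miss do
 * we take ShareRowExclusiveLock on the table; that mode does conflict with
 * itself, so two schedulers cannot both pass the recheck and insert
 * duplicate rows for the same job id.
 */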

void
ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result)
{
	JobResultCtx res = {
		.job = job,
		.result = result,
	};

	if (!bgw_job_stat_scan_job_id(job->fd.id,
								  bgw_job_stat_tuple_mark_end,
								  NULL,
								  &res,
								  RowExclusiveLock))
		elog(ERROR, "unable to find job statistics for job %d", job->fd.id);
	pgstat_report_activity(STATE_IDLE, NULL);
}

bool
ts_bgw_job_stat_end_was_marked(BgwJobStat *jobstat)
{
	return !TIMESTAMP_IS_NOBEGIN(jobstat->fd.last_finish);
}

TSDLLEXPORT void
ts_bgw_job_stat_set_next_start(int32 job_id, TimestampTz next_start)
{
	/* Cannot use DT_NOBEGIN as that's the value used to indicate "not set" */
	if (next_start == DT_NOBEGIN)
		elog(ERROR, "cannot set next start to -infinity");

	if (!bgw_job_stat_scan_job_id(job_id,
								  bgw_job_stat_tuple_set_next_start,
								  NULL,
								  &next_start,
								  RowExclusiveLock))
		elog(ERROR, "unable to find job statistics for job %d", job_id);
}

/* Update next_start only if a job stat row exists */
TSDLLEXPORT bool
ts_bgw_job_stat_update_next_start(int32 job_id, TimestampTz next_start, bool allow_unset)
{
	bool found = false;
	/* Cannot use DT_NOBEGIN as that's the value used to indicate "not set" */
	if (!allow_unset && next_start == DT_NOBEGIN)
		elog(ERROR, "cannot set next start to -infinity");

	found = bgw_job_stat_scan_job_id(job_id,
									 bgw_job_stat_tuple_set_next_start,
									 NULL,
									 &next_start,
									 RowExclusiveLock);
	return found;
}

TSDLLEXPORT void
ts_bgw_job_stat_upsert_next_start(int32 bgw_job_id, TimestampTz next_start)
{
	/* Cannot use DT_NOBEGIN as that's the value used to indicate "not set" */
	if (next_start == DT_NOBEGIN)
		elog(ERROR, "cannot set next start to -infinity");

	/* Use double-check locking */
	if (!bgw_job_stat_scan_job_id(bgw_job_id,
								  bgw_job_stat_tuple_set_next_start,
								  NULL,
								  &next_start,
								  RowExclusiveLock))
	{
		Relation rel =
			table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock);
		/* Recheck while holding a self-exclusive lock */
		if (!bgw_job_stat_scan_job_id(bgw_job_id,
									  bgw_job_stat_tuple_set_next_start,
									  NULL,
									  &next_start,
									  RowExclusiveLock))
			bgw_job_stat_insert_relation(rel, bgw_job_id, false, next_start);
		table_close(rel, ShareRowExclusiveLock);
	}
}

bool
ts_bgw_job_stat_should_execute(BgwJobStat *jobstat, BgwJob *job)
{
	/*
	 * Stub to allow the system to disable jobs based on the number of
	 * crashes or failures.
	 */
	return true;
}

TimestampTz
ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job)
{
	if (jobstat == NULL)
		/* Never previously run - run right away */
		return DT_NOBEGIN;

	if (jobstat->fd.consecutive_crashes > 0)
		return calculate_next_start_on_crash(jobstat->fd.consecutive_crashes, job);

	return jobstat->fd.next_start;
}
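
/* Example of the resulting priority (assuming the scheduler orders jobs by
 * next_start, as the "run right away" comment above implies): DT_NOBEGIN is
 * -infinity and sorts before every finite timestamp, so a job that has never
 * run is scheduled ahead of any job with a concrete next_start.
 */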