1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6
7 #include <postgres.h>
8 #include <funcapi.h>
9 #include <miscadmin.h>
10 #include <utils/acl.h>
11 #include <utils/builtins.h>
12
13 #include <bgw/job.h>
14 #include <bgw/job_stat.h>
15
16 #include "job.h"
17 #include "job_api.h"
18
19 /* Default max runtime for a custom job is unlimited for now */
20 #define DEFAULT_MAX_RUNTIME 0
21
22 /* Right now, there is an infinite number of retries for custom jobs */
23 #define DEFAULT_MAX_RETRIES -1
24 /* Default retry period for reorder_jobs is currently 5 minutes */
25 #define DEFAULT_RETRY_PERIOD 5 * USECS_PER_MINUTE
26
27 #define ALTER_JOB_NUM_COLS 8
28
29 /*
30 * Check configuration for a job type.
31 */
32 static void
job_config_check(Name proc_schema,Name proc_name,Jsonb * config)33 job_config_check(Name proc_schema, Name proc_name, Jsonb *config)
34 {
35 if (namestrcmp(proc_schema, INTERNAL_SCHEMA_NAME) == 0)
36 {
37 if (namestrcmp(proc_name, "policy_retention") == 0)
38 policy_retention_read_and_validate_config(config, NULL);
39 else if (namestrcmp(proc_name, "policy_reorder") == 0)
40 policy_reorder_read_and_validate_config(config, NULL);
41 else if (namestrcmp(proc_name, "policy_compression") == 0)
42 {
43 PolicyCompressionData policy_data;
44 policy_compression_read_and_validate_config(config, &policy_data);
45 ts_cache_release(policy_data.hcache);
46 }
47 else if (namestrcmp(proc_name, "policy_refresh_continuous_aggregate") == 0)
48 policy_refresh_cagg_read_and_validate_config(config, NULL);
49 }
50 }
51
52 /*
53 * CREATE FUNCTION add_job(
54 * 0 proc REGPROC,
55 * 1 schedule_interval INTERVAL,
56 * 2 config JSONB DEFAULT NULL,
57 * 3 initial_start TIMESTAMPTZ DEFAULT NULL,
58 * 4 scheduled BOOL DEFAULT true
59 * ) RETURNS INTEGER
60 */
61 Datum
job_add(PG_FUNCTION_ARGS)62 job_add(PG_FUNCTION_ARGS)
63 {
64 NameData application_name;
65 NameData proc_name;
66 NameData proc_schema;
67 NameData owner_name;
68 Interval max_runtime = { .time = DEFAULT_MAX_RUNTIME };
69 Interval retry_period = { .time = DEFAULT_RETRY_PERIOD };
70 int32 job_id;
71 char *func_name = NULL;
72
73 Oid owner = GetUserId();
74 Oid proc = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
75 Interval *schedule_interval = PG_ARGISNULL(1) ? NULL : PG_GETARG_INTERVAL_P(1);
76 Jsonb *config = PG_ARGISNULL(2) ? NULL : PG_GETARG_JSONB_P(2);
77 bool scheduled = PG_ARGISNULL(4) ? true : PG_GETARG_BOOL(4);
78
79 TS_PREVENT_FUNC_IF_READ_ONLY();
80
81 if (PG_ARGISNULL(0))
82 ereport(ERROR,
83 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
84 errmsg("function or procedure cannot be NULL")));
85
86 if (NULL == schedule_interval)
87 ereport(ERROR,
88 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
89 errmsg("schedule interval cannot be NULL")));
90
91 func_name = get_func_name(proc);
92 if (func_name == NULL)
93 ereport(ERROR,
94 (errcode(ERRCODE_UNDEFINED_OBJECT),
95 errmsg("function or procedure with OID %u does not exist", proc)));
96
97 if (pg_proc_aclcheck(proc, owner, ACL_EXECUTE) != ACLCHECK_OK)
98 ereport(ERROR,
99 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
100 errmsg("permission denied for function \"%s\"", func_name),
101 errhint("Job owner must have EXECUTE privilege on the function.")));
102
103 /* Verify that the owner can create a background worker */
104 ts_bgw_job_validate_job_owner(owner);
105
106 /* Next, insert a new job into jobs table */
107 namestrcpy(&application_name, "User-Defined Action");
108 namestrcpy(&proc_schema, get_namespace_name(get_func_namespace(proc)));
109 namestrcpy(&proc_name, func_name);
110 namestrcpy(&owner_name, GetUserNameFromId(owner, false));
111
112 if (config)
113 job_config_check(&proc_schema, &proc_name, config);
114
115 job_id = ts_bgw_job_insert_relation(&application_name,
116 schedule_interval,
117 &max_runtime,
118 DEFAULT_MAX_RETRIES,
119 &retry_period,
120 &proc_schema,
121 &proc_name,
122 &owner_name,
123 scheduled,
124 0,
125 config);
126
127 if (!PG_ARGISNULL(3))
128 {
129 TimestampTz initial_start = PG_GETARG_TIMESTAMPTZ(3);
130 ts_bgw_job_stat_upsert_next_start(job_id, initial_start);
131 }
132
133 PG_RETURN_INT32(job_id);
134 }
135
136 static BgwJob *
find_job(int32 job_id,bool null_job_id,bool missing_ok)137 find_job(int32 job_id, bool null_job_id, bool missing_ok)
138 {
139 BgwJob *job;
140
141 if (null_job_id && !missing_ok)
142 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("job ID cannot be NULL")));
143
144 job = ts_bgw_job_find(job_id, CurrentMemoryContext, !missing_ok);
145
146 if (NULL == job)
147 {
148 Assert(missing_ok);
149 ereport(NOTICE,
150 (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("job %d not found, skipping", job_id)));
151 }
152
153 return job;
154 }
155
156 /*
157 * CREATE OR REPLACE FUNCTION delete_job(job_id INTEGER) RETURNS VOID
158 */
159 Datum
job_delete(PG_FUNCTION_ARGS)160 job_delete(PG_FUNCTION_ARGS)
161 {
162 int32 job_id = PG_GETARG_INT32(0);
163 BgwJob *job;
164 Oid owner;
165
166 TS_PREVENT_FUNC_IF_READ_ONLY();
167
168 job = find_job(job_id, PG_ARGISNULL(0), false);
169 owner = get_role_oid(NameStr(job->fd.owner), false);
170
171 if (!has_privs_of_role(GetUserId(), owner))
172 ereport(ERROR,
173 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
174 errmsg("insufficient permissions to delete job for user \"%s\"",
175 NameStr(job->fd.owner))));
176
177 ts_bgw_job_delete_by_id(job_id);
178
179 PG_RETURN_VOID();
180 }
181
182 /* This function only updates the fields modifiable with alter_job. */
183 static ScanTupleResult
bgw_job_tuple_update_by_id(TupleInfo * ti,void * const data)184 bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
185 {
186 BgwJob *updated_job = (BgwJob *) data;
187 bool should_free;
188 HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
189 HeapTuple new_tuple;
190
191 Datum values[Natts_bgw_job] = { 0 };
192 bool isnull[Natts_bgw_job] = { 0 };
193 bool repl[Natts_bgw_job] = { 0 };
194
195 Datum old_schedule_interval =
196 slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]);
197 Assert(!isnull[0]);
198
199 /* when we update the schedule interval, modify the next start time as well*/
200 if (!DatumGetBool(DirectFunctionCall2(interval_eq,
201 old_schedule_interval,
202 IntervalPGetDatum(&updated_job->fd.schedule_interval))))
203 {
204 BgwJobStat *stat = ts_bgw_job_stat_find(updated_job->fd.id);
205
206 if (stat != NULL)
207 {
208 TimestampTz next_start = DatumGetTimestampTz(
209 DirectFunctionCall2(timestamptz_pl_interval,
210 TimestampTzGetDatum(stat->fd.last_finish),
211 IntervalPGetDatum(&updated_job->fd.schedule_interval)));
212 /* allow DT_NOBEGIN for next_start here through allow_unset=true in the case that
213 * last_finish is DT_NOBEGIN,
214 * This means the value is counted as unset which is what we want */
215 ts_bgw_job_stat_update_next_start(updated_job->fd.id, next_start, true);
216 }
217 values[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] =
218 IntervalPGetDatum(&updated_job->fd.schedule_interval);
219 repl[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = true;
220 }
221
222 values[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] =
223 IntervalPGetDatum(&updated_job->fd.max_runtime);
224 repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = true;
225
226 values[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] =
227 Int32GetDatum(updated_job->fd.max_retries);
228 repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = true;
229
230 values[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] =
231 IntervalPGetDatum(&updated_job->fd.retry_period);
232 repl[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = true;
233
234 values[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] =
235 BoolGetDatum(updated_job->fd.scheduled);
236 repl[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = true;
237
238 repl[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
239 if (updated_job->fd.config)
240 {
241 job_config_check(&updated_job->fd.proc_schema,
242 &updated_job->fd.proc_name,
243 updated_job->fd.config);
244 values[AttrNumberGetAttrOffset(Anum_bgw_job_config)] =
245 JsonbPGetDatum(updated_job->fd.config);
246 }
247 else
248 isnull[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
249
250 new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, repl);
251
252 ts_catalog_update(ti->scanrel, new_tuple);
253
254 heap_freetuple(new_tuple);
255 if (should_free)
256 heap_freetuple(tuple);
257
258 return SCAN_DONE;
259 }
260
261 /*
262 * Overwrite job with specified job_id with the given fields
263 *
264 * This function only updates the fields modifiable with alter_job.
265 */
266 static void
ts_bgw_job_update_by_id(int32 job_id,BgwJob * job)267 ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
268 {
269 ScanKeyData scankey[1];
270 Catalog *catalog = ts_catalog_get();
271 ScanTupLock scantuplock = {
272 .waitpolicy = LockWaitBlock,
273 .lockmode = LockTupleExclusive,
274 };
275 ScannerCtx scanctx = { .table = catalog_get_table_id(catalog, BGW_JOB),
276 .index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
277 .nkeys = 1,
278 .scankey = scankey,
279 .data = job,
280 .limit = 1,
281 .tuple_found = bgw_job_tuple_update_by_id,
282 .lockmode = RowExclusiveLock,
283 .scandirection = ForwardScanDirection,
284 .result_mctx = CurrentMemoryContext,
285 .tuplock = &scantuplock };
286
287 ScanKeyInit(&scankey[0],
288 Anum_bgw_job_pkey_idx_id,
289 BTEqualStrategyNumber,
290 F_INT4EQ,
291 Int32GetDatum(job_id));
292
293 ts_scanner_scan(&scanctx);
294 }
295
296 /*
297 * CREATE OR REPLACE PROCEDURE run_job(job_id INTEGER)
298 */
299 Datum
job_run(PG_FUNCTION_ARGS)300 job_run(PG_FUNCTION_ARGS)
301 {
302 int32 job_id = PG_GETARG_INT32(0);
303 BgwJob *job = find_job(job_id, PG_ARGISNULL(0), false);
304
305 job_execute(job);
306
307 PG_RETURN_VOID();
308 }
309
310 /*
311 * CREATE OR REPLACE FUNCTION alter_job(
312 * 0 job_id INTEGER,
313 * 1 schedule_interval INTERVAL = NULL,
314 * 2 max_runtime INTERVAL = NULL,
315 * 3 max_retries INTEGER = NULL,
316 * 4 retry_period INTERVAL = NULL,
317 * 5 scheduled BOOL = NULL,
318 * 6 config JSONB = NULL,
319 * 7 next_start TIMESTAMPTZ = NULL
320 * 8 if_exists BOOL = FALSE,
321 * ) RETURNS TABLE (
322 * job_id INTEGER,
323 * schedule_interval INTERVAL,
324 * max_runtime INTERVAL,
325 * max_retries INTEGER,
326 * retry_period INTERVAL,
327 * scheduled BOOL,
328 * config JSONB,
329 * next_start TIMESTAMPTZ
330 * )
331 */
332 Datum
job_alter(PG_FUNCTION_ARGS)333 job_alter(PG_FUNCTION_ARGS)
334 {
335 BgwJobStat *stat;
336 TupleDesc tupdesc;
337 Datum values[ALTER_JOB_NUM_COLS] = { 0 };
338 bool nulls[ALTER_JOB_NUM_COLS] = { false };
339 HeapTuple tuple;
340 TimestampTz next_start;
341 int job_id = PG_GETARG_INT32(0);
342 bool if_exists = PG_GETARG_BOOL(8);
343 BgwJob *job;
344
345 TS_PREVENT_FUNC_IF_READ_ONLY();
346
347 /* check that caller accepts tuple and abort early if that is not the
348 * case */
349 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
350 ereport(ERROR,
351 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
352 errmsg("function returning record called in context "
353 "that cannot accept type record")));
354
355 job = find_job(job_id, PG_ARGISNULL(0), if_exists);
356 if (job == NULL)
357 PG_RETURN_NULL();
358
359 ts_bgw_job_permission_check(job);
360
361 if (!PG_ARGISNULL(1))
362 job->fd.schedule_interval = *PG_GETARG_INTERVAL_P(1);
363 if (!PG_ARGISNULL(2))
364 job->fd.max_runtime = *PG_GETARG_INTERVAL_P(2);
365 if (!PG_ARGISNULL(3))
366 job->fd.max_retries = PG_GETARG_INT32(3);
367 if (!PG_ARGISNULL(4))
368 job->fd.retry_period = *PG_GETARG_INTERVAL_P(4);
369 if (!PG_ARGISNULL(5))
370 job->fd.scheduled = PG_GETARG_BOOL(5);
371 if (!PG_ARGISNULL(6))
372 job->fd.config = PG_GETARG_JSONB_P(6);
373
374 ts_bgw_job_update_by_id(job_id, job);
375
376 if (!PG_ARGISNULL(7))
377 ts_bgw_job_stat_upsert_next_start(job_id, PG_GETARG_TIMESTAMPTZ(7));
378
379 stat = ts_bgw_job_stat_find(job_id);
380 if (stat != NULL)
381 next_start = stat->fd.next_start;
382 else
383 next_start = DT_NOBEGIN;
384
385 tupdesc = BlessTupleDesc(tupdesc);
386 values[0] = Int32GetDatum(job->fd.id);
387 values[1] = IntervalPGetDatum(&job->fd.schedule_interval);
388 values[2] = IntervalPGetDatum(&job->fd.max_runtime);
389 values[3] = Int32GetDatum(job->fd.max_retries);
390 values[4] = IntervalPGetDatum(&job->fd.retry_period);
391 values[5] = BoolGetDatum(job->fd.scheduled);
392
393 if (job->fd.config == NULL)
394 nulls[6] = true;
395 else
396 values[6] = JsonbPGetDatum(job->fd.config);
397
398 values[7] = TimestampTzGetDatum(next_start);
399
400 tuple = heap_form_tuple(tupdesc, values, nulls);
401 return HeapTupleGetDatum(tuple);
402 }
403