Skip to content

Commit

Permalink
Enable altering job schedule type through alter_job
Browse files Browse the repository at this point in the history
In #4664 we introduced fixed schedules for jobs. This was done by
introducing additional parameters fixed_schedule, initial_start and
timezone for our add_job and add_policy APIs.
These fields were not updatable by alter_job so it was not
possible to switch from one type of schedule to another without dropping
and recreating existing jobs and policies.
This patch adds the missing parameters to alter_job to enable switching
from one type of schedule to another.

Fixes #5681
  • Loading branch information
konskov committed Jul 3, 2023
1 parent b9a58dd commit 06d20b1
Show file tree
Hide file tree
Showing 32 changed files with 578 additions and 209 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_5758
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #5758 Enable altering job schedule type through `alter_job`
7 changes: 5 additions & 2 deletions sql/job_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ CREATE OR REPLACE FUNCTION @extschema@.alter_job(
config JSONB = NULL,
next_start TIMESTAMPTZ = NULL,
if_exists BOOL = FALSE,
check_config REGPROC = NULL
check_config REGPROC = NULL,
fixed_schedule BOOL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT DEFAULT NULL
)
RETURNS TABLE (job_id INTEGER, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL, scheduled BOOL, config JSONB,
next_start TIMESTAMPTZ, check_config TEXT)
next_start TIMESTAMPTZ, check_config TEXT, fixed_schedule BOOL, initial_start TIMESTAMPTZ, timezone TEXT)
AS '@MODULE_PATHNAME@', 'ts_job_alter'
LANGUAGE C VOLATILE;

Expand Down
32 changes: 32 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
DROP FUNCTION IF EXISTS @extschema@.alter_job(
INTEGER,
INTERVAL,
INTERVAL,
INTEGER,
INTERVAL,
BOOL,
JSONB,
TIMESTAMPTZ,
BOOL,
REGPROC
);

CREATE FUNCTION @extschema@.alter_job(
job_id INTEGER,
schedule_interval INTERVAL = NULL,
max_runtime INTERVAL = NULL,
max_retries INTEGER = NULL,
retry_period INTERVAL = NULL,
scheduled BOOL = NULL,
config JSONB = NULL,
next_start TIMESTAMPTZ = NULL,
if_exists BOOL = FALSE,
check_config REGPROC = NULL,
fixed_schedule BOOL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT DEFAULT NULL
)
RETURNS TABLE (job_id INTEGER, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL, scheduled BOOL, config JSONB,
next_start TIMESTAMPTZ, check_config TEXT, fixed_schedule BOOL, initial_start TIMESTAMPTZ, timezone TEXT)
AS '@MODULE_PATHNAME@', 'ts_job_alter'
LANGUAGE C VOLATILE;
32 changes: 32 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
DROP FUNCTION IF EXISTS @extschema@.alter_job(
INTEGER,
INTERVAL,
INTERVAL,
INTEGER,
INTERVAL,
BOOL,
JSONB,
TIMESTAMPTZ,
BOOL,
REGPROC,
BOOL,
TIMESTAMPTZ,
TEXT
);

CREATE FUNCTION @extschema@.alter_job(
job_id INTEGER,
schedule_interval INTERVAL = NULL,
max_runtime INTERVAL = NULL,
max_retries INTEGER = NULL,
retry_period INTERVAL = NULL,
scheduled BOOL = NULL,
config JSONB = NULL,
next_start TIMESTAMPTZ = NULL,
if_exists BOOL = FALSE,
check_config REGPROC = NULL
)
RETURNS TABLE (job_id INTEGER, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL, scheduled BOOL, config JSONB,
next_start TIMESTAMPTZ, check_config TEXT)
AS '@MODULE_PATHNAME@', 'ts_job_alter'
LANGUAGE C VOLATILE;
22 changes: 22 additions & 0 deletions src/bgw/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ bgw_job_from_tupleinfo(TupleInfo *ti, size_t alloc_size)
job->fd.initial_start =
DatumGetTimestampTz(values[AttrNumberGetAttrOffset(Anum_bgw_job_initial_start)]);
}
else
job->fd.initial_start = DT_NOBEGIN;

if (!nulls[AttrNumberGetAttrOffset(Anum_bgw_job_timezone)])
job->fd.timezone =
Expand Down Expand Up @@ -878,6 +880,10 @@ bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
BoolGetDatum(updated_job->fd.scheduled);
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_fixed_schedule)] =
BoolGetDatum(updated_job->fd.fixed_schedule);
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_fixed_schedule)] = true;

doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_check_schema)] =
Expand Down Expand Up @@ -912,6 +918,22 @@ bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
else
isnull[AttrNumberGetAttrOffset(Anum_bgw_job_hypertable_id)] = true;

if (TIMESTAMP_NOT_FINITE(updated_job->fd.initial_start))
isnull[AttrNumberGetAttrOffset(Anum_bgw_job_initial_start)] = true;
else
values[AttrNumberGetAttrOffset(Anum_bgw_job_initial_start)] =
TimestampTzGetDatum(updated_job->fd.initial_start);
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_initial_start)] = true;

if (updated_job->fd.timezone)
{
values[AttrNumberGetAttrOffset(Anum_bgw_job_timezone)] =
PointerGetDatum(updated_job->fd.timezone);
}
else
isnull[AttrNumberGetAttrOffset(Anum_bgw_job_timezone)] = true;
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_timezone)] = true;

new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, doReplace);

ts_catalog_update(ti->scanrel, new_tuple);
Expand Down
8 changes: 4 additions & 4 deletions src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ typedef struct
* In those cases, we only have month components. So we compute the difference in
* months between the initial_start's timebucket and the finish time's bucket.
*/
static TimestampTz
get_next_scheduled_execution_slot(BgwJob *job, TimestampTz finish_time)
TimestampTz
ts_get_next_scheduled_execution_slot(BgwJob *job, TimestampTz finish_time)
{
Assert(job->fd.fixed_schedule == true);
Datum schedint_datum = IntervalPGetDatum(&job->fd.schedule_interval);
Expand Down Expand Up @@ -278,7 +278,7 @@ calculate_next_start_on_success_fixed(TimestampTz finish_time, BgwJob *job)
{
TimestampTz next_slot;

next_slot = get_next_scheduled_execution_slot(job, finish_time);
next_slot = ts_get_next_scheduled_execution_slot(job, finish_time);

return next_slot;
}
Expand Down Expand Up @@ -431,7 +431,7 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
* of the next scheduled slot, so we don't get off track */
if (job->fd.fixed_schedule)
{
TimestampTz next_slot = get_next_scheduled_execution_slot(job, finish_time);
TimestampTz next_slot = ts_get_next_scheduled_execution_slot(job, finish_time);
if (res > next_slot)
res = next_slot;
}
Expand Down
2 changes: 2 additions & 0 deletions src/bgw/job_stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ extern TimestampTz ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job,
int32 consecutive_failed_launches);
extern TSDLLEXPORT void ts_bgw_job_stat_mark_crash_reported(int32 bgw_job_id);

extern TSDLLEXPORT TimestampTz ts_get_next_scheduled_execution_slot(BgwJob *job,
TimestampTz finish_time);
#endif /* BGW_JOB_STAT_H */
2 changes: 1 addition & 1 deletion test/src/bgw/scheduler_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static const char *test_job_type_names[_MAX_TEST_JOB_TYPE] = {
[TEST_JOB_TYPE_JOB_4] = "bgw_test_job_4",
};

/* this is copied from the job_stat/get_next_scheduled_execution_slot */
/* this is copied from the job_stat/ts_get_next_scheduled_execution_slot */
extern Datum
ts_test_next_scheduled_execution_slot(PG_FUNCTION_ARGS)
{
Expand Down
108 changes: 107 additions & 1 deletion tsl/src/bgw_policy/job_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/* Default retry period for reorder_jobs is currently 5 minutes */
#define DEFAULT_RETRY_PERIOD (5 * USECS_PER_MINUTE)

#define ALTER_JOB_NUM_COLS 10
#define ALTER_JOB_NUM_COLS 13

/*
* This function ensures that the check function has the required signature
Expand Down Expand Up @@ -282,6 +282,9 @@ job_run(PG_FUNCTION_ARGS)
* 7 next_start TIMESTAMPTZ = NULL
* 8 if_exists BOOL = FALSE,
* 9 check_config REGPROC = NULL
* 10 fixed_schedule BOOL = NULL,
* 11 initial_start TIMESTAMPTZ = NULL
* 12 timezone TEXT = NULL
* ) RETURNS TABLE (
* job_id INTEGER,
* schedule_interval INTERVAL,
Expand All @@ -292,6 +295,9 @@ job_run(PG_FUNCTION_ARGS)
* config JSONB,
* next_start TIMESTAMPTZ
* check_config TEXT
* fixed_schedule BOOL
* initial_start TIMESTAMPTZ
* timezone TEXT
* )
*/
Datum
Expand All @@ -313,6 +319,12 @@ job_alter(PG_FUNCTION_ARGS)
/* Added space for period and NULL */
char schema_qualified_check_name[2 * NAMEDATALEN + 2] = { 0 };
bool unregister_check = (!PG_ARGISNULL(9) && !OidIsValid(check));
TimestampTz initial_start = PG_ARGISNULL(11) ? DT_NOBEGIN : PG_GETARG_TIMESTAMPTZ(11);
text *timezone = PG_ARGISNULL(12) ? NULL : PG_GETARG_TEXT_PP(12);
char *valid_timezone = NULL;
/* verify it's a valid timezone */
if (timezone != NULL)
valid_timezone = ts_bgw_job_validate_timezone(PG_GETARG_DATUM(12));

TS_PREVENT_FUNC_IF_READ_ONLY();

Expand Down Expand Up @@ -387,7 +399,86 @@ job_alter(PG_FUNCTION_ARGS)
namestrcpy(&job->fd.check_schema, NameStr(empty_namedata));
namestrcpy(&job->fd.check_name, NameStr(empty_namedata));
}

if (!PG_ARGISNULL(10))
{
bool fixed_schedule = PG_GETARG_BOOL(10);
/*
* initial_start is a required argument for fixed schedules
* so we use the current timestamp if it's not provided
*/
if (fixed_schedule)
{
if (TIMESTAMP_NOT_FINITE(initial_start))
{
initial_start = ts_timer_get_current_timestamp();
elog(NOTICE,
"Using current time [%s] as initial start for job %d",
DatumGetCString(
DirectFunctionCall1(timestamptz_out, TimestampTzGetDatum(initial_start))),
job->fd.id);
job->fd.initial_start = initial_start;
}
}
job->fd.fixed_schedule = fixed_schedule;
}
if (!PG_ARGISNULL(11))
{
/* user provided +- infinity as initial_start, this is not acceptable */
if (TIMESTAMP_NOT_FINITE(initial_start))
{
initial_start = ts_timer_get_current_timestamp();
elog(NOTICE,
"Using current time [%s] as initial start for job %d",
DatumGetCString(
DirectFunctionCall1(timestamptz_out, TimestampTzGetDatum(initial_start))),
job->fd.id);
}
job->fd.initial_start = initial_start;
}

if (valid_timezone != NULL)
job->fd.timezone = cstring_to_text(valid_timezone);
else
job->fd.timezone = NULL;
/* it's also possible to alter the fields initial_start and timezone without
* specifying fixed_schedule. In that case, update them and also update the
* next_start accordingly.
* If the job is not on a fixed schedule, then this has no effect on the next_start,
* so maybe print a message to the user
* that these changes are not really doing anything */

/* this function will also update the next_start if the schedule interval is changed,
but I'm not going to rely on this to change stuff */
ts_bgw_job_update_by_id(job_id, job);
/* one of the fields below changing necessitates a next_start update */
if (!PG_ARGISNULL(10) || !TIMESTAMP_NOT_FINITE(initial_start) || (valid_timezone != NULL))
{
TimestampTz next_start_calculated;
if (job->fd.fixed_schedule)
{
next_start_calculated =
ts_get_next_scheduled_execution_slot(job, ts_timer_get_current_timestamp());
ts_bgw_job_stat_update_next_start(job->fd.id, next_start_calculated, false);
}
else
{
/* last finish time plus schedule interval */
BgwJobStat *stat = ts_bgw_job_stat_find(job->fd.id);

if (stat != NULL)
{
next_start_calculated = DatumGetTimestampTz(
DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(stat->fd.last_finish),
IntervalPGetDatum(&job->fd.schedule_interval)));
/* allow DT_NOBEGIN for next_start here through allow_unset=true in the case that
* last_finish is DT_NOBEGIN,
* This means the value is counted as unset which is what we want */
ts_bgw_job_stat_update_next_start(job->fd.id, next_start_calculated, true);
}
}
}

if (!PG_ARGISNULL(7))
ts_bgw_job_stat_upsert_next_start(job_id, PG_GETARG_TIMESTAMPTZ(7));
Expand Down Expand Up @@ -420,6 +511,21 @@ job_alter(PG_FUNCTION_ARGS)
else
nulls[8] = true;

/* values/nulls[9]: fixed_schedule */
values[9] = job->fd.fixed_schedule;
/* values/nulls[10]: initial_start */
if (TIMESTAMP_NOT_FINITE(job->fd.initial_start))
{
nulls[10] = true;
}
else
values[10] = TimestampTzGetDatum(job->fd.initial_start);
/* values/nulls[11]: timezone */
if (valid_timezone)
values[11] = CStringGetTextDatum(valid_timezone);
else
nulls[11] = true;

tuple = heap_form_tuple(tupdesc, values, nulls);
return HeapTupleGetDatum(tuple);
}
Expand Down
6 changes: 3 additions & 3 deletions tsl/test/expected/bgw_custom.out
Original file line number Diff line number Diff line change
Expand Up @@ -917,9 +917,9 @@ ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_orderby
SELECT add_compression_policy('sensor_data', INTERVAL '1' minute) AS compressjob_id \gset
-- set recompress to true
SELECT alter_job(id,config:=jsonb_set(config,'{recompress}', 'true')) FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
alter_job
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
(1014,"@ 3 days 12 hours","@ 0",-1,"@ 1 hour",t,"{""recompress"": true, ""hypertable_id"": 4, ""compress_after"": ""@ 1 min""}",-infinity,_timescaledb_internal.policy_compression_check)
alter_job
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
(1014,"@ 3 days 12 hours","@ 0",-1,"@ 1 hour",t,"{""recompress"": true, ""hypertable_id"": 4, ""compress_after"": ""@ 1 min""}",-infinity,_timescaledb_internal.policy_compression_check,f,,)
(1 row)

-- create new chunks
Expand Down
12 changes: 6 additions & 6 deletions tsl/test/expected/bgw_db_scheduler.out
Original file line number Diff line number Diff line change
Expand Up @@ -1418,9 +1418,9 @@ SELECT wait_for_timer_to_run(0);
SELECT insert_job('another', 'bgw_test_job_1', INTERVAL '100ms', INTERVAL '100s', INTERVAL '1s') AS job_id \gset
-- call alter_job to trigger cache invalidation
SELECT alter_job(:job_id,scheduled:=true);
alter_job
-----------------------------------------------------------------
(1024,"@ 0.1 secs","@ 1 min 40 secs",5,"@ 1 sec",t,,-infinity,)
alter_job
---------------------------------------------------------------------
(1024,"@ 0.1 secs","@ 1 min 40 secs",5,"@ 1 sec",t,,-infinity,,f,,)
(1 row)

SELECT ts_bgw_params_reset_time(50000, true);
Expand Down Expand Up @@ -1534,9 +1534,9 @@ SELECT wait_for_timer_to_run(400000);
SELECT insert_job('new_job', 'bgw_test_job_1', INTERVAL '10ms', INTERVAL '100s', INTERVAL '1s') AS job_id \gset
-- call alter_job to trigger cache invalidation
SELECT alter_job(:job_id,scheduled:=true);
alter_job
------------------------------------------------------------------
(1025,"@ 0.01 secs","@ 1 min 40 secs",5,"@ 1 sec",t,,-infinity,)
alter_job
----------------------------------------------------------------------
(1025,"@ 0.01 secs","@ 1 min 40 secs",5,"@ 1 sec",t,,-infinity,,f,,)
(1 row)

SELECT ts_bgw_params_reset_time(450000, true);
Expand Down
Loading

0 comments on commit 06d20b1

Please sign in to comment.