Skip to content

Commit

Permalink
Refactor job execution history table
Browse files Browse the repository at this point in the history
In #6767 we introduced the ability to track job execution history
including succeeded and failed jobs.

The new metadata table `_timescaledb_internal.bgw_job_stat_history` has
two JSONB columns: `config` (stores the job's configuration) and `error_data`
(stores the ErrorData information). The problem is that this approach is
not flexible for future history-recording changes, so this PR refactors
the current implementation to use a single JSONB column named `data`
that stores more job information in the following form:

{
  "job": {
    "owner": "fabrizio",
    "proc_name": "error",
    "scheduled": true,
    "max_retries": -1,
    "max_runtime": "00:00:00",
    "proc_schema": "public",
    "retry_period": "00:05:00",
    "initial_start": "00:05:00",
    "fixed_schedule": true,
    "schedule_interval": "00:00:30"
  },
  "config": {
    "bar": 1
  },
  "error_data": {
    "domain": "postgres-16",
    "lineno": 841,
    "context": "SQL statement \"SELECT 1/0\"\nPL/pgSQL function error(integer,jsonb) line 3 at PERFORM",
    "message": "division by zero",
    "filename": "int.c",
    "funcname": "int4div",
    "proc_name": "error",
    "sqlerrcode": "22012",
    "proc_schema": "public",
    "context_domain": "plpgsql-16"
  }
}
  • Loading branch information
fabriziomello committed Apr 19, 2024
1 parent e02a473 commit 66c0702
Show file tree
Hide file tree
Showing 22 changed files with 279 additions and 173 deletions.
3 changes: 1 addition & 2 deletions sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat_history (
execution_start TIMESTAMPTZ NOT NULL DEFAULT NOW(),
execution_finish TIMESTAMPTZ,
succeeded boolean NOT NULL DEFAULT FALSE,
config jsonb,
error_data jsonb,
data jsonb,
-- table constraints
CONSTRAINT bgw_job_stat_history_pkey PRIMARY KEY (id)
);
Expand Down
7 changes: 3 additions & 4 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat_history (
execution_start TIMESTAMPTZ NOT NULL DEFAULT NOW(),
execution_finish TIMESTAMPTZ,
succeeded boolean NOT NULL DEFAULT FALSE,
config jsonb,
error_data jsonb,
data jsonb,
-- table constraints
CONSTRAINT bgw_job_stat_history_pkey PRIMARY KEY (id)
);
Expand All @@ -356,13 +355,13 @@ CREATE INDEX bgw_job_stat_history_job_id_idx ON _timescaledb_internal.bgw_job_st

REVOKE ALL ON _timescaledb_internal.bgw_job_stat_history FROM PUBLIC;

INSERT INTO _timescaledb_internal.bgw_job_stat_history (job_id, pid, execution_start, execution_finish, error_data)
INSERT INTO _timescaledb_internal.bgw_job_stat_history (job_id, pid, execution_start, execution_finish, data)
SELECT
job_id,
pid,
start_time,
finish_time,
error_data
jsonb_build_object('error_data', error_data)
FROM
_timescaledb_internal.job_errors
ORDER BY
Expand Down
2 changes: 1 addition & 1 deletion sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ SELECT
pid,
execution_start,
execution_finish,
error_data
data->'error_data'
FROM
_timescaledb_internal.bgw_job_stat_history
WHERE
Expand Down
38 changes: 19 additions & 19 deletions sql/views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -281,20 +281,20 @@ CREATE OR REPLACE VIEW timescaledb_information.job_errors
WITH (security_barrier = true) AS
SELECT
job_id,
error_data->>'proc_schema' as proc_schema,
error_data->>'proc_name' as proc_name,
data->'job'->>'proc_schema' as proc_schema,
data->'job'->>'proc_name' as proc_name,
pid,
execution_start AS start_time,
execution_finish AS finish_time,
error_data->>'sqlerrcode' AS sqlerrcode,
CASE WHEN error_data->>'message' IS NOT NULL THEN
CASE WHEN error_data->>'detail' IS NOT NULL THEN
CASE WHEN error_data->>'hint' IS NOT NULL THEN concat(error_data->>'message', '. ', error_data->>'detail', '. ', error_data->>'hint')
ELSE concat(error_data->>'message', ' ', error_data->>'detail')
data->'error_data'->>'sqlerrcode' AS sqlerrcode,
CASE WHEN data->'error_data'->>'message' IS NOT NULL THEN
CASE WHEN data->'error_data'->>'detail' IS NOT NULL THEN
CASE WHEN data->'error_data'->>'hint' IS NOT NULL THEN concat(data->'error_data'->>'message', '. ', data->'error_data'->>'detail', '. ', data->'error_data'->>'hint')
ELSE concat(data->'error_data'->>'message', ' ', data->'error_data'->>'detail')
END
ELSE
CASE WHEN error_data->>'hint' IS NOT NULL THEN concat(error_data->>'message', '. ', error_data->>'hint')
ELSE error_data->>'message'
CASE WHEN data->'error_data'->>'hint' IS NOT NULL THEN concat(data->'error_data'->>'message', '. ', data->'error_data'->>'hint')
ELSE data->'error_data'->>'message'
END
END
ELSE
Expand All @@ -320,22 +320,22 @@ SELECT
h.id,
h.job_id,
h.succeeded,
coalesce(h.error_data->>'proc_schema', j.proc_schema) as proc_schema,
coalesce(h.error_data->>'proc_name', j.proc_name) as proc_name,
coalesce(h.data->'job'->>'proc_schema', j.proc_schema) as proc_schema,
coalesce(h.data->'job'->>'proc_name', j.proc_name) as proc_name,
h.pid,
h.execution_start AS start_time,
h.execution_finish AS finish_time,
h.config,
h.error_data->>'sqlerrcode' AS sqlerrcode,
h.data->'job'->'config' AS config,
h.data->'error_data'->>'sqlerrcode' AS sqlerrcode,
CASE
WHEN h.succeeded IS FALSE AND h.error_data->>'message' IS NOT NULL THEN
CASE WHEN h.error_data->>'detail' IS NOT NULL THEN
CASE WHEN h.error_data->>'hint' IS NOT NULL THEN concat(h.error_data->>'message', '. ', h.error_data->>'detail', '. ', h.error_data->>'hint')
ELSE concat(h.error_data->>'message', ' ', h.error_data->>'detail')
WHEN h.succeeded IS FALSE AND h.data->'error_data'->>'message' IS NOT NULL THEN
CASE WHEN h.data->'error_data'->>'detail' IS NOT NULL THEN
CASE WHEN h.data->'error_data'->>'hint' IS NOT NULL THEN concat(h.data->'error_data'->>'message', '. ', h.data->'error_data'->>'detail', '. ', h.data->'error_data'->>'hint')
ELSE concat(h.data->'error_data'->>'message', ' ', h.data->'error_data'->>'detail')
END
ELSE
CASE WHEN h.error_data->>'hint' IS NOT NULL THEN concat(h.error_data->>'message', '. ', h.error_data->>'hint')
ELSE h.error_data->>'message'
CASE WHEN h.data->'error_data'->>'hint' IS NOT NULL THEN concat(h.data->'error_data'->>'message', '. ', h.data->'error_data'->>'hint')
ELSE h.data->'error_data'->>'message'
END
END
WHEN h.succeeded IS FALSE AND h.execution_finish IS NOT NULL THEN
Expand Down
20 changes: 3 additions & 17 deletions src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result, Jsonb *edata)
}

void
ts_bgw_job_stat_mark_crash_reported(BgwJob *job, JobResult result, Jsonb *edata)
ts_bgw_job_stat_mark_crash_reported(BgwJob *job, JobResult result)
{
if (!bgw_job_stat_scan_job_id(job->fd.id,
bgw_job_stat_tuple_mark_crash_reported,
Expand All @@ -693,7 +693,7 @@ ts_bgw_job_stat_mark_crash_reported(BgwJob *job, JobResult result, Jsonb *edata)
errmsg("unable to find job statistics for job %d", job->fd.id)));
}

ts_bgw_job_stat_history_mark_end(job, result, edata);
ts_bgw_job_stat_history_mark_end(job, result, NULL);

pgstat_report_activity(STATE_IDLE, NULL);
}
Expand Down Expand Up @@ -800,21 +800,7 @@ ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job, int32 consecutive_f
/* Update the errors table regarding the crash */
if (!ts_flags_are_set_32(jobstat->fd.flags, LAST_CRASH_REPORTED))
{
NameData proc_schema = { .data = { 0 } }, proc_name = { .data = { 0 } };
JsonbParseState *parse_state = NULL;
JsonbValue *result = NULL;

/* add the proc_schema, proc_name to the jsonb */
namestrcpy(&proc_schema, NameStr(job->fd.proc_schema));
namestrcpy(&proc_name, NameStr(job->fd.proc_name));

/* build jsonb error data field */
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
ts_jsonb_add_str(parse_state, "proc_schema", NameStr(proc_schema));
ts_jsonb_add_str(parse_state, "proc_name", NameStr(proc_name));
result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

ts_bgw_job_stat_mark_crash_reported(job, JOB_FAILURE, JsonbValueToJsonb(result));
ts_bgw_job_stat_mark_crash_reported(job, JOB_FAILURE);
}

return calculate_next_start_on_crash(jobstat->fd.consecutive_crashes, job);
Expand Down
3 changes: 1 addition & 2 deletions src/bgw/job_stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ extern bool ts_bgw_job_stat_should_execute(BgwJobStat *jobstat, BgwJob *job);

extern TimestampTz ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job,
int32 consecutive_failed_launches);
extern TSDLLEXPORT void ts_bgw_job_stat_mark_crash_reported(BgwJob *job, JobResult result,
Jsonb *edata);
extern TSDLLEXPORT void ts_bgw_job_stat_mark_crash_reported(BgwJob *job, JobResult result);

extern TSDLLEXPORT TimestampTz ts_get_next_scheduled_execution_slot(BgwJob *job,
TimestampTz finish_time);
133 changes: 95 additions & 38 deletions src/bgw/job_stat_history.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
#include <access/xact.h>
#include <utils/jsonb.h>

#include "compat/compat.h"
#include "guc.h"
#include "hypertable.h"
#include "job_stat_history.h"
#include "jsonb_utils.h"
#include "timer.h"
Expand All @@ -21,51 +23,107 @@ typedef struct BgwJobStatHistoryContext
Jsonb *edata;
} BgwJobStatHistoryContext;

/*
 * Serialize the job definition into a JSONB object.
 *
 * The result is stored under the "job" key of
 * _timescaledb_internal.bgw_job_stat_history.data, so the history keeps a
 * snapshot of the job definition as it was at execution time.
 */
static Jsonb *
build_job_info(BgwJob *job)
{
	JsonbParseState *parse_state = NULL;
	pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);

	/* All fields that can be changed through the `alter_job` API */
	ts_jsonb_add_interval(parse_state, "schedule_interval", &job->fd.schedule_interval);
	ts_jsonb_add_interval(parse_state, "max_runtime", &job->fd.max_runtime);
	ts_jsonb_add_int32(parse_state, "max_retries", job->fd.max_retries);
	ts_jsonb_add_interval(parse_state, "retry_period", &job->fd.retry_period);
	ts_jsonb_add_str(parse_state, "proc_schema", NameStr(job->fd.proc_schema));
	ts_jsonb_add_str(parse_state, "proc_name", NameStr(job->fd.proc_name));
	ts_jsonb_add_str(parse_state, "owner", GetUserNameFromId(job->fd.owner, false));
	ts_jsonb_add_bool(parse_state, "scheduled", job->fd.scheduled);
	ts_jsonb_add_bool(parse_state, "fixed_schedule", job->fd.fixed_schedule);

	/*
	 * BUG FIX: the previous code passed `&job->fd.retry_period` here, so the
	 * "initial_start" key silently duplicated the retry period instead of
	 * recording the job's initial start. `initial_start` is used as a scalar
	 * (TimestampTz, an int64 of microseconds since the Postgres epoch), not
	 * an Interval, so serialize the actual field as an int64.
	 * NOTE(review): confirm consumers of `data->'job'->'initial_start'`
	 * expect the raw microsecond representation.
	 */
	if (job->fd.initial_start)
		ts_jsonb_add_int64(parse_state, "initial_start", job->fd.initial_start);

	if (job->fd.hypertable_id != INVALID_HYPERTABLE_ID)
		ts_jsonb_add_int32(parse_state, "hypertable_id", job->fd.hypertable_id);

	if (job->fd.config != NULL)
	{
		/* The job configuration is itself JSONB; embed it as a nested object */
		JsonbValue value = { 0 };
		JsonbToJsonbValue(job->fd.config, &value);
		ts_jsonb_add_value(parse_state, "config", &value);
	}

	/* Optional check procedure: only recorded when set */
	if (strlen(NameStr(job->fd.check_schema)) > 0)
		ts_jsonb_add_str(parse_state, "check_schema", NameStr(job->fd.check_schema));

	if (strlen(NameStr(job->fd.check_name)) > 0)
		ts_jsonb_add_str(parse_state, "check_name", NameStr(job->fd.check_name));

	if (job->fd.timezone != NULL)
		ts_jsonb_add_str(parse_state, "timezone", text_to_cstring(job->fd.timezone));

	return JsonbValueToJsonb(pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL));
}

/*
 * Assemble the JSONB payload stored in the `data` column of
 * _timescaledb_internal.bgw_job_stat_history: a top-level object with the
 * serialized job definition under "job" and, when error information was
 * provided, the error details under "error_data".
 */
static Jsonb *
ts_bgw_job_stat_history_build_data_info(BgwJobStatHistoryContext *context)
{
	Assert(context != NULL && context->job != NULL);

	JsonbParseState *state = NULL;
	JsonbValue jb_val = { 0 };

	(void) pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);

	/* Snapshot of the job definition at execution time */
	JsonbToJsonbValue(build_job_info(context->job), &jb_val);
	ts_jsonb_add_value(state, "job", &jb_val);

	/* Attach error details only when the caller supplied them */
	if (context->edata != NULL)
	{
		JsonbToJsonbValue(context->edata, &jb_val);
		ts_jsonb_add_value(state, "error_data", &jb_val);
	}

	return JsonbValueToJsonb(pushJsonbValue(&state, WJB_END_OBJECT, NULL));
}

static void
ts_bgw_job_stat_history_insert(BgwJob *job, BgwJobStatHistoryContext *context)
ts_bgw_job_stat_history_insert(BgwJobStatHistoryContext *context)
{
Assert(context != NULL);

Relation rel = table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT_HISTORY),
ShareRowExclusiveLock);
TupleDesc desc = RelationGetDescr(rel);
NullableDatum values[Natts_bgw_job_stat_history] = { { 0 } };
CatalogSecurityContext sec_ctx;

ts_datum_set_int32(Anum_bgw_job_stat_history_job_id, values, job->fd.id, false);
ts_datum_set_int32(Anum_bgw_job_stat_history_job_id, values, context->job->fd.id, false);
ts_datum_set_int32(Anum_bgw_job_stat_history_pid, values, 0, true);
ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_start,
values,
job->job_history.execution_start,
context->job->job_history.execution_start,
false);
ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_finish, values, 0, true);
ts_datum_set_bool(Anum_bgw_job_stat_history_succeeded, values, false);
ts_datum_set_jsonb(Anum_bgw_job_stat_history_config, values, job->fd.config);

if (job->fd.config != NULL)
{
ts_datum_set_jsonb(Anum_bgw_job_stat_history_config, values, job->fd.config);
}

/* In case of the GUC be disabled all errors are logged then the `context` will contain
* `error_data` information */
if (context != NULL)
{
ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_finish,
values,
ts_timer_get_current_timestamp(),
false);
ts_datum_set_jsonb(Anum_bgw_job_stat_history_error_data, values, context->edata);
}
else
ts_datum_set_jsonb(Anum_bgw_job_stat_history_error_data, values, NULL);
ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_finish,
values,
ts_timer_get_current_timestamp(),
false);
ts_datum_set_jsonb(Anum_bgw_job_stat_history_data,
values,
ts_bgw_job_stat_history_build_data_info(context));

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);

if (job->job_history.id == INVALID_BGW_JOB_STAT_HISTORY_ID)
if (context->job->job_history.id == INVALID_BGW_JOB_STAT_HISTORY_ID)
{
/* We need to get a new job id to mark the end later */
job->job_history.id = ts_catalog_table_next_seq_id(ts_catalog_get(), BGW_JOB_STAT_HISTORY);
context->job->job_history.id =
ts_catalog_table_next_seq_id(ts_catalog_get(), BGW_JOB_STAT_HISTORY);
}
ts_datum_set_int64(Anum_bgw_job_stat_history_id, values, job->job_history.id, false);
ts_datum_set_int64(Anum_bgw_job_stat_history_id, values, context->job->job_history.id, false);

ts_catalog_insert_datums(rel, desc, values);
ts_catalog_restore_user(&sec_ctx);
Expand All @@ -80,7 +138,11 @@ ts_bgw_job_stat_history_mark_start(BgwJob *job)
if (!ts_guc_enable_job_execution_logging)
return;

ts_bgw_job_stat_history_insert(job, NULL);
BgwJobStatHistoryContext context = {
.job = job,
};

ts_bgw_job_stat_history_insert(&context);
}

static bool
Expand Down Expand Up @@ -151,18 +213,13 @@ bgw_job_stat_history_tuple_mark_end(TupleInfo *ti, void *const data)
BoolGetDatum((context->result == JOB_SUCCESS));
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_succeeded)] = true;

if (context->job->fd.config != NULL)
{
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_config)] =
JsonbPGetDatum(context->job->fd.config);
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_config)] = true;
}
Jsonb *job_history_data = ts_bgw_job_stat_history_build_data_info(context);

if (context->edata != NULL)
if (job_history_data != NULL)
{
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_error_data)] =
JsonbPGetDatum(context->edata);
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_error_data)] = true;
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_data)] =
JsonbPGetDatum(job_history_data);
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_data)] = true;
}

HeapTuple new_tuple =
Expand Down Expand Up @@ -204,7 +261,7 @@ ts_bgw_job_stat_history_mark_end(BgwJob *job, JobResult result, Jsonb *edata)
* to insert all the information in the job error history table */
if (!ts_guc_enable_job_execution_logging && result != JOB_SUCCESS)
{
ts_bgw_job_stat_history_insert(new_job, &context);
ts_bgw_job_stat_history_insert(&context);
}
else
{
Expand Down
13 changes: 13 additions & 0 deletions src/compat/compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#pragma once

#include <postgres.h>

#include <commands/cluster.h>
#include <commands/defrem.h>
#include <commands/explain.h>
Expand All @@ -16,6 +17,7 @@
#include <nodes/nodes.h>
#include <optimizer/restrictinfo.h>
#include <pgstat.h>
#include <utils/jsonb.h>
#include <utils/lsyscache.h>
#include <utils/rel.h>

Expand Down Expand Up @@ -1038,3 +1040,14 @@ error_severity(int elevel)
return prefix;
}
#endif

#if PG14_LT
/*
 * Compatibility shim, copied verbatim from PostgreSQL's jsonb_util.c:
 * guarded by PG14_LT because the function is not available to extensions
 * on servers older than PG 14.
 *
 * Wraps an on-disk Jsonb container in a JsonbValue of type jbvBinary
 * without copying the payload: the value points directly at the Jsonb's
 * root node, and the length is the varlena size minus its header.
 */
static inline void
JsonbToJsonbValue(Jsonb *jsonb, JsonbValue *val)
{
val->type = jbvBinary;
val->val.binary.data = &jsonb->root;
val->val.binary.len = VARSIZE(jsonb) - VARHDRSZ;
}
#endif
11 changes: 0 additions & 11 deletions src/telemetry/telemetry_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@
#include "scan_iterator.h"
#include "jsonb_utils.h"

#if PG14_LT
/* Copied from jsonb_util.c */
static void
JsonbToJsonbValue(Jsonb *jsonb, JsonbValue *val)
{
val->type = jbvBinary;
val->val.binary.data = &jsonb->root;
val->val.binary.len = VARSIZE(jsonb) - VARHDRSZ;
}
#endif

void
ts_telemetry_event_truncate(void)
{
Expand Down
Loading

0 comments on commit 66c0702

Please sign in to comment.