Skip to content

Commit

Permalink
Merge pull request hasura#5736 from codingkarthik/scheduled-triggers-…
Browse files Browse the repository at this point in the history
…created-at-bug-5272

hasura#5736
  • Loading branch information
kodiakhq[bot] authored and codingkarthik committed Nov 6, 2020
1 parent c187d0b commit 93b6a07
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 94 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ The corresponding JWT config can be:

- server: allow remote relationships joining `type` column with `[type]` input argument as spec allows this coercion (fixes #5133)
- server: add action-like URL templating for event triggers and remote schemas (fixes #2483)
- server: change `created_at` column type from `timestamp` to `timestamptz` for scheduled triggers tables (fix #5722)

### Breaking change

Expand Down
2 changes: 1 addition & 1 deletion server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
unlockEventsForShutdown pool hasuraLogger "event_triggers" "" unlockEvents leEvents
liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "unlocking scheduled events that are locked by the HGE"
unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "cron events" unlockCronEvents leCronEvents
unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "scheduled events" unlockCronEvents leStandAloneEvents
unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "scheduled events" unlockCronEvents leOneOffEvents

unlockEventsForShutdown
:: Q.PGPool
Expand Down
14 changes: 7 additions & 7 deletions server/src-lib/Hasura/Eventing/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@ import Control.Concurrent.STM.TVar
import Control.Monad.STM
import Hasura.Prelude
import Hasura.RQL.Types.EventTrigger (EventId)
import Hasura.RQL.Types.ScheduledTrigger (CronEventId, StandAloneScheduledEventId)
import Hasura.RQL.Types.ScheduledTrigger (CronEventId, OneOffScheduledEventId)

import qualified Data.Set as Set

data LockedEventsCtx
= LockedEventsCtx
{ leCronEvents :: TVar (Set.Set CronEventId)
, leStandAloneEvents :: TVar (Set.Set StandAloneScheduledEventId)
, leEvents :: TVar (Set.Set EventId)
{ leCronEvents :: TVar (Set.Set CronEventId)
, leOneOffEvents :: TVar (Set.Set OneOffScheduledEventId)
, leEvents :: TVar (Set.Set EventId)
}

initLockedEventsCtx :: STM LockedEventsCtx
initLockedEventsCtx = do
leCronEvents <- newTVar Set.empty
leStandAloneEvents <- newTVar Set.empty
leEvents <- newTVar Set.empty
leCronEvents <- newTVar Set.empty
leOneOffEvents <- newTVar Set.empty
leEvents <- newTVar Set.empty
return $ LockedEventsCtx{..}

-- | After the events are fetched from the DB, we store the locked events
Expand Down
169 changes: 93 additions & 76 deletions server/src-lib/Hasura/Eventing/ScheduledTrigger.hs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Hasura.RQL.Types.ScheduledTrigger
, STRetryConf(..)
, CreateScheduledEvent(..)
, CronEventId
, StandAloneScheduledEventId
, OneOffScheduledEventId
, formatTime'
, defaultSTRetryConf
) where
Expand All @@ -29,7 +29,7 @@ import qualified Hasura.RQL.Types.EventTrigger as ET

type CronEventId = Text

type StandAloneScheduledEventId = Text
type OneOffScheduledEventId = Text

data STRetryConf
= STRetryConf
Expand Down
2 changes: 1 addition & 1 deletion server/src-rsr/catalog_version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
37
38
8 changes: 4 additions & 4 deletions server/src-rsr/initialise.sql
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ CREATE TABLE hdb_catalog.hdb_cron_events
scheduled_time TIMESTAMPTZ NOT NULL,
status TEXT NOT NULL DEFAULT 'scheduled',
tries INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
next_retry_at TIMESTAMPTZ,

FOREIGN KEY (trigger_name) REFERENCES hdb_catalog.hdb_cron_triggers(name)
Expand All @@ -774,7 +774,7 @@ CREATE TABLE hdb_catalog.hdb_cron_event_invocation_logs
status INTEGER,
request JSON,
response JSON,
created_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),

FOREIGN KEY (event_id) REFERENCES hdb_catalog.hdb_cron_events (id)
ON UPDATE CASCADE ON DELETE CASCADE
Expand Down Expand Up @@ -803,7 +803,7 @@ CREATE TABLE hdb_catalog.hdb_scheduled_events
header_conf JSON,
status TEXT NOT NULL DEFAULT 'scheduled',
tries INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
next_retry_at TIMESTAMPTZ,
comment TEXT,
CONSTRAINT valid_status CHECK (status IN ('scheduled','locked','delivered','error','dead'))
Expand All @@ -818,7 +818,7 @@ event_id TEXT,
status INTEGER,
request JSON,
response JSON,
created_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),

FOREIGN KEY (event_id) REFERENCES hdb_catalog.hdb_scheduled_events (id)
ON DELETE CASCADE ON UPDATE CASCADE
Expand Down
11 changes: 11 additions & 0 deletions server/src-rsr/migrations/37_to_38.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE hdb_catalog.hdb_cron_events
ALTER COLUMN created_at TYPE TIMESTAMPTZ;

ALTER TABLE hdb_catalog.hdb_cron_event_invocation_logs
ALTER COLUMN created_at TYPE TIMESTAMPTZ;

ALTER TABLE hdb_catalog.hdb_scheduled_events
ALTER COLUMN created_at TYPE TIMESTAMPTZ;

ALTER TABLE hdb_catalog.hdb_scheduled_event_invocation_logs
ALTER COLUMN created_at TYPE TIMESTAMPTZ;
11 changes: 11 additions & 0 deletions server/src-rsr/migrations/38_to_37.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE hdb_catalog.hdb_cron_events
ALTER COLUMN created_at TYPE TIMESTAMP;

ALTER TABLE hdb_catalog.hdb_cron_event_invocation_logs
ALTER COLUMN created_at TYPE TIMESTAMP;

ALTER TABLE hdb_catalog.hdb_scheduled_events
ALTER COLUMN created_at TYPE TIMESTAMP;

ALTER TABLE hdb_catalog.hdb_scheduled_event_invocation_logs
ALTER COLUMN created_at TYPE TIMESTAMP;
62 changes: 59 additions & 3 deletions server/tests-py/test_scheduled_triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import time

# The create and delete tests should ideally go in setup and teardown YAML files,
# We can't use that here because, the payload is dynamic i.e. in case of adhoc Scheduled Triggers
# We can't use that here because, the payload is dynamic i.e. in case of one-off scheduled events
# the value is the current timestamp and in case of cron Triggers, the cron schedule is
# derived based on the current timestamp

Expand Down Expand Up @@ -39,7 +39,8 @@ def test_create_scheduled_event(self,hge_ctx):
"webhook":'{{SCHEDULED_TRIGGERS_WEBHOOK_DOMAIN}}/test',
"schedule_at":stringify_datetime(datetime.utcnow()),
"payload":self.webhook_payload,
"headers":self.header_conf
"headers":self.header_conf,
"comment":"test scheduled event"
}
}
st, resp = hge_ctx.v1q(query)
Expand Down Expand Up @@ -78,10 +79,24 @@ def test_create_trigger_with_error_returning_webhook(self,hge_ctx):
assert st == 200, resp

def test_check_fired_webhook_event(self,hge_ctx,scheduled_triggers_evts_webhook):
query = {
"type":"run_sql",
"args":{
"sql":'''
select timezone('utc',created_at) as created_at
from hdb_catalog.hdb_scheduled_events
where comment = 'test scheduled event';
'''
}
}
st, resp = hge_ctx.v1q(query)
assert st == 200, resp
db_created_at = resp['result'][1][0]
event = scheduled_triggers_evts_webhook.get_event(65)
validate_event_webhook(event['path'],'/test')
validate_event_headers(event['headers'],{"header-key":"header-value"})
assert event['body']['payload'] == self.webhook_payload
assert event['body']['created_at'] == db_created_at.replace(" ","T") + "Z"
payload_keys = dict.keys(event['body'])
for k in ["scheduled_time","created_at","id"]: # additional keys
assert k in payload_keys
Expand Down Expand Up @@ -215,7 +230,6 @@ def test_update_existing_cron_trigger(self,hge_ctx):
}
st,resp = hge_ctx.v1q(q)
assert st == 200,resp
print ("resp",resp['result'][1][0])
assert json.loads(resp['result'][1][0]) == [{
"name":"header-name",
"value":"header-value"
Expand Down Expand Up @@ -245,6 +259,40 @@ def test_update_existing_cron_trigger(self,hge_ctx):
actual_schedule_timestamps.append(datetime_ts)
assert actual_schedule_timestamps == expected_schedule_timestamps

def test_check_fired_webhook_event(self, hge_ctx, scheduled_triggers_evts_webhook):
q = {
"type":"create_cron_trigger",
"args":{
"name":"test_cron_trigger",
"webhook":"{{SCHEDULED_TRIGGERS_WEBHOOK_DOMAIN}}" + "/test",
"schedule":"* * * * *",
"headers":[
{
"name":"header-key",
"value":"header-value"
}
],
"payload":{"foo":"baz"},
"include_in_metadata":False
}
}
st,resp = hge_ctx.v1q(q)
assert st == 200, resp
# The maximum timeout is set to 120s because, the cron timestamps
# that are generated will start from the next minute, suppose
# the cron schedule is "* * * * *" and the time the cron trigger
# is created is 10:00:00, then the next event will be scheduled
# at 10:01:00, but the events processor will not process it
# exactly at the zeroeth second of 10:01. The only guarantee
# is that, the event processor will process the event before
# 10:02:00. So, in the worst case, it will take 2 minutes
# to process the first scheduled event.
event = scheduled_triggers_evts_webhook.get_event(120)
validate_event_webhook(event['path'],'/test')
validate_event_headers(event['headers'],{"header-key":"header-value"})
assert event['body']['payload'] == {"foo":"baz"}
assert event['body']['name'] == 'test_cron_trigger'

def test_delete_cron_scheduled_trigger(self,hge_ctx):
q = {
"type":"delete_cron_trigger",
Expand All @@ -254,3 +302,11 @@ def test_delete_cron_scheduled_trigger(self,hge_ctx):
}
st,resp = hge_ctx.v1q(q)
assert st == 200,resp
q = {
"type":"delete_cron_trigger",
"args":{
"name":"test_cron_trigger"
}
}
st,resp = hge_ctx.v1q(q)
assert st == 200,resp

0 comments on commit 93b6a07

Please sign in to comment.