Skip to content

Commit

Permalink
Handle out of order events (#5071)
Browse files Browse the repository at this point in the history
* Handle out of order events

Signed-off-by: Faisal Memon <fymemon@yahoo.com>
Signed-off-by: Marcos Yacob <marcos.yacob@hpe.com>
Co-authored-by: Marcos Yacob <marcosyacob@gmail.com>
Co-authored-by: Marcos Yacob <marcos.yacob@hpe.com>
  • Loading branch information
3 people committed Jun 4, 2024
1 parent 4a90400 commit 09e0e36
Show file tree
Hide file tree
Showing 14 changed files with 826 additions and 162 deletions.
17 changes: 13 additions & 4 deletions cmd/spire-server/cli/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ type serverConfig struct {
}

type experimentalConfig struct {
AuthOpaPolicyEngine *authpolicy.OpaEngineConfig `hcl:"auth_opa_policy_engine"`
CacheReloadInterval string `hcl:"cache_reload_interval"`
EventsBasedCache bool `hcl:"events_based_cache"`
PruneEventsOlderThan string `hcl:"prune_events_older_than"`
AuthOpaPolicyEngine *authpolicy.OpaEngineConfig `hcl:"auth_opa_policy_engine"`
CacheReloadInterval string `hcl:"cache_reload_interval"`
EventsBasedCache bool `hcl:"events_based_cache"`
PruneEventsOlderThan string `hcl:"prune_events_older_than"`
SQLTransactionTimeout string `hcl:"sql_transaction_timeout"`

Flags fflag.RawConfig `hcl:"feature_flags"`

Expand Down Expand Up @@ -677,6 +678,14 @@ func NewServerConfig(c *Config, logOptions []log.Option, allowUnknownConfig bool
sc.PruneEventsOlderThan = interval
}

if c.Server.Experimental.SQLTransactionTimeout != "" {
interval, err := time.ParseDuration(c.Server.Experimental.SQLTransactionTimeout)
if err != nil {
return nil, fmt.Errorf("could not parse SQL transaction timeout interval: %w", err)
}
sc.SQLTransactionTimeout = interval
}

if c.Server.Experimental.EventsBasedCache {
sc.Log.Info("Using events based cache")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/telemetry/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ const (
// non-error level.
Error = "error"

// EventID tags an event ID
EventID = "event_id"

// Expect tags an expected value, as opposed to the one received. Message should clarify
// what kind of value was expected, and a different field should show the received value
Expect = "expect"
Expand Down
36 changes: 30 additions & 6 deletions pkg/common/telemetry/server/datastore/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,21 @@ func StartPruneRegistrationEntriesEventsCall(m telemetry.Metrics) *telemetry.Cal
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Prune)
}

// StartGetLatestRegistrationEntryEventIDCall return metric
// for server's datastore, on listing latest registration entry event id.
func StartGetLatestRegistrationEntryEventIDCall(m telemetry.Metrics) *telemetry.CallCounter {
// StartCreateRegistrationEntryEventForTestingCall return metric
// for server's datastore, on creating a registration entry event.
func StartCreateRegistrationEntryEventForTestingCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Create)
}

// StartDeleteRegistrationEntryEventForTestingCall return metric
// for server's datastore, on deleting a registration entry event.
func StartDeleteRegistrationEntryEventForTestingCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Delete)
}

// StartFetchRegistrationEntryEventCall return metric
// for server's datastore, on fetching a registration entry event.
func StartFetchRegistrationEntryEventCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Fetch)
}

Expand All @@ -34,8 +46,20 @@ func StartPruneAttestedNodesEventsCall(m telemetry.Metrics) *telemetry.CallCount
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Prune)
}

// StartGetLatestAttestedNodeEventIDCall return metric
// for server's datastore, on listing attested node event id.
func StartGetLatestAttestedNodeEventIDCall(m telemetry.Metrics) *telemetry.CallCounter {
// StartCreateAttestedNodeEventForTestingCall return metric
// for server's datastore, on creating an attested node event.
func StartCreateAttestedNodeEventForTestingCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Create)
}

// StartDeleteAttestedNodeEventForTestingCall return metric
// for server's datastore, on deleting an attested node event.
func StartDeleteAttestedNodeEventForTestingCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Delete)
}

// StartFetchAttestedNodeEventCall return metric
// for server's datastore, on fetching an attested node event.
func StartFetchAttestedNodeEventCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Fetch)
}
48 changes: 36 additions & 12 deletions pkg/common/telemetry/server/datastore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func (w metricsWrapper) CreateAttestedNode(ctx context.Context, node *common.Att
return w.ds.CreateAttestedNode(ctx, node)
}

func (w metricsWrapper) CreateAttestedNodeEventForTesting(ctx context.Context, event *datastore.AttestedNodeEvent) (err error) {
callCounter := StartCreateAttestedNodeEventForTestingCall(w.m)
defer callCounter.Done(&err)
return w.ds.CreateAttestedNodeEventForTesting(ctx, event)
}

func (w metricsWrapper) CreateBundle(ctx context.Context, bundle *common.Bundle) (_ *common.Bundle, err error) {
callCounter := StartCreateBundleCall(w.m)
defer callCounter.Done(&err)
Expand All @@ -60,6 +66,12 @@ func (w metricsWrapper) CreateOrReturnRegistrationEntry(ctx context.Context, ent
return w.ds.CreateOrReturnRegistrationEntry(ctx, entry)
}

func (w metricsWrapper) CreateRegistrationEntryEventForTesting(ctx context.Context, event *datastore.RegistrationEntryEvent) (err error) {
callCounter := StartCreateRegistrationEntryEventForTestingCall(w.m)
defer callCounter.Done(&err)
return w.ds.CreateRegistrationEntryEventForTesting(ctx, event)
}

func (w metricsWrapper) CreateFederationRelationship(ctx context.Context, fr *datastore.FederationRelationship) (_ *datastore.FederationRelationship, err error) {
callCounter := StartCreateFederationRelationshipCall(w.m)
defer callCounter.Done(&err)
Expand All @@ -78,6 +90,12 @@ func (w metricsWrapper) DeleteAttestedNode(ctx context.Context, spiffeID string)
return w.ds.DeleteAttestedNode(ctx, spiffeID)
}

func (w metricsWrapper) DeleteAttestedNodeEventForTesting(ctx context.Context, eventID uint) (err error) {
callCounter := StartDeleteAttestedNodeEventForTestingCall(w.m)
defer callCounter.Done(&err)
return w.ds.DeleteAttestedNodeEventForTesting(ctx, eventID)
}

func (w metricsWrapper) DeleteBundle(ctx context.Context, trustDomain string, mode datastore.DeleteMode) (err error) {
callCounter := StartDeleteBundleCall(w.m)
defer callCounter.Done(&err)
Expand All @@ -102,12 +120,24 @@ func (w metricsWrapper) DeleteRegistrationEntry(ctx context.Context, entryID str
return w.ds.DeleteRegistrationEntry(ctx, entryID)
}

func (w metricsWrapper) DeleteRegistrationEntryEventForTesting(ctx context.Context, eventID uint) (err error) {
callCounter := StartDeleteRegistrationEntryEventForTestingCall(w.m)
defer callCounter.Done(&err)
return w.ds.DeleteRegistrationEntryEventForTesting(ctx, eventID)
}

func (w metricsWrapper) FetchAttestedNode(ctx context.Context, spiffeID string) (_ *common.AttestedNode, err error) {
callCounter := StartFetchNodeCall(w.m)
defer callCounter.Done(&err)
return w.ds.FetchAttestedNode(ctx, spiffeID)
}

func (w metricsWrapper) FetchAttestedNodeEvent(ctx context.Context, eventID uint) (_ *datastore.AttestedNodeEvent, err error) {
callCounter := StartFetchAttestedNodeEventCall(w.m)
defer callCounter.Done(&err)
return w.ds.FetchAttestedNodeEvent(ctx, eventID)
}

func (w metricsWrapper) FetchBundle(ctx context.Context, trustDomain string) (_ *common.Bundle, err error) {
callCounter := StartFetchBundleCall(w.m)
defer callCounter.Done(&err)
Expand All @@ -126,22 +156,16 @@ func (w metricsWrapper) FetchRegistrationEntry(ctx context.Context, entryID stri
return w.ds.FetchRegistrationEntry(ctx, entryID)
}

func (w metricsWrapper) FetchFederationRelationship(ctx context.Context, trustDomain spiffeid.TrustDomain) (_ *datastore.FederationRelationship, err error) {
callCounter := StartFetchFederationRelationshipCall(w.m)
defer callCounter.Done(&err)
return w.ds.FetchFederationRelationship(ctx, trustDomain)
}

func (w metricsWrapper) GetLatestAttestedNodeEventID(ctx context.Context) (_ uint, err error) {
callCounter := StartGetLatestAttestedNodeEventIDCall(w.m)
func (w metricsWrapper) FetchRegistrationEntryEvent(ctx context.Context, eventID uint) (_ *datastore.RegistrationEntryEvent, err error) {
callCounter := StartFetchRegistrationEntryEventCall(w.m)
defer callCounter.Done(&err)
return w.ds.GetLatestAttestedNodeEventID(ctx)
return w.ds.FetchRegistrationEntryEvent(ctx, eventID)
}

func (w metricsWrapper) GetLatestRegistrationEntryEventID(ctx context.Context) (_ uint, err error) {
callCounter := StartGetLatestRegistrationEntryEventIDCall(w.m)
func (w metricsWrapper) FetchFederationRelationship(ctx context.Context, trustDomain spiffeid.TrustDomain) (_ *datastore.FederationRelationship, err error) {
callCounter := StartFetchFederationRelationshipCall(w.m)
defer callCounter.Done(&err)
return w.ds.GetLatestRegistrationEntryEventID(ctx)
return w.ds.FetchFederationRelationship(ctx, trustDomain)
}

func (w metricsWrapper) GetNodeSelectors(ctx context.Context, spiffeID string, dataConsistency datastore.DataConsistency) (_ []*common.Selector, err error) {
Expand Down
60 changes: 46 additions & 14 deletions pkg/common/telemetry/server/datastore/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func TestWithMetrics(t *testing.T) {
key: "datastore.node.create",
methodName: "CreateAttestedNode",
},
{
key: "datastore.node_event.create",
methodName: "CreateAttestedNodeEventForTesting",
},
{
key: "datastore.bundle.create",
methodName: "CreateBundle",
Expand All @@ -78,10 +82,18 @@ func TestWithMetrics(t *testing.T) {
key: "datastore.registration_entry.create",
methodName: "CreateOrReturnRegistrationEntry",
},
{
key: "datastore.registration_entry_event.create",
methodName: "CreateRegistrationEntryEventForTesting",
},
{
key: "datastore.node.delete",
methodName: "DeleteAttestedNode",
},
{
key: "datastore.node_event.delete",
methodName: "DeleteAttestedNodeEventForTesting",
},
{
key: "datastore.bundle.delete",
methodName: "DeleteBundle",
Expand All @@ -98,10 +110,18 @@ func TestWithMetrics(t *testing.T) {
key: "datastore.registration_entry.delete",
methodName: "DeleteRegistrationEntry",
},
{
key: "datastore.registration_entry_event.delete",
methodName: "DeleteRegistrationEntryEventForTesting",
},
{
key: "datastore.node.fetch",
methodName: "FetchAttestedNode",
},
{
key: "datastore.node_event.fetch",
methodName: "FetchAttestedNodeEvent",
},
{
key: "datastore.bundle.fetch",
methodName: "FetchBundle",
Expand All @@ -115,16 +135,12 @@ func TestWithMetrics(t *testing.T) {
methodName: "FetchRegistrationEntry",
},
{
key: "datastore.federation_relationship.fetch",
methodName: "FetchFederationRelationship",
},
{
key: "datastore.node_event.fetch",
methodName: "GetLatestAttestedNodeEventID",
key: "datastore.registration_entry_event.fetch",
methodName: "FetchRegistrationEntryEvent",
},
{
key: "datastore.registration_entry_event.fetch",
methodName: "GetLatestRegistrationEntryEventID",
key: "datastore.federation_relationship.fetch",
methodName: "FetchFederationRelationship",
},
{
key: "datastore.node.selectors.fetch",
Expand Down Expand Up @@ -334,6 +350,10 @@ func (ds *fakeDataStore) CreateAttestedNode(context.Context, *common.AttestedNod
return &common.AttestedNode{}, ds.err
}

func (ds *fakeDataStore) CreateAttestedNodeEventForTesting(context.Context, *datastore.AttestedNodeEvent) error {
return ds.err
}

func (ds *fakeDataStore) CreateBundle(context.Context, *common.Bundle) (*common.Bundle, error) {
return &common.Bundle{}, ds.err
}
Expand All @@ -358,10 +378,18 @@ func (ds *fakeDataStore) CreateOrReturnRegistrationEntry(context.Context, *commo
return &common.RegistrationEntry{}, true, ds.err
}

func (ds *fakeDataStore) CreateRegistrationEntryEventForTesting(context.Context, *datastore.RegistrationEntryEvent) error {
return ds.err
}

func (ds *fakeDataStore) DeleteAttestedNode(context.Context, string) (*common.AttestedNode, error) {
return &common.AttestedNode{}, ds.err
}

func (ds *fakeDataStore) DeleteAttestedNodeEventForTesting(context.Context, uint) error {
return ds.err
}

func (ds *fakeDataStore) DeleteBundle(context.Context, string, datastore.DeleteMode) error {
return ds.err
}
Expand All @@ -378,10 +406,18 @@ func (ds *fakeDataStore) DeleteRegistrationEntry(context.Context, string) (*comm
return &common.RegistrationEntry{}, ds.err
}

func (ds *fakeDataStore) DeleteRegistrationEntryEventForTesting(context.Context, uint) error {
return ds.err
}

func (ds *fakeDataStore) FetchAttestedNode(context.Context, string) (*common.AttestedNode, error) {
return &common.AttestedNode{}, ds.err
}

func (ds *fakeDataStore) FetchAttestedNodeEvent(context.Context, uint) (*datastore.AttestedNodeEvent, error) {
return &datastore.AttestedNodeEvent{}, ds.err
}

func (ds *fakeDataStore) FetchBundle(context.Context, string) (*common.Bundle, error) {
return &common.Bundle{}, ds.err
}
Expand All @@ -398,12 +434,8 @@ func (ds *fakeDataStore) FetchRegistrationEntry(context.Context, string) (*commo
return &common.RegistrationEntry{}, ds.err
}

func (ds *fakeDataStore) GetLatestAttestedNodeEventID(context.Context) (uint, error) {
return 0, ds.err
}

func (ds *fakeDataStore) GetLatestRegistrationEntryEventID(context.Context) (uint, error) {
return 0, ds.err
func (ds *fakeDataStore) FetchRegistrationEntryEvent(context.Context, uint) (*datastore.RegistrationEntryEvent, error) {
return &datastore.RegistrationEntryEvent{}, ds.err
}

func (ds *fakeDataStore) GetNodeSelectors(context.Context, string, datastore.DataConsistency) ([]*common.Selector, error) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Config struct {
// PruneEventsOlderThan controls how long events can live before they are pruned
PruneEventsOlderThan time.Duration

// SQLTransactionTimeout controls how long to wait for an event before giving up
SQLTransactionTimeout time.Duration

// AuthPolicyEngineConfig determines the config for authz policy
AuthOpaPolicyEngineConfig *authpolicy.OpaEngineConfig

Expand Down
8 changes: 6 additions & 2 deletions pkg/server/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ type DataStore interface {
// Entries Events
ListRegistrationEntriesEvents(ctx context.Context, req *ListRegistrationEntriesEventsRequest) (*ListRegistrationEntriesEventsResponse, error)
PruneRegistrationEntriesEvents(ctx context.Context, olderThan time.Duration) error
GetLatestRegistrationEntryEventID(ctx context.Context) (uint, error)
FetchRegistrationEntryEvent(ctx context.Context, eventID uint) (*RegistrationEntryEvent, error)
CreateRegistrationEntryEventForTesting(ctx context.Context, event *RegistrationEntryEvent) error
DeleteRegistrationEntryEventForTesting(ctx context.Context, eventID uint) error

// Nodes
CountAttestedNodes(context.Context, *CountAttestedNodesRequest) (int32, error)
Expand All @@ -56,7 +58,9 @@ type DataStore interface {
// Nodes Events
ListAttestedNodesEvents(ctx context.Context, req *ListAttestedNodesEventsRequest) (*ListAttestedNodesEventsResponse, error)
PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) error
GetLatestAttestedNodeEventID(ctx context.Context) (uint, error)
FetchAttestedNodeEvent(ctx context.Context, eventID uint) (*AttestedNodeEvent, error)
CreateAttestedNodeEventForTesting(ctx context.Context, event *AttestedNodeEvent) error
DeleteAttestedNodeEventForTesting(ctx context.Context, eventID uint) error

// Node selectors
GetNodeSelectors(ctx context.Context, spiffeID string, dataConsistency DataConsistency) ([]*common.Selector, error)
Expand Down
Loading

0 comments on commit 09e0e36

Please sign in to comment.