Skip to content

Commit

Permalink
Remove the coordinator
Browse files Browse the repository at this point in the history
Remove the policy coordinator and policy leader election mechanisms
from fleet-server. Deprecate the coordinator_idx value in
fleet-server's json schema and remove coordinator_idx references when
processing policies.
  • Loading branch information
michel-laterman committed Nov 29, 2023
1 parent 9417467 commit f691382
Show file tree
Hide file tree
Showing 28 changed files with 63 additions and 1,518 deletions.
24 changes: 6 additions & 18 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,30 +384,24 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
defer span.End()
// If more than one, pick the winner;
// 0) Correct policy id
// 1) Highest revision/coordinator number
// 1) Highest revision number

found := false
currRev := agent.PolicyRevisionIdx
currCoord := agent.PolicyCoordinatorIdx
vSpan, _ := apm.StartSpan(ctx, "checkPolicyActions", "validate")
for _, a := range actionIds {
rev, ok := policy.RevisionFromString(a)

zlog.Debug().
Str("agent.policyId", agent.PolicyID).
Int64("agent.revisionIdx", currRev).
Int64("agent.coordinatorIdx", currCoord).
Str("rev.policyId", rev.PolicyID).
Int64("rev.revisionIdx", rev.RevisionIdx).
Int64("rev.coordinatorIdx", rev.CoordinatorIdx).
Msg("ack policy revision")

if ok && rev.PolicyID == agent.PolicyID &&
(rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
if ok && rev.PolicyID == agent.PolicyID && rev.RevisionIdx > currRev {
found = true
currRev = rev.RevisionIdx
currCoord = rev.CoordinatorIdx
}
}

Expand All @@ -432,7 +426,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag

err := ack.updateAgentDoc(ctx, zlog,
agent.Id,
currRev, currCoord,
currRev,
agent.PolicyID)
if err != nil {
return err
Expand Down Expand Up @@ -506,15 +500,14 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
func (ack *AckT) updateAgentDoc(ctx context.Context,
zlog zerolog.Logger,
agentID string,
currRev, currCoord int64,
currRev int64,
policyID string,
) error {
span, ctx := apm.StartSpan(ctx, "updateAgentDoc", "update")
defer span.End()
body := makeUpdatePolicyBody(
policyID,
currRev,
currCoord,
)

err := ack.bulk.Update(
Expand All @@ -529,7 +522,6 @@ func (ack *AckT) updateAgentDoc(ctx context.Context,
zlog.Err(err).
Str(LogPolicyID, policyID).
Int64("policyRevision", currRev).
Int64("policyCoordinator", currCoord).
Msg("ack policy")

if err != nil {
Expand Down Expand Up @@ -708,7 +700,7 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age
// has not changed underneath us by an upstream process (Kibana or otherwise).
// We have a race condition where a user could have assigned a new policy to
// an agent while we were busy updating the old one. A blind update to the
// agent record without a check could set the revision and coordIdx for the wrong
// agent record without a check could set the revision for the wrong
// policy. This script should be coupled with a "retry_on_conflict" parameter
// to allow for *other* changes to the agent record while we running the script.
// (For example, say the background bulk check-in timestamp update task fires)
Expand All @@ -718,12 +710,10 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age
const kUpdatePolicyPrefix = `{"script":{"lang":"painless","source":"if (ctx._source.policy_id == params.id) {ctx._source.remove('default_api_key_history');ctx._source.` +
dl.FieldPolicyRevisionIdx +
` = params.rev;ctx._source.` +
dl.FieldPolicyCoordinatorIdx +
`= params.coord;ctx._source.` +
dl.FieldUpdatedAt +
` = params.ts;} else {ctx.op = \"noop\";}","params": {"id":"`

func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte {
func makeUpdatePolicyBody(policyID string, newRev int64) []byte {
var buf bytes.Buffer
buf.Grow(410)

Expand All @@ -732,8 +722,6 @@ func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte {
buf.WriteString(policyID)
buf.WriteString(`","rev":`)
buf.WriteString(strconv.FormatInt(newRev, 10))
buf.WriteString(`,"coord":`)
buf.WriteString(strconv.FormatInt(coordIdx, 10))
buf.WriteString(`,"ts":"`)
buf.WriteString(time.Now().UTC().Format(time.RFC3339))
buf.WriteString(`"}}}`)
Expand Down
7 changes: 2 additions & 5 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,17 @@ func BenchmarkMakeUpdatePolicyBody(b *testing.B) {

const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207"
const newRev = 2
const coord = 1

for n := 0; n < b.N; n++ {
makeUpdatePolicyBody(policyID, newRev, coord)
makeUpdatePolicyBody(policyID, newRev)
}
}

func TestMakeUpdatePolicyBody(t *testing.T) {

const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207"
const newRev = 2
const coord = 1

data := makeUpdatePolicyBody(policyID, newRev, coord)
data := makeUpdatePolicyBody(policyID, newRev)

var i interface{}
err := json.Unmarshal(data, &i)
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
actCh := aSub.Ch()

// Subscribe to policy manager for changes on PolicyId > policyRev
sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx, agent.PolicyCoordinatorIdx)
sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx)
if err != nil {
return fmt.Errorf("subscribe policy monitor: %w", err)
}
Expand Down Expand Up @@ -730,7 +730,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
zlog = zlog.With().
Str("fleet.ctx", "processPolicy").
Int64("fleet.policyRevision", pp.Policy.RevisionIdx).
Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx).
Str(LogPolicyID, pp.Policy.PolicyID).
Logger()

Expand Down
33 changes: 0 additions & 33 deletions internal/pkg/coordinator/coordinator.go

This file was deleted.

Loading

0 comments on commit f691382

Please sign in to comment.