Skip to content

Commit

Permalink
automod: engine support for adding tags
Browse files Browse the repository at this point in the history
  • Loading branch information
bnewbold committed Sep 6, 2024
1 parent 4942010 commit 671313d
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 10 deletions.
30 changes: 29 additions & 1 deletion automod/engine/effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ type Effects struct {
CounterDistinctIncrements []CounterDistinctRef // TODO: better variable names
// Label values which should be applied to the overall account, as a result of rule execution.
AccountLabels []string
// Moderation flags (similar to labels, but private) which should be applied to the overall account, as a result of rule execution.
// Moderation tags (similar to labels, but private) which should be applied to the overall account, as a result of rule execution.
AccountTags []string
// automod flags (metadata) which should be applied to the account as a result of rule execution.
AccountFlags []string
// Reports which should be filed against this account, as a result of rule execution.
AccountReports []ModReport
// If "true", indicates that a rule indicates that the entire account should have a takedown.
AccountTakedown bool
// Same as "AccountLabels", but at record-level
RecordLabels []string
// Same as "AccountTags", but at record-level
RecordTags []string
// Same as "AccountFlags", but at record-level
RecordFlags []string
// Same as "AccountReports", but at record-level
Expand Down Expand Up @@ -96,6 +100,18 @@ func (e *Effects) AddAccountLabel(val string) {
e.AccountLabels = append(e.AccountLabels, val)
}

// Enqueues the provided label (string value) to be added to the account at the end of rule processing.
func (e *Effects) AddAccountTag(val string) {
e.mu.Lock()
defer e.mu.Unlock()
for _, v := range e.AccountTags {
if v == val {
return
}
}
e.AccountTags = append(e.AccountTags, val)
}

// Enqueues the provided flag (string value) to be recorded (in the Engine's flagstore) at the end of rule processing.
func (e *Effects) AddAccountFlag(val string) {
e.mu.Lock()
Expand Down Expand Up @@ -140,6 +156,18 @@ func (e *Effects) AddRecordLabel(val string) {
e.RecordLabels = append(e.RecordLabels, val)
}

// Enqueues the provided tag (string value) to be added to the record at the end of rule processing.
func (e *Effects) AddRecordTag(val string) {
e.mu.Lock()
defer e.mu.Unlock()
for _, v := range e.RecordTags {
if v == val {
return
}
}
e.RecordTags = append(e.RecordTags, val)
}

// Enqueues the provided flag (string value) to be recorded (in the Engine's flagstore) at the end of rule processing.
func (e *Effects) AddRecordFlag(val string) {
e.mu.Lock()
Expand Down
5 changes: 5 additions & 0 deletions automod/engine/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ var actionNewLabelCount = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "Number of new labels persisted",
}, []string{"type", "val"})

var actionNewTagCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "automod_new_action_tags",
Help: "Number of new tags persisted",
}, []string{"type", "val"})

var actionNewFlagCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "automod_new_action_flags",
Help: "Number of new flags persisted",
Expand Down
78 changes: 69 additions & 9 deletions automod/engine/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (eng *Engine) persistCounters(ctx context.Context, eff *Effects) error {
return nil
}

// Persists account-level moderation actions: new labels, new flags, new takedowns, and reports.
// Persists account-level moderation actions: new labels, new tags, new flags, new takedowns, and reports.
//
// If necessary, will "purge" identity and account caches, so that state updates will be picked up for subsequent events.
//
Expand All @@ -42,6 +42,11 @@ func (eng *Engine) persistAccountModActions(c *AccountContext) error {

// de-dupe actions
newLabels := dedupeLabelActions(c.effects.AccountLabels, c.Account.AccountLabels, c.Account.AccountNegatedLabels)
existingTags := []string{}
if c.Account.Private != nil {
existingTags = c.Account.Private.AccountTags
}
newTags := dedupeTagActions(c.effects.AccountTags, existingTags)
newFlags := dedupeFlagActions(c.effects.AccountFlags, c.Account.AccountFlags)

// don't report the same account multiple times on the same day for the same reason. this is a quick check; we also query the mod service API just before creating the report.
Expand All @@ -58,7 +63,7 @@ func (eng *Engine) persistAccountModActions(c *AccountContext) error {
return fmt.Errorf("circuit-breaking takedowns: %w", err)
}

anyModActions := newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0
anyModActions := newTakedown || len(newLabels) > 0 || len(newTags) > 0 || len(newFlags) > 0 || len(newReports) > 0
if anyModActions && eng.Notifier != nil {
for _, srv := range dedupeStrings(c.effects.NotifyServices) {
if err := eng.Notifier.SendAccount(ctx, srv, c); err != nil {
Expand Down Expand Up @@ -87,7 +92,7 @@ func (eng *Engine) persistAccountModActions(c *AccountContext) error {
xrpcc := eng.OzoneClient

if len(newLabels) > 0 {
c.Logger.Info("labeling record", "newLabels", newLabels)
c.Logger.Info("labeling account", "newLabels", newLabels)
for _, val := range newLabels {
// note: WithLabelValues is a prometheus label, not an atproto label
actionNewLabelCount.WithLabelValues("account", val).Inc()
Expand All @@ -113,6 +118,33 @@ func (eng *Engine) persistAccountModActions(c *AccountContext) error {
}
}

if len(newTags) > 0 {
c.Logger.Info("tagging account", "newTags", newTags)
for _, val := range newTags {
// note: WithLabelValues is a prometheus label, not an atproto label
actionNewTagCount.WithLabelValues("account", val).Inc()
}
comment := "[automod]: auto-tagging account"
_, err := toolsozone.ModerationEmitEvent(ctx, xrpcc, &toolsozone.ModerationEmitEvent_Input{
CreatedBy: xrpcc.Auth.Did,
Event: &toolsozone.ModerationEmitEvent_Input_Event{
ModerationDefs_ModEventTag: &toolsozone.ModerationDefs_ModEventTag{
Add: newTags,
Remove: []string{},
Comment: &comment,
},
},
Subject: &toolsozone.ModerationEmitEvent_Input_Subject{
AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{
Did: c.Account.Identity.DID.String(),
},
},
})
if err != nil {
c.Logger.Error("failed to create account tags", "err", err)
}
}

// reports are additionally de-duped when persisting the action, so track with a flag
createdReports := false
for _, mr := range newReports {
Expand Down Expand Up @@ -147,15 +179,15 @@ func (eng *Engine) persistAccountModActions(c *AccountContext) error {
}
}

needCachePurge := newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || createdReports
needCachePurge := newTakedown || len(newLabels) > 0 || len(newTags) > 0 || len(newFlags) > 0 || createdReports
if needCachePurge {
return eng.PurgeAccountCaches(ctx, c.Account.Identity.DID)
}

return nil
}

// Persists some record-level state: labels, takedowns, reports.
// Persists some record-level state: labels, tags, takedowns, reports.
//
// NOTE: this method currently does *not* persist record-level flags to any storage, and does not de-dupe most actions, on the assumption that the record is new (from firehose) and has no existing mod state.
func (eng *Engine) persistRecordModActions(c *RecordContext) error {
Expand All @@ -166,7 +198,9 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error {

atURI := c.RecordOp.ATURI().String()
newLabels := dedupeStrings(c.effects.RecordLabels)
if len(newLabels) > 0 && eng.OzoneClient != nil {
newTags := dedupeStrings(c.effects.RecordTags)
if (len(newLabels) > 0 || len(newTags) > 0) && eng.OzoneClient != nil {
// fetch existing record labels, tags, etc
rv, err := toolsozone.ModerationGetRecord(ctx, eng.OzoneClient, c.RecordOp.CID.String(), c.RecordOp.ATURI().String())
if err != nil {
// NOTE: there is a frequent 4xx error here from Ozone because this record has not been indexed yet
Expand All @@ -183,10 +217,11 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error {
}
existingLabels = dedupeStrings(existingLabels)
negLabels = dedupeStrings(negLabels)
// fetch existing record labels
newLabels = dedupeLabelActions(newLabels, existingLabels, negLabels)
newTags = dedupeTagActions(newTags, rv.Moderation.SubjectStatus.Tags)
}
}

newFlags := dedupeStrings(c.effects.RecordFlags)
if len(newFlags) > 0 {
// fetch existing flags, and de-dupe
Expand All @@ -211,7 +246,7 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error {
return fmt.Errorf("failed to circuit break takedowns: %w", err)
}

if newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0 {
if newTakedown || len(newLabels) > 0 || len(newTags) > 0 || len(newFlags) > 0 || len(newReports) > 0 {
if eng.Notifier != nil {
for _, srv := range dedupeStrings(c.effects.NotifyServices) {
if err := eng.Notifier.SendRecord(ctx, srv, c); err != nil {
Expand All @@ -231,7 +266,7 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error {
}

// exit early
if !newTakedown && len(newLabels) == 0 && len(newReports) == 0 {
if !newTakedown && len(newLabels) == 0 && len(newTags) == 0 && len(newReports) == 0 {
return nil
}

Expand Down Expand Up @@ -276,6 +311,31 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error {
}
}

if len(newTags) > 0 {
c.Logger.Info("tagging record", "newTags", newTags)
for _, val := range newTags {
// note: WithLabelValues is a prometheus label, not an atproto label
actionNewTagCount.WithLabelValues("record", val).Inc()
}
comment := "[automod]: auto-tagging record"
_, err := toolsozone.ModerationEmitEvent(ctx, xrpcc, &toolsozone.ModerationEmitEvent_Input{
CreatedBy: xrpcc.Auth.Did,
Event: &toolsozone.ModerationEmitEvent_Input_Event{
ModerationDefs_ModEventTag: &toolsozone.ModerationDefs_ModEventTag{
Add: newLabels,
Remove: []string{},
Comment: &comment,
},
},
Subject: &toolsozone.ModerationEmitEvent_Input_Subject{
RepoStrongRef: &strongRef,
},
})
if err != nil {
c.Logger.Error("failed to create record tag", "err", err)
}
}

for _, mr := range newReports {
_, err := eng.createRecordReportIfFresh(ctx, xrpcc, c.RecordOp.ATURI(), c.RecordOp.CID, mr)
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions automod/engine/persisthelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ func dedupeLabelActions(labels, existing, existingNegated []string) []string {
return newLabels
}

func dedupeTagActions(tags, existing []string) []string {
newTags := []string{}
for _, val := range dedupeStrings(tags) {
exists := false
for _, e := range existing {
if val == e {
exists = true
break
}
}
if !exists {
newTags = append(newTags, val)
}
}
return newTags
}

func dedupeFlagActions(flags, existing []string) []string {
newFlags := []string{}
for _, val := range dedupeStrings(flags) {
Expand Down

0 comments on commit 671313d

Please sign in to comment.