Skip to content

Commit

Permalink
feat: lock the syncer log
Browse files Browse the repository at this point in the history
  • Loading branch information
rainest committed Dec 14, 2023
1 parent c3e6016 commit 78f4f43
Showing 1 changed file with 42 additions and 10 deletions.
52 changes: 42 additions & 10 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type Syncer struct {
noMaskValues bool

isKonnect bool

eventLog EntityChanges
eventLogMutex sync.Mutex
}

type SyncerOpts struct {
Expand Down Expand Up @@ -167,6 +170,13 @@ func (sc *Syncer) init() error {
sc.processor.MustRegister(crud.Kind(entityType), entity.CRUDActions())
sc.entityDiffers[entityType] = entity.Differ()
}

sc.eventLog = EntityChanges{
Creating: []EntityState{},
Updating: []EntityState{},
Deleting: []EntityState{},
}

return nil
}

Expand Down Expand Up @@ -295,6 +305,34 @@ func (sc *Syncer) wait() {
}
}

// LogCreate adds a create action to the event log.
func (sc *Syncer) LogCreate(state EntityState) {
sc.eventLogMutex.Lock()
defer sc.eventLogMutex.Unlock()
sc.eventLog.Creating = append(sc.eventLog.Creating, state)
}

// LogUpdate adds an update action to the event log.
func (sc *Syncer) LogUpdate(state EntityState) {
sc.eventLogMutex.Lock()
defer sc.eventLogMutex.Unlock()
sc.eventLog.Updating = append(sc.eventLog.Updating, state)
}

// LogDelete adds a delete action to the event log.
func (sc *Syncer) LogDelete(state EntityState) {
sc.eventLogMutex.Lock()
defer sc.eventLogMutex.Unlock()
sc.eventLog.Deleting = append(sc.eventLog.Deleting, state)
}

// GetEventLog returns the syncer event log.
func (sc *Syncer) GetEventLog() EntityChanges {
sc.eventLogMutex.Lock()
defer sc.eventLogMutex.Unlock()
return sc.eventLog
}

// Run starts a diff and invokes d for every diff.
func (sc *Syncer) Run(ctx context.Context, parallelism int, action Do) []error {
if parallelism < 1 {
Expand Down Expand Up @@ -478,12 +516,6 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu
}
}

output := EntityChanges{
Creating: []EntityState{},
Updating: []EntityState{},
Deleting: []EntityState{},
}

// NOTE TRC the length makes it confusing to read, but the code below _isn't being run here_, it's an anon func
// arg to Run(), which parallelizes it. However, because it's defined in Solve()'s scope, the output created above
// is available in aggregate and contains most of the content we need already
Expand All @@ -506,17 +538,17 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu
// NOTE TRC currently we emit lines here, need to collect objects instead
switch e.Op {
case crud.Create:
output.Creating = append(output.Creating, item)
sc.LogCreate(item)
case crud.Update:
// TODO TRC this is not currently available in the item EntityState
diffString, err := generateDiffString(e, false, sc.noMaskValues)
if err != nil {
return nil, err
}
item.Diff = diffString
output.Updating = append(output.Updating, item)
sc.LogUpdate(item)
case crud.Delete:
output.Deleting = append(output.Deleting, item)
sc.LogDelete(item)
default:
panic("unknown operation " + e.Op.String())
}
Expand All @@ -542,5 +574,5 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu
// event struct
return result, nil
})
return stats, errs, output
return stats, errs, sc.GetEventLog()
}

0 comments on commit 78f4f43

Please sign in to comment.