diff --git a/pkg/diff/diff.go b/pkg/diff/diff.go index 3f8ff1c..d6c3294 100644 --- a/pkg/diff/diff.go +++ b/pkg/diff/diff.go @@ -85,6 +85,9 @@ type Syncer struct { noMaskValues bool isKonnect bool + + eventLog EntityChanges + eventLogMutex sync.Mutex } type SyncerOpts struct { @@ -167,6 +170,13 @@ func (sc *Syncer) init() error { sc.processor.MustRegister(crud.Kind(entityType), entity.CRUDActions()) sc.entityDiffers[entityType] = entity.Differ() } + + sc.eventLog = EntityChanges{ + Creating: []EntityState{}, + Updating: []EntityState{}, + Deleting: []EntityState{}, + } + return nil } @@ -295,6 +305,34 @@ func (sc *Syncer) wait() { } } +// LogCreate adds a create action to the event log. +func (sc *Syncer) LogCreate(state EntityState) { + sc.eventLogMutex.Lock() + defer sc.eventLogMutex.Unlock() + sc.eventLog.Creating = append(sc.eventLog.Creating, state) +} + +// LogUpdate adds an update action to the event log. +func (sc *Syncer) LogUpdate(state EntityState) { + sc.eventLogMutex.Lock() + defer sc.eventLogMutex.Unlock() + sc.eventLog.Updating = append(sc.eventLog.Updating, state) +} + +// LogDelete adds a delete action to the event log. +func (sc *Syncer) LogDelete(state EntityState) { + sc.eventLogMutex.Lock() + defer sc.eventLogMutex.Unlock() + sc.eventLog.Deleting = append(sc.eventLog.Deleting, state) +} + +// GetEventLog returns the syncer event log. +func (sc *Syncer) GetEventLog() EntityChanges { + sc.eventLogMutex.Lock() + defer sc.eventLogMutex.Unlock() + return sc.eventLog +} + // Run starts a diff and invokes d for every diff. func (sc *Syncer) Run(ctx context.Context, parallelism int, action Do) []error { if parallelism < 1 { @@ -478,12 +516,6 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu } } - output := EntityChanges{ - Creating: []EntityState{}, - Updating: []EntityState{}, - Deleting: []EntityState{}, - } - // NOTE TRC the length makes it confusing to read, but the code below _isn't being run here_, it's an anon func // arg to Run(), which parallelizes it. However, because it's defined in Solve()'s scope, the output created above // is available in aggregate and contains most of the content we need already @@ -506,7 +538,7 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu // NOTE TRC currently we emit lines here, need to collect objects instead switch e.Op { case crud.Create: - output.Creating = append(output.Creating, item) + sc.LogCreate(item) case crud.Update: // TODO TRC this is not currently available in the item EntityState diffString, err := generateDiffString(e, false, sc.noMaskValues) @@ -514,9 +546,9 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu return nil, err } item.Diff = diffString - output.Updating = append(output.Updating, item) + sc.LogUpdate(item) case crud.Delete: - output.Deleting = append(output.Deleting, item) + sc.LogDelete(item) default: panic("unknown operation " + e.Op.String()) } @@ -542,5 +574,5 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu // event struct return result, nil }) - return stats, errs, output + return stats, errs, sc.GetEventLog() }