Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert object sync status events #1360

Merged
merged 5 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ packages:
github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus:
interfaces:
Updater:
UpdateReceiver:
config:
dir: "{{.InterfaceDir}}"
outpkg: "{{.PackageName}}"
inpackage: true
github.com/anyproto/anytype-heart/space/spacecore/peermanager:
interfaces:
Updater:
Expand Down
6 changes: 5 additions & 1 deletion core/syncstatus/filestatus.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package syncstatus

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -51,7 +52,10 @@ func (s *service) indexFileSyncStatus(fileObjectId string, status filesyncstatus
if err != nil {
return fmt.Errorf("get object: %w", err)
}

err = s.updateReceiver.UpdateTree(context.Background(), fileObjectId, status.ToSyncStatus())
if err != nil {
return fmt.Errorf("update tree: %w", err)
}
s.sendSpaceStatusUpdate(status, spaceId, bytesLeftPercentage)
return nil
}
Expand Down
116 changes: 116 additions & 0 deletions core/syncstatus/objectsyncstatus/mock_UpdateReceiver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 23 additions & 4 deletions core/syncstatus/objectsyncstatus/syncstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ const (
var log = logger.NewNamed(syncstatus.CName)

type UpdateReceiver interface {
UpdateTree(ctx context.Context, treeId string, status SyncStatus)
UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error)
UpdateNodeStatus()
}

type SyncStatus int
Expand All @@ -51,6 +52,7 @@ type StatusUpdater interface {
type StatusWatcher interface {
Watch(treeId string) (err error)
Unwatch(treeId string)
SetUpdateReceiver(updater UpdateReceiver)
}

type StatusService interface {
Expand All @@ -77,9 +79,10 @@ type Updater interface {

type syncStatusService struct {
sync.Mutex
configuration nodeconf.NodeConf
periodicSync periodicsync.PeriodicSync
storage spacestorage.SpaceStorage
configuration nodeconf.NodeConf
periodicSync periodicsync.PeriodicSync
updateReceiver UpdateReceiver
storage spacestorage.SpaceStorage

spaceId string
treeHeads map[string]treeHeadsEntry
Expand Down Expand Up @@ -127,6 +130,13 @@ func (s *syncStatusService) Name() (name string) {
return syncstatus.CName
}

func (s *syncStatusService) SetUpdateReceiver(updater UpdateReceiver) {
s.Lock()
defer s.Unlock()

s.updateReceiver = updater
}

func (s *syncStatusService) Run(ctx context.Context) error {
s.periodicSync.Run()
return nil
Expand All @@ -152,6 +162,10 @@ func (s *syncStatusService) update(ctx context.Context) (err error) {
s.treeStatusBuf = s.treeStatusBuf[:0]

s.Lock()
if s.updateReceiver == nil {
s.Unlock()
return
}
for treeId := range s.watchers {
// that means that we haven't yet got the status update
treeHeads, exists := s.treeHeads[treeId]
Expand All @@ -163,7 +177,12 @@ func (s *syncStatusService) update(ctx context.Context) (err error) {
s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus})
}
s.Unlock()
s.updateReceiver.UpdateNodeStatus()
for _, entry := range s.treeStatusBuf {
err = s.updateReceiver.UpdateTree(ctx, entry.treeId, entry.status)
if err != nil {
return
}
s.updateDetails(entry.treeId, mapStatus(entry.status))
}
return
Expand Down
5 changes: 5 additions & 0 deletions core/syncstatus/objectsyncstatus/syncstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ func TestSyncStatusService_update(t *testing.T) {
t.Run("update: got updates on objects", func(t *testing.T) {
// given
s := newFixture(t)
updateReceiver := NewMockUpdateReceiver(t)
updateReceiver.EXPECT().UpdateNodeStatus().Return()
updateReceiver.EXPECT().UpdateTree(context.Background(), "id", StatusNotSynced).Return(nil)
s.SetUpdateReceiver(updateReceiver)

// when
s.detailsUpdater.EXPECT().UpdateDetails([]string{"id"}, domain.ObjectSyncing, domain.Null, "spaceId")
Expand All @@ -239,6 +243,7 @@ func TestSyncStatusService_update(t *testing.T) {

// then
assert.Nil(t, err)
updateReceiver.AssertCalled(t, "UpdateTree", context.Background(), "id", StatusNotSynced)
})
}

Expand Down
13 changes: 13 additions & 0 deletions core/syncstatus/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import (

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace"
"github.com/anyproto/any-sync/nodeconf"

"github.com/anyproto/anytype-heart/core/anytype/config"
"github.com/anyproto/anytype-heart/core/block/cache"
"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/core/filestorage/filesync"
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
"github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus"
"github.com/anyproto/anytype-heart/core/syncstatus/spacesyncstatus"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
Expand All @@ -27,6 +31,7 @@ type Service interface {
var _ Service = (*service)(nil)

type service struct {
updateReceiver *updateReceiver
fileSyncService filesync.FileSync

objectWatchersLock sync.Mutex
Expand All @@ -53,6 +58,12 @@ func (s *service) Init(a *app.App) (err error) {
s.fileSyncService.OnUploadStarted(s.onFileUploadStarted)
s.fileSyncService.OnLimited(s.onFileLimited)
s.fileSyncService.OnDelete(s.OnFileDelete)

nodeConfService := app.MustComponent[nodeconf.Service](a)
cfg := app.MustComponent[*config.Config](a)
eventSender := app.MustComponent[event.Sender](a)
nodeStatus := app.MustComponent[nodestatus.NodeStatus](a)
s.updateReceiver = newUpdateReceiver(nodeConfService, cfg, eventSender, s.objectStore, nodeStatus)
return nil
}

Expand All @@ -68,7 +79,9 @@ func (s *service) RegisterSpace(space commonspace.Space, sw objectsyncstatus.Sta
s.objectWatchersLock.Lock()
defer s.objectWatchersLock.Unlock()

sw.SetUpdateReceiver(s.updateReceiver)
s.objectWatchers[space.Id()] = sw
s.updateReceiver.spaceId = space.Id()
}

func (s *service) UnregisterSpace(space commonspace.Space) {
Expand Down
Loading
Loading