Skip to content

Commit

Permalink
Merge updates from 0.11.x series
Browse files Browse the repository at this point in the history
feat(responsemanager): clarify response completion (#304)

only delete requests when they finish going over the network. put requests that are not processing
but still going over the network in a state of CompletingSend

feat(taskqueue): fix race on peer state gather (#303)
  • Loading branch information
hannahhoward committed Dec 9, 2021
1 parent 4fc82a6 commit a081634
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 110 deletions.
5 changes: 5 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ const (
Running
// Paused means a request is paused
Paused
// CompletingSend means we have processed a query and are waiting for data to
// go over the network
CompletingSend
)

func (rs RequestState) String() string {
Expand All @@ -355,6 +358,8 @@ func (rs RequestState) String() string {
return "running"
case Paused:
return "paused"
case CompletingSend:
return "completing send"
default:
return "unrecognized request state"
}
Expand Down
2 changes: 0 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseManager,
outgoingBlockHooks,
requestUpdatedHooks,
requestorCancelledListeners,
responseAssembler,
network.ConnectionManager(),
)
graphSync := &GraphSync{
network: network,
Expand Down
17 changes: 10 additions & 7 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,17 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error {
}

func (rm *RequestManager) peerStats(p peer.ID) peerstate.PeerState {
requestStates := make(graphsync.RequestStates)
for id, ipr := range rm.inProgressRequestStatuses {
if ipr.p == p {
requestStates[id] = graphsync.RequestState(ipr.state)
var peerState peerstate.PeerState
rm.requestQueue.WithPeerTopics(p, func(peerTopics *peertracker.PeerTrackerTopics) {
requestStates := make(graphsync.RequestStates)
for id, ipr := range rm.inProgressRequestStatuses {
if ipr.p == p {
requestStates[id] = graphsync.RequestState(ipr.state)
}
}
}
peerTopics := rm.requestQueue.PeerTopics(p)
return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
peerState = peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
})
return peerState
}

func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState {
Expand Down
10 changes: 10 additions & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil)
}

// TerminateRequest indicates a request has finished sending data and should no longer be tracked
func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) {
done := make(chan struct{}, 1)
rm.send(&terminateRequestMessage{p, requestID, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
}
}

// PeerState gets current state of the outgoing responses for a given peer
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
response := make(chan peerstate.PeerState)
Expand Down
14 changes: 14 additions & 0 deletions responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,17 @@ func (psm *peerStateMessage) handle(rm *ResponseManager) {
case <-rm.ctx.Done():
}
}

type terminateRequestMessage struct {
p peer.ID
requestID graphsync.RequestID
done chan<- struct{}
}

func (trm *terminateRequestMessage) handle(rm *ResponseManager) {
rm.terminateRequest(responseKey{trm.p, trm.requestID})
select {
case <-rm.ctx.Done():
case trm.done <- struct{}{}:
}
}
37 changes: 10 additions & 27 deletions responsemanager/queryexecutor/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
Expand Down Expand Up @@ -54,32 +53,26 @@ type ResponseSignals struct {

// QueryExecutor is responsible for performing individual requests by executing their traversals
type QueryExecutor struct {
ctx context.Context
manager Manager
blockHooks BlockHooks
updateHooks UpdateHooks
cancelledListeners CancelledListeners
responseAssembler ResponseAssembler
connManager network.ConnManager
ctx context.Context
manager Manager
blockHooks BlockHooks
updateHooks UpdateHooks
responseAssembler ResponseAssembler
}

// New creates a new QueryExecutor
func New(ctx context.Context,
manager Manager,
blockHooks BlockHooks,
updateHooks UpdateHooks,
cancelledListeners CancelledListeners,
responseAssembler ResponseAssembler,
connManager network.ConnManager,
) *QueryExecutor {
qm := &QueryExecutor{
blockHooks: blockHooks,
updateHooks: updateHooks,
cancelledListeners: cancelledListeners,
responseAssembler: responseAssembler,
manager: manager,
ctx: ctx,
connManager: connManager,
blockHooks: blockHooks,
updateHooks: updateHooks,
responseAssembler: responseAssembler,
manager: manager,
ctx: ctx,
}
return qm
}
Expand All @@ -106,11 +99,6 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee

log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
err := qe.executeQuery(pid, rt)
isCancelled := err != nil && ipldutil.IsContextCancelErr(err)
if isCancelled {
qe.connManager.Unprotect(pid, rt.Request.ID().Tag())
qe.cancelledListeners.NotifyCancelledListeners(pid, rt.Request)
}
qe.manager.FinishTask(task, err)
log.Debugw("finishing response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
return false
Expand Down Expand Up @@ -286,11 +274,6 @@ type UpdateHooks interface {
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}

// CancelledListeners is an interface for notifying listeners that requestor cancelled
type CancelledListeners interface {
NotifyCancelledListeners(p peer.ID, request graphsync.RequestData)
}

// ResponseAssembler is an interface that returns sender interfaces for peer responses.
type ResponseAssembler interface {
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error
Expand Down
72 changes: 25 additions & 47 deletions responsemanager/queryexecutor/queryexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
Expand All @@ -44,7 +43,6 @@ func TestOneBlockTask(t *testing.T) {
notifeeExpect(t, td, 1, td.responseCode)
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
}

func TestSmallGraphTask(t *testing.T) {
Expand Down Expand Up @@ -83,7 +81,6 @@ func TestSmallGraphTask(t *testing.T) {
notifeeExpect(t, td, 10, td.responseCode) // AddNotifee called on all blocks
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("paused by hook", func(t *testing.T) {
Expand All @@ -98,7 +95,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.pauseCalls)
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("paused by signal", func(t *testing.T) {
Expand All @@ -117,7 +113,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.pauseCalls)
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("partial cancelled by hook", func(t *testing.T) {
Expand All @@ -130,7 +125,6 @@ func TestSmallGraphTask(t *testing.T) {
transactionExpect(t, td, []int{6, 7}, ipldutil.ContextCancelError{}.Error()) // last 2 transactions are ContextCancelled

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.cancelledCalls)
require.Equal(t, 1, td.clearRequestCalls)
})

Expand All @@ -153,7 +147,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, 0, td.clearRequestCalls)
// cancelled by signal doesn't mean we get a cancelled call here
// ErrCancelledByCommand is treated differently to a context cancellation error
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("unknown error by hook", func(t *testing.T) {
Expand All @@ -168,7 +161,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("unknown error by signal", func(t *testing.T) {
Expand All @@ -189,7 +181,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("network error by hook", func(t *testing.T) {
Expand All @@ -204,7 +195,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("network error by signal", func(t *testing.T) {
Expand All @@ -225,7 +215,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("first block wont load", func(t *testing.T) {
Expand All @@ -238,7 +227,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})
}

Expand Down Expand Up @@ -277,34 +265,31 @@ func newRandomBlock(index int64) *blockData {
}

type testData struct {
ctx context.Context
t *testing.T
cancel func()
task *peertask.Task
blockStore map[ipld.Link][]byte
persistence ipld.LinkSystem
manager *fauxManager
responseAssembler *fauxResponseAssembler
responseBuilder *fauxResponseBuilder
connManager *testutil.TestConnManager
blockHooks *hooks.OutgoingBlockHooks
updateHooks *hooks.RequestUpdatedHooks
cancelledListeners *listeners.RequestorCancelledListeners
extensionData []byte
extensionName graphsync.ExtensionName
extension graphsync.ExtensionData
requestID graphsync.RequestID
requestCid cid.Cid
requestSelector datamodel.Node
requests []gsmsg.GraphSyncRequest
signals *ResponseSignals
pauseCalls int
clearRequestCalls int
cancelledCalls int
expectedBlocks []*blockData
responseCode graphsync.ResponseStatusCode
peer peer.ID
subscriber *notifications.TopicDataSubscriber
ctx context.Context
t *testing.T
cancel func()
task *peertask.Task
blockStore map[ipld.Link][]byte
persistence ipld.LinkSystem
manager *fauxManager
responseAssembler *fauxResponseAssembler
responseBuilder *fauxResponseBuilder
blockHooks *hooks.OutgoingBlockHooks
updateHooks *hooks.RequestUpdatedHooks
extensionData []byte
extensionName graphsync.ExtensionName
extension graphsync.ExtensionData
requestID graphsync.RequestID
requestCid cid.Cid
requestSelector datamodel.Node
requests []gsmsg.GraphSyncRequest
signals *ResponseSignals
pauseCalls int
clearRequestCalls int
expectedBlocks []*blockData
responseCode graphsync.ResponseStatusCode
peer peer.ID
subscriber *notifications.TopicDataSubscriber
}

func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData, *QueryExecutor) {
Expand All @@ -318,10 +303,8 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
td.task = &peertask.Task{}
td.manager = &fauxManager{ctx: ctx, t: t, expectedStartTask: td.task}
td.responseAssembler = &fauxResponseAssembler{}
td.connManager = testutil.NewTestConnManager()
td.blockHooks = hooks.NewBlockHooks()
td.updateHooks = hooks.NewUpdateHooks()
td.cancelledListeners = listeners.NewRequestorCancelledListeners()
td.requestID = graphsync.RequestID(rand.Int31())
td.requestCid, _ = cid.Decode("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
td.requestSelector = basicnode.NewInt(rand.Int63())
Expand Down Expand Up @@ -401,18 +384,13 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
td.responseAssembler.responseBuilder.pauseCb = func() {
td.pauseCalls++
}
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
td.cancelledCalls++
})

qe := New(
td.ctx,
td.manager,
td.blockHooks,
td.updateHooks,
td.cancelledListeners,
td.responseAssembler,
td.connManager,
)
return td, qe
}
Expand Down
8 changes: 4 additions & 4 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestCancellationQueryInProgress(t *testing.T) {
})
cancelledListenerCalled := make(chan struct{}, 1)
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
td.connManager.RefuteProtected(t, td.p)
cancelledListenerCalled <- struct{}{}
})
responseManager.Startup()
Expand All @@ -105,6 +104,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
close(waitForCancel)

testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener")
td.connManager.RefuteProtected(t, td.p)

td.assertRequestCleared()
}
Expand Down Expand Up @@ -1138,7 +1138,7 @@ func (td *testData) alternateLoaderResponseManager() *ResponseManager {
}

func (td *testData) newQueryExecutor(manager queryexecutor.Manager) *queryexecutor.QueryExecutor {
return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.cancelledListeners, td.responseAssembler, td.connManager)
return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.responseAssembler)
}

func (td *testData) assertPausedRequest() {
Expand Down Expand Up @@ -1306,8 +1306,8 @@ func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {
func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {}
func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {}
func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} }
func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics {
return &peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]}
func (ntq nullTaskQueue) WithPeerTopics(p peer.ID, f func(*peertracker.PeerTrackerTopics)) {
f(&peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]})
}

var _ taskqueue.TaskQueue = nullTaskQueue{}
Loading

0 comments on commit a081634

Please sign in to comment.