Peer Stats function #298

Merged: 3 commits merged on Dec 7, 2021
Changes from 2 commits
15 changes: 15 additions & 0 deletions graphsync.go
@@ -332,6 +332,21 @@ type Stats struct {
OutgoingResponses ResponseStats
}

// RequestState describes the current general state of a request
type RequestState uint64

// RequestStates describes a set of request IDs and their current states
type RequestStates map[RequestID]RequestState

const (
// Queued means a request has been received and is queued for processing
Queued RequestState = iota
// Running means a request is actively sending or receiving data
Running
// Paused means a request is paused
Paused
)

// GraphExchange is a protocol that can exchange IPLD graphs based on a selector
type GraphExchange interface {
// Request initiates a new GraphSync request to the given peer using the given selector spec.
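The new public types are plain Go values, so callers can aggregate them however they like. Below is a minimal consumer-side sketch (not part of this PR; the countByState helper is illustrative) showing how a RequestStates map, such as one returned by the peer-stats call added later in this diff, might be tallied by state:

package statsdemo

import (
	graphsync "github.com/ipfs/go-graphsync"
)

// countByState tallies how many in-progress requests are in each state.
// Illustrative helper only; it is not part of the graphsync API.
func countByState(states graphsync.RequestStates) (queued, running, paused int) {
	for _, state := range states {
		switch state {
		case graphsync.Queued:
			queued++
		case graphsync.Running:
			running++
		case graphsync.Paused:
			paused++
		}
	}
	return queued, running, paused
}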
16 changes: 16 additions & 0 deletions impl/graphsync.go
@@ -458,6 +458,22 @@ func (gs *GraphSync) Stats() graphsync.Stats {
}
}

// PeerStats describes the state of all in-progress graphsync requests for a given peer
type PeerStats struct {
// OutgoingRequests are the states of requests this node has made to the peer
OutgoingRequests graphsync.RequestStates
// IncomingRequests are the states of requests the peer has made to this node
IncomingRequests graphsync.RequestStates
}

// PeerStats produces insight on the current state of a given peer
func (gs *GraphSync) PeerStats(p peer.ID) PeerStats {
return PeerStats{
OutgoingRequests: gs.requestManager.PeerStats(p),
IncomingRequests: gs.responseManager.PeerStats(p),
}
}

type graphSyncReceiver GraphSync

func (gsr *graphSyncReceiver) graphSync() *GraphSync {
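PeerStats is defined on the concrete *GraphSync type rather than on the GraphExchange interface, so a caller needs the concrete implementation in hand. A rough usage sketch, assuming gs was constructed elsewhere with this package's New constructor and p is a connected peer (the logPeerStats helper is illustrative, not part of the PR):

package statsdemo

import (
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"

	gsimpl "github.com/ipfs/go-graphsync/impl"
)

// logPeerStats prints the state of every in-progress request exchanged with p.
// Sketch only; assumes gs is a *gsimpl.GraphSync built with gsimpl.New(...).
func logPeerStats(gs *gsimpl.GraphSync, p peer.ID) {
	stats := gs.PeerStats(p)
	fmt.Printf("peer %s: %d outgoing, %d incoming requests in progress\n",
		p.Pretty(), len(stats.OutgoingRequests), len(stats.IncomingRequests))
	for id, state := range stats.OutgoingRequests {
		fmt.Printf("  outgoing %v: state %v\n", id, state)
	}
	for id, state := range stats.IncomingRequests {
		fmt.Printf("  incoming %v: state %v\n", id, state)
	}
}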
8 changes: 8 additions & 0 deletions impl/graphsync_test.go
@@ -588,6 +588,14 @@ func TestPauseResume(t *testing.T) {
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestorPeerStats := requestor.(*GraphSync).PeerStats(td.host2.ID())
require.Len(t, requestorPeerStats.OutgoingRequests, 1)
require.Len(t, requestorPeerStats.IncomingRequests, 0)

responderPeerStats := responder.(*GraphSync).PeerStats(td.host1.ID())
require.Len(t, responderPeerStats.IncomingRequests, 1)
require.Len(t, responderPeerStats.OutgoingRequests, 0)

requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
require.NoError(t, err)
22 changes: 13 additions & 9 deletions requestmanager/client.go
@@ -44,14 +44,6 @@ const (
defaultPriority = graphsync.Priority(0)
)

type state uint64

const (
queued state = iota
running
paused
)

type inProgressRequestStatus struct {
ctx context.Context
span trace.Span
@@ -60,7 +52,7 @@ type inProgressRequestStatus struct {
p peer.ID
terminalError error
pauseMessages chan struct{}
state state
state graphsync.RequestState
lastResponse atomic.Value
onTerminated []chan<- error
request gsmsg.GraphSyncRequest
@@ -340,6 +332,18 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err
}
}

// PeerStats gets stats on all outgoing requests for a given peer
func (rm *RequestManager) PeerStats(p peer.ID) graphsync.RequestStates {
response := make(chan graphsync.RequestStates)
rm.send(&peerStatsMessage{p, response}, nil)
select {
case <-rm.ctx.Done():
return nil
case peerStats := <-response:
return peerStats
}
}

// SendRequest sends a request to the message queue
func (rm *RequestManager) SendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners})
13 changes: 13 additions & 0 deletions requestmanager/messages.go
@@ -108,3 +108,16 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) {
case <-rm.ctx.Done():
}
}

type peerStatsMessage struct {
p peer.ID
peerStatsChan chan<- graphsync.RequestStates
}

func (psm *peerStatsMessage) handle(rm *RequestManager) {
peerStats := rm.peerStats(psm.p)
select {
case psm.peerStatsChan <- peerStats:
case <-rm.ctx.Done():
}
}
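Both PeerStats entry points follow the managers' existing actor pattern: the query is posted to the single goroutine that owns the in-progress map, answered over a channel, and both sides are unblocked by the shutdown context, so no extra locking is introduced. A stripped-down sketch of that pattern, with names that are illustrative rather than taken from this codebase:

package actordemo

import "context"

// statsQuery asks the owning goroutine for a snapshot of its state.
type statsQuery struct {
	respond chan<- map[string]int
}

type manager struct {
	ctx      context.Context
	messages chan statsQuery
	inFlight map[string]int // only ever touched inside run()
}

// run is the single goroutine that owns inFlight, mirroring the
// RequestManager/ResponseManager event loops.
func (m *manager) run() {
	for {
		select {
		case q := <-m.messages:
			snapshot := make(map[string]int, len(m.inFlight))
			for k, v := range m.inFlight {
				snapshot[k] = v
			}
			select {
			case q.respond <- snapshot:
			case <-m.ctx.Done():
			}
		case <-m.ctx.Done():
			return
		}
	}
}

// Stats mirrors the shape of PeerStats: send the query, then wait for either
// the answer or shutdown.
func (m *manager) Stats() map[string]int {
	response := make(chan map[string]int, 1)
	select {
	case m.messages <- statsQuery{respond: response}:
	case <-m.ctx.Done():
		return nil
	}
	select {
	case stats := <-response:
		return stats
	case <-m.ctx.Done():
		return nil
	}
}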
22 changes: 22 additions & 0 deletions requestmanager/requestmanager_test.go
@@ -978,6 +978,28 @@ func TestPauseResumeExternal(t *testing.T) {
testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
}

func TestStats(t *testing.T) {
ctx := context.Background()
td := newTestData(ctx, t)

requestCtx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
peers := testutil.GeneratePeers(2)

blockChain2 := testutil.SetupBlockChain(ctx, t, td.persistence, 100, 5)

_, _ = td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
_, _ = td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
_, _ = td.requestManager.NewRequest(requestCtx, peers[1], td.blockChain.TipLink, td.blockChain.Selector())

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3)

states := td.requestManager.peerStats(peers[0])
require.Len(t, states, 2)
require.Equal(t, graphsync.Running, states[requestRecords[0].gsr.ID()])
require.Equal(t, graphsync.Running, states[requestRecords[1].gsr.ID()])
}

type requestRecord struct {
gsr gsmsg.GraphSyncRequest
p peer.ID
24 changes: 17 additions & 7 deletions requestmanager/server.go
@@ -95,7 +95,7 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld
pauseMessages: make(chan struct{}, 1),
doNotSendCids: doNotSendCids,
request: request,
state: queued,
state: graphsync.Queued,
nodeStyleChooser: hooksResult.CustomChooser,
inProgressChan: make(chan graphsync.ResponseProgress),
inProgressErr: make(chan error),
@@ -153,7 +153,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
rm.outgoingRequestProcessingListeners.NotifyOutgoingRequestProcessingListeners(ipr.p, ipr.request, inProgressCount)
}

ipr.state = running
ipr.state = graphsync.Running
return executor.RequestTask{
Ctx: ipr.ctx,
Span: ipr.span,
@@ -223,7 +223,7 @@ func (rm *RequestManager) releaseRequestTask(p peer.ID, task *peertask.Task, err
return
}
if _, ok := err.(hooks.ErrPaused); ok {
ipr.state = paused
ipr.state = graphsync.Paused
return
}
log.Infow("graphsync request complete", "request id", requestID, "peer", ipr.p, "total time", time.Since(ipr.startTime))
@@ -253,7 +253,7 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr
if ipr.terminalError == nil {
ipr.terminalError = terminalError
}
if ipr.state != running {
if ipr.state != graphsync.Running {
rm.terminateRequest(requestID, ipr)
} else {
ipr.cancelFn()
@@ -368,10 +368,10 @@ func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync
if !ok {
return graphsync.RequestNotFoundErr{}
}
if inProgressRequestStatus.state != paused {
if inProgressRequestStatus.state != graphsync.Paused {
return errors.New("request is not paused")
}
inProgressRequestStatus.state = queued
inProgressRequestStatus.state = graphsync.Queued
inProgressRequestStatus.request = inProgressRequestStatus.request.ReplaceExtensions(extensions)
rm.requestQueue.PushTask(inProgressRequestStatus.p, peertask.Task{Topic: id, Priority: math.MaxInt32, Work: 1})
return nil
@@ -382,7 +382,7 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error {
if !ok {
return graphsync.RequestNotFoundErr{}
}
if inProgressRequestStatus.state == paused {
if inProgressRequestStatus.state == graphsync.Paused {
return errors.New("request is already paused")
}
select {
@@ -391,3 +391,13 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error {
}
return nil
}

func (rm *RequestManager) peerStats(p peer.ID) graphsync.RequestStates {
peerStats := make(graphsync.RequestStates)
for id, ipr := range rm.inProgressRequestStatuses {
if ipr.p == p {
peerStats[id] = ipr.state
}
}
return peerStats
}
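Read together, these server hunks give each outgoing request a small lifecycle that peerStats merely snapshots. A descriptive sketch of the transitions as they appear in this diff; the event labels are editorial shorthand, not API names:

package lifecycledemo

import graphsync "github.com/ipfs/go-graphsync"

// transition summarizes the state changes visible in requestmanager/server.go:
// newRequest queues, requestTask runs, hooks.ErrPaused pauses, unpause re-queues.
// Illustrative only; the real transitions happen inline in the manager.
func transition(current graphsync.RequestState, event string) graphsync.RequestState {
	switch event {
	case "new": // newRequest: request created and queued for processing
		return graphsync.Queued
	case "start": // requestTask: the executor picked the request up
		return graphsync.Running
	case "pause": // releaseRequestTask returned hooks.ErrPaused
		return graphsync.Paused
	case "unpause": // unpause: pushed back onto the request queue
		return graphsync.Queued
	default:
		return current
	}
}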
22 changes: 13 additions & 9 deletions responsemanager/client.go
@@ -26,14 +26,6 @@ import (

var log = logging.Logger("graphsync")

type state uint64

const (
queued state = iota
running
paused
)

type inProgressResponseStatus struct {
ctx context.Context
cancelFn func()
@@ -42,7 +34,7 @@ type inProgressResponseStatus struct {
traverser ipldutil.Traverser
signals queryexecutor.ResponseSignals
updates []gsmsg.GraphSyncRequest
state state
state graphsync.RequestState
subscriber *notifications.TopicDataSubscriber
}

@@ -237,6 +229,18 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil)
}

// PeerStats gets stats on all incoming requests for a given peer
func (rm *ResponseManager) PeerStats(p peer.ID) graphsync.RequestStates {
response := make(chan graphsync.RequestStates)
rm.send(&peerStatsMessage{p, response}, nil)
select {
case <-rm.ctx.Done():
return nil
case peerStats := <-response:
return peerStats
}
}

func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) {
select {
case <-rm.ctx.Done():
13 changes: 13 additions & 0 deletions responsemanager/messages.go
@@ -113,3 +113,16 @@ func (str *startTaskRequest) handle(rm *ResponseManager) {
func (prm *processRequestMessage) handle(rm *ResponseManager) {
rm.processRequests(prm.p, prm.requests)
}

type peerStatsMessage struct {
p peer.ID
peerStatsChan chan<- graphsync.RequestStates
}

func (psm *peerStatsMessage) handle(rm *ResponseManager) {
peerStats := rm.peerStats(psm.p)
select {
case psm.peerStatsChan <- peerStats:
case <-rm.ctx.Done():
}
}
22 changes: 22 additions & 0 deletions responsemanager/responsemanager_test.go
@@ -162,6 +162,28 @@ func TestEarlyCancellation(t *testing.T) {
td.assertNoResponses()
td.connManager.RefuteProtected(t, td.p)
}

func TestStats(t *testing.T) {
td := newTestData(t)
defer td.cancel()
// we're not testing the queryexecutor or taskqueue here, just the
// responsemanager's own stats bookkeeping, so we won't let responses get
// far enough to move past the Queued state
responseManager := td.nullTaskQueueResponseManager()
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
responseManager.Startup()
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
p2 := testutil.GeneratePeers(1)[0]
responseManager.ProcessRequests(td.ctx, p2, td.requests)
stats := responseManager.PeerStats(td.p)
require.Len(t, stats, 1)
require.Equal(t, graphsync.Queued, stats[td.requestID])
stats = responseManager.PeerStats(p2)
require.Len(t, stats, 1)
require.Equal(t, graphsync.Queued, stats[td.requestID])
}

func TestMissingContent(t *testing.T) {
t.Run("missing root block", func(t *testing.T) {
td := newTestData(t)
28 changes: 19 additions & 9 deletions responsemanager/server.go
@@ -45,7 +45,7 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync
log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
return
}
if response.state != paused {
if response.state != graphsync.Paused {
response.updates = append(response.updates, update)
select {
case response.signals.UpdateSignal <- struct{}{}:
@@ -83,10 +83,10 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
if !ok {
return errors.New("could not find request")
}
if inProgressResponse.state != paused {
if inProgressResponse.state != graphsync.Paused {
return errors.New("request is not paused")
}
inProgressResponse.state = queued
inProgressResponse.state = graphsync.Queued
if len(extensions) > 0 {
_ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error {
for _, extension := range extensions {
@@ -107,7 +107,7 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
return errors.New("could not find request")
}

if response.state != running {
if response.state != graphsync.Running {
_ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error {
if ipldutil.IsContextCancelErr(err) {
rm.connManager.Unprotect(p, requestID.Tag())
@@ -167,7 +167,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
UpdateSignal: make(chan struct{}, 1),
ErrSignal: make(chan error, 1),
},
state: queued,
state: graphsync.Queued,
}
// TODO: Use a better work estimation metric.

@@ -190,11 +190,11 @@ func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.Respons
response.loader = loader
response.traverser = traverser
if isPaused {
response.state = paused
response.state = graphsync.Paused
return queryexecutor.ResponseTask{Empty: true}
}
}
response.state = running
response.state = graphsync.Running
return queryexecutor.ResponseTask{
Ctx: response.ctx,
Empty: false,
@@ -223,7 +223,7 @@ func (rm *ResponseManager) finishTask(task *peertask.Task, err error) {
return
}
if _, ok := err.(hooks.ErrPaused); ok {
response.state = paused
response.state = graphsync.Paused
return
}
if err != nil {
@@ -249,7 +249,7 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID
if !ok {
return errors.New("could not find request")
}
if inProgressResponse.state == paused {
if inProgressResponse.state == graphsync.Paused {
return errors.New("request is already paused")
}
select {
@@ -258,3 +258,13 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID
}
return nil
}

func (rm *ResponseManager) peerStats(p peer.ID) graphsync.RequestStates {
peerStats := make(graphsync.RequestStates)
for key, ipr := range rm.inProgressResponses {
if key.p == p {
peerStats[key.requestID] = ipr.state
}
}
return peerStats
}