From d804d99161e2ba27e03715acfe9a35a78f28b0ba Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 3 Dec 2021 18:33:38 -0800 Subject: [PATCH 1/2] feat(graphsync): add impl method for peer stats add method that gets current request states by request ID for a given peer --- graphsync.go | 15 +++++++++++++ impl/graphsync.go | 16 ++++++++++++++ impl/graphsync_test.go | 8 +++++++ requestmanager/client.go | 22 +++++++++++-------- requestmanager/messages.go | 13 ++++++++++++ requestmanager/requestmanager_test.go | 22 +++++++++++++++++++ requestmanager/server.go | 24 ++++++++++++++------- responsemanager/client.go | 22 +++++++++++-------- responsemanager/messages.go | 13 ++++++++++++ responsemanager/responsemanager_test.go | 22 +++++++++++++++++++ responsemanager/server.go | 28 +++++++++++++++++-------- 11 files changed, 171 insertions(+), 34 deletions(-) diff --git a/graphsync.go b/graphsync.go index 23f6f0fc..20b69266 100644 --- a/graphsync.go +++ b/graphsync.go @@ -332,6 +332,21 @@ type Stats struct { OutgoingResponses ResponseStats } +// RequestState describes the current general state of a request +type RequestState uint64 + +// RequestStates describe a set of request IDs and their current state +type RequestStates map[RequestID]RequestState + +const ( + // Queued means a request has been received and is queued for processing + Queued RequestState = iota + // Running means a request is actively sending or receiving data + Running + // Paused means a request is paused + Paused +) + // GraphExchange is a protocol that can exchange IPLD graphs based on a selector type GraphExchange interface { // Request initiates a new GraphSync request to the given peer using the given selector spec. 
diff --git a/impl/graphsync.go b/impl/graphsync.go index bd8434cf..f33e7246 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -458,6 +458,22 @@ func (gs *GraphSync) Stats() graphsync.Stats { } } +// PeerStats describes the state of graphsync for a given peer +type PeerStats struct { + // OutgoingRequests maps each outgoing request ID for the peer to its current state + OutgoingRequests graphsync.RequestStates + // IncomingRequests maps each incoming request ID for the peer to its current state + IncomingRequests graphsync.RequestStates +} + +// PeerStats produces insight on the current state of a given peer +func (gs *GraphSync) PeerStats(p peer.ID) PeerStats { + return PeerStats{ + OutgoingRequests: gs.requestManager.PeerStats(p), + IncomingRequests: gs.responseManager.PeerStats(p), + } +} + type graphSyncReceiver GraphSync func (gsr *graphSyncReceiver) graphSync() *GraphSync { diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index ae651d86..54645ca4 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -588,6 +588,14 @@ func TestPauseResume(t *testing.T) { timer := time.NewTimer(100 * time.Millisecond) testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan) + requestorPeerStats := requestor.(*GraphSync).PeerStats(td.host2.ID()) + require.Len(t, requestorPeerStats.OutgoingRequests, 1) + require.Len(t, requestorPeerStats.IncomingRequests, 0) + + responderPeerStats := responder.(*GraphSync).PeerStats(td.host1.ID()) + require.Len(t, responderPeerStats.IncomingRequests, 1) + require.Len(t, responderPeerStats.OutgoingRequests, 0) + requestID := <-requestIDChan err := responder.UnpauseResponse(td.host1.ID(), requestID) require.NoError(t, err) diff --git a/requestmanager/client.go b/requestmanager/client.go index 51cd75f2..578a6961 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -44,14 +44,6 @@ const ( defaultPriority = graphsync.Priority(0) ) -type state uint64 - -const ( - queued state = iota - running - paused -) - type inProgressRequestStatus struct { ctx context.Context span trace.Span @@ -60,7 +52,7 @@ type 
inProgressRequestStatus struct { p peer.ID terminalError error pauseMessages chan struct{} - state state + state graphsync.RequestState lastResponse atomic.Value onTerminated []chan<- error request gsmsg.GraphSyncRequest @@ -340,6 +332,18 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err } } +// PeerStats gets stats on all outgoing requests for a given peer +func (rm *RequestManager) PeerStats(p peer.ID) graphsync.RequestStates { + response := make(chan graphsync.RequestStates) + rm.send(&peerStatsMessage{p, response}, nil) + select { + case <-rm.ctx.Done(): + return nil + case peerStats := <-response: + return peerStats + } +} + // SendRequest sends a request to the message queue func (rm *RequestManager) SendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners}) diff --git a/requestmanager/messages.go b/requestmanager/messages.go index b8f4fa87..ff99833d 100644 --- a/requestmanager/messages.go +++ b/requestmanager/messages.go @@ -108,3 +108,16 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) { case <-rm.ctx.Done(): } } + +type peerStatsMessage struct { + p peer.ID + peerStatsChan chan<- graphsync.RequestStates +} + +func (psm *peerStatsMessage) handle(rm *RequestManager) { + peerStats := rm.peerStats(psm.p) + select { + case psm.peerStatsChan <- peerStats: + case <-rm.ctx.Done(): + } +} diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 44e7dc4e..ed2adbf7 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -978,6 +978,28 @@ func TestPauseResumeExternal(t *testing.T) { testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan) } +func TestStats(t *testing.T) { + ctx := context.Background() + td := newTestData(ctx, t) + + requestCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + peers := testutil.GeneratePeers(2) + + 
blockChain2 := testutil.SetupBlockChain(ctx, t, td.persistence, 100, 5) + + _, _ = td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + _, _ = td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector()) + _, _ = td.requestManager.NewRequest(requestCtx, peers[1], td.blockChain.TipLink, td.blockChain.Selector()) + + requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3) + + states := td.requestManager.peerStats(peers[0]) + require.Len(t, states, 2) + require.Equal(t, states[requestRecords[0].gsr.ID()], graphsync.Running) + require.Equal(t, states[requestRecords[1].gsr.ID()], graphsync.Running) +} + type requestRecord struct { gsr gsmsg.GraphSyncRequest p peer.ID diff --git a/requestmanager/server.go b/requestmanager/server.go index e8e2ddf8..328d7751 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -95,7 +95,7 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld pauseMessages: make(chan struct{}, 1), doNotSendCids: doNotSendCids, request: request, - state: queued, + state: graphsync.Queued, nodeStyleChooser: hooksResult.CustomChooser, inProgressChan: make(chan graphsync.ResponseProgress), inProgressErr: make(chan error), @@ -153,7 +153,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re rm.outgoingRequestProcessingListeners.NotifyOutgoingRequestProcessingListeners(ipr.p, ipr.request, inProgressCount) } - ipr.state = running + ipr.state = graphsync.Running return executor.RequestTask{ Ctx: ipr.ctx, Span: ipr.span, @@ -223,7 +223,7 @@ func (rm *RequestManager) releaseRequestTask(p peer.ID, task *peertask.Task, err return } if _, ok := err.(hooks.ErrPaused); ok { - ipr.state = paused + ipr.state = graphsync.Paused return } log.Infow("graphsync request complete", "request id", requestID, "peer", ipr.p, "total time", time.Since(ipr.startTime)) @@ -253,7 +253,7 @@ func (rm 
*RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr if ipr.terminalError == nil { ipr.terminalError = terminalError } - if ipr.state != running { + if ipr.state != graphsync.Running { rm.terminateRequest(requestID, ipr) } else { ipr.cancelFn() @@ -368,10 +368,10 @@ func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync if !ok { return graphsync.RequestNotFoundErr{} } - if inProgressRequestStatus.state != paused { + if inProgressRequestStatus.state != graphsync.Paused { return errors.New("request is not paused") } - inProgressRequestStatus.state = queued + inProgressRequestStatus.state = graphsync.Queued inProgressRequestStatus.request = inProgressRequestStatus.request.ReplaceExtensions(extensions) rm.requestQueue.PushTask(inProgressRequestStatus.p, peertask.Task{Topic: id, Priority: math.MaxInt32, Work: 1}) return nil @@ -382,7 +382,7 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error { if !ok { return graphsync.RequestNotFoundErr{} } - if inProgressRequestStatus.state == paused { + if inProgressRequestStatus.state == graphsync.Paused { return errors.New("request is already paused") } select { @@ -391,3 +391,13 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error { } return nil } + +func (rm *RequestManager) peerStats(p peer.ID) graphsync.RequestStates { + peerStats := make(graphsync.RequestStates) + for id, ipr := range rm.inProgressRequestStatuses { + if ipr.p == p { + peerStats[id] = graphsync.RequestState(ipr.state) + } + } + return peerStats +} diff --git a/responsemanager/client.go b/responsemanager/client.go index 62f1dca9..7ee38bd5 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -26,14 +26,6 @@ import ( var log = logging.Logger("graphsync") -type state uint64 - -const ( - queued state = iota - running - paused -) - type inProgressResponseStatus struct { ctx context.Context cancelFn func() @@ -42,7 +34,7 @@ type inProgressResponseStatus struct { traverser 
ipldutil.Traverser signals queryexecutor.ResponseSignals updates []gsmsg.GraphSyncRequest - state state + state graphsync.RequestState subscriber *notifications.TopicDataSubscriber } @@ -237,6 +229,18 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync. rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil) } +// PeerStats gets stats on all incoming requests for a given peer +func (rm *ResponseManager) PeerStats(p peer.ID) graphsync.RequestStates { + response := make(chan graphsync.RequestStates) + rm.send(&peerStatsMessage{p, response}, nil) + select { + case <-rm.ctx.Done(): + return nil + case peerStats := <-response: + return peerStats + } +} + func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) { select { case <-rm.ctx.Done(): diff --git a/responsemanager/messages.go b/responsemanager/messages.go index 0dad608d..26e319a6 100644 --- a/responsemanager/messages.go +++ b/responsemanager/messages.go @@ -113,3 +113,16 @@ func (str *startTaskRequest) handle(rm *ResponseManager) { func (prm *processRequestMessage) handle(rm *ResponseManager) { rm.processRequests(prm.p, prm.requests) } + +type peerStatsMessage struct { + p peer.ID + peerStatsChan chan<- graphsync.RequestStates +} + +func (psm *peerStatsMessage) handle(rm *ResponseManager) { + peerStats := rm.peerStats(psm.p) + select { + case psm.peerStatsChan <- peerStats: + case <-rm.ctx.Done(): + } +} diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index fd2e6b21..5d81f252 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -162,6 +162,28 @@ func TestEarlyCancellation(t *testing.T) { td.assertNoResponses() td.connManager.RefuteProtected(t, td.p) } + +func TestStats(t *testing.T) { + td := newTestData(t) + defer td.cancel() + // we're not testing the queryexecutor or taskqueue here, we're testing + // that 
cancellation inside the responsemanager itself is properly + // activated, so we won't let responses get far enough to race our + // cancellation + responseManager := td.nullTaskQueueResponseManager() + td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) + responseManager.Startup() + responseManager.ProcessRequests(td.ctx, td.p, td.requests) + p2 := testutil.GeneratePeers(1)[0] + responseManager.ProcessRequests(td.ctx, p2, td.requests) + stats := responseManager.PeerStats(td.p) + require.Len(t, stats, 1) + require.Equal(t, stats[td.requestID], graphsync.Queued) + stats = responseManager.PeerStats(p2) + require.Len(t, stats, 1) + require.Equal(t, stats[td.requestID], graphsync.Queued) + +} func TestMissingContent(t *testing.T) { t.Run("missing root block", func(t *testing.T) { td := newTestData(t) diff --git a/responsemanager/server.go b/responsemanager/server.go index bfcfa04f..0037265b 100644 --- a/responsemanager/server.go +++ b/responsemanager/server.go @@ -45,7 +45,7 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID) return } - if response.state != paused { + if response.state != graphsync.Paused { response.updates = append(response.updates, update) select { case response.signals.UpdateSignal <- struct{}{}: @@ -83,10 +83,10 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request if !ok { return errors.New("could not find request") } - if inProgressResponse.state != paused { + if inProgressResponse.state != graphsync.Paused { return errors.New("request is not paused") } - inProgressResponse.state = queued + inProgressResponse.state = graphsync.Queued if len(extensions) > 0 { _ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error { for _, extension := range extensions { @@ -107,7 +107,7 @@ func (rm *ResponseManager) abortRequest(p 
peer.ID, requestID graphsync.RequestID return errors.New("could not find request") } - if response.state != running { + if response.state != graphsync.Running { _ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error { if ipldutil.IsContextCancelErr(err) { rm.connManager.Unprotect(p, requestID.Tag()) @@ -167,7 +167,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync UpdateSignal: make(chan struct{}, 1), ErrSignal: make(chan error, 1), }, - state: queued, + state: graphsync.Queued, } // TODO: Use a better work estimation metric. @@ -190,11 +190,11 @@ func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.Respons response.loader = loader response.traverser = traverser if isPaused { - response.state = paused + response.state = graphsync.Paused return queryexecutor.ResponseTask{Empty: true} } } - response.state = running + response.state = graphsync.Running return queryexecutor.ResponseTask{ Ctx: response.ctx, Empty: false, @@ -223,7 +223,7 @@ func (rm *ResponseManager) finishTask(task *peertask.Task, err error) { return } if _, ok := err.(hooks.ErrPaused); ok { - response.state = paused + response.state = graphsync.Paused return } if err != nil { @@ -249,7 +249,7 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID if !ok { return errors.New("could not find request") } - if inProgressResponse.state == paused { + if inProgressResponse.state == graphsync.Paused { return errors.New("request is already paused") } select { @@ -258,3 +258,13 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID } return nil } + +func (rm *ResponseManager) peerStats(p peer.ID) graphsync.RequestStates { + peerStats := make(graphsync.RequestStates) + for key, ipr := range rm.inProgressResponses { + if key.p == p { + peerStats[key.requestID] = ipr.state + } + } + return peerStats +} From b524aeb5c7d75534eb9074f191fa4a8d23de6778 Mon Sep 17 
00:00:00 2001 From: hannahhoward Date: Mon, 6 Dec 2021 16:32:05 -0800 Subject: [PATCH 2/2] fix(requestmanager): fix tested method --- requestmanager/requestmanager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index ed2adbf7..f5f1bc15 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -994,7 +994,7 @@ func TestStats(t *testing.T) { requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3) - states := td.requestManager.peerStats(peers[0]) + states := td.requestManager.PeerStats(peers[0]) require.Len(t, states, 2) require.Equal(t, states[requestRecords[0].gsr.ID()], graphsync.Running) require.Equal(t, states[requestRecords[1].gsr.ID()], graphsync.Running)