Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scrub response errors #320

Merged
merged 12 commits into from
Dec 17, 2021
4 changes: 2 additions & 2 deletions benchmarks/testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
fromWaiter peer.ID,
msgFromWaiter gsmsg.GraphSyncMessage) {

builder := gsmsg.NewBuilder(gsmsg.Topic(0))
builder := gsmsg.NewBuilder()
builder.AddBlock(blocks.NewBlock([]byte(expectedStr)))
msgToWaiter, err := builder.Build()
require.NoError(t, err)
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}
}))

builder := gsmsg.NewBuilder(gsmsg.Topic(0))
builder := gsmsg.NewBuilder()
builder.AddBlock(blocks.NewBlock([]byte("data")))
messageSentAsync, err := builder.Build()
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseManager,
outgoingBlockHooks,
requestUpdatedHooks,
responseAssembler,
)
graphSync := &GraphSync{
network: network,
Expand Down
17 changes: 9 additions & 8 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestSendResponseToIncomingRequest(t *testing.T) {

requestID := graphsync.RequestID(rand.Int31())

builder := gsmsg.NewBuilder(gsmsg.Topic(0))
builder := gsmsg.NewBuilder()
builder.AddRequest(gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, blockChain.Selector(), graphsync.Priority(math.MaxInt32), td.extension))
message, err := builder.Build()
require.NoError(t, err)
Expand Down Expand Up @@ -978,13 +978,14 @@ func TestNetworkDisconnect(t *testing.T) {
drain(responder)

tracing := collectTracing(t)
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
// may contain multiple abortRequest traces as the network error can bubble up >1 times
// but these will only record if the request is still executing
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.ElementsMatch(t, []string{
"response(0)->executeTask(0)",
"response(0)->abortRequest(0)",
"response(0)->executeTask(1)",
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
}, tracing.TracesToStrings())
// has ContextCancelError exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
}
Expand Down
37 changes: 26 additions & 11 deletions message/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
// requests for a given peer and then generates the corresponding
// GraphSync message when ready to send
type Builder struct {
topic Topic
outgoingBlocks map[cid.Cid]blocks.Block
blkSize uint64
completedResponses map[graphsync.RequestID]graphsync.ResponseStatusCode
Expand All @@ -23,13 +22,9 @@ type Builder struct {
requests map[graphsync.RequestID]GraphSyncRequest
}

// Topic is an identifier for notifications about this response builder
type Topic uint64

// NewBuilder generates a new Builder.
func NewBuilder(topic Topic) *Builder {
func NewBuilder() *Builder {
return &Builder{
topic: topic,
requests: make(map[graphsync.RequestID]GraphSyncRequest),
outgoingBlocks: make(map[cid.Cid]blocks.Block),
completedResponses: make(map[graphsync.RequestID]graphsync.ResponseStatusCode),
Expand Down Expand Up @@ -87,6 +82,31 @@ func (b *Builder) Empty() bool {
return len(b.requests) == 0 && len(b.outgoingBlocks) == 0 && len(b.outgoingResponses) == 0
}

// ScrubResponse removes a response from a message and any blocks only referenced by that response
func (b *Builder) ScrubResponses(requestIDs []graphsync.RequestID) uint64 {
for _, requestID := range requestIDs {
delete(b.completedResponses, requestID)
delete(b.extensions, requestID)
delete(b.outgoingResponses, requestID)
}
oldSize := b.blkSize
newBlkSize := uint64(0)
savedBlocks := make(map[cid.Cid]blocks.Block, len(b.outgoingBlocks))
for _, metadata := range b.outgoingResponses {
for _, item := range metadata {
block, willSendBlock := b.outgoingBlocks[item.Link]
_, alreadySavedBlock := savedBlocks[item.Link]
if item.BlockPresent && willSendBlock && !alreadySavedBlock {
savedBlocks[item.Link] = block
newBlkSize += uint64(len(block.RawData()))
}
}
}
b.blkSize = newBlkSize
b.outgoingBlocks = savedBlocks
return oldSize - newBlkSize
}

// Build assembles and encodes message data from the added requests, links, and blocks.
func (b *Builder) Build() (GraphSyncMessage, error) {
responses := make(map[graphsync.RequestID]GraphSyncResponse, len(b.outgoingResponses))
Expand All @@ -107,11 +127,6 @@ func (b *Builder) Build() (GraphSyncMessage, error) {
}, nil
}

// Topic returns the identifier for notifications sent about this builder
func (b *Builder) Topic() Topic {
return b.topic
}

func responseCode(status graphsync.ResponseStatusCode, isComplete bool) graphsync.ResponseStatusCode {
if !isComplete {
return graphsync.PartialResponse
Expand Down
Loading