Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockservice: remove busyloop in getBlocks by removing batching #232

Merged
merged 1 commit into from
Mar 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 19 additions & 31 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
}

if !allValid {
// can't shift in place because we don't want to clobber callers.
ks2 := make([]cid.Cid, 0, len(ks))
for _, c := range ks {
// hash security
Expand Down Expand Up @@ -333,52 +334,39 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
return
}

// batch available blocks together
const batchSize = 32
batch := make([]blocks.Block, 0, batchSize)
var cache [1]blocks.Block // preallocate once for all iterations
for {
var noMoreBlocks bool
batchLoop:
for len(batch) < batchSize {
select {
case b, ok := <-rblocks:
if !ok {
noMoreBlocks = true
break batchLoop
}

logger.Debugf("BlockService.BlockFetched %s", b.Cid())
batch = append(batch, b)
case <-ctx.Done():
var b blocks.Block
select {
case v, ok := <-rblocks:
if !ok {
return
default:
break batchLoop
}
b = v
Jorropo marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
return
}

// also write in the blockstore for caching, inform the exchange that the blocks are available
err = bs.PutMany(ctx, batch)
// write in the blockstore for caching
err = bs.Put(ctx, b)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

err = f.NotifyNewBlocks(ctx, batch...)
// inform the exchange that the blocks are available
cache[0] = b
err = f.NotifyNewBlocks(ctx, cache[:]...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}
cache[0] = nil // early gc

for _, b := range batch {
select {
case out <- b:
case <-ctx.Done():
return
}
}
batch = batch[:0]
if noMoreBlocks {
break
select {
case out <- b:
case <-ctx.Done():
return
}
}
}()
Expand Down