Skip to content

Commit

Permalink
rate-limit some sectorbuilder ops
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 4, 2019
1 parent 13da5a5 commit c76ce2a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cmd/lotus-storage-miner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var runCmd = &cli.Command{
}
return lr.SetAPIEndpoint(apima)
}),
node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(storageRepoPath)),
node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(storageRepoPath, 5)), // TODO: grab worker count from config
node.Override(new(api.FullNode), nodeApi),
)
if err != nil {
Expand Down
37 changes: 36 additions & 1 deletion lib/sectorbuilder/sectorbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"unsafe"

sectorbuilder "github.com/filecoin-project/go-sectorbuilder"

logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/chain/address"
)

const PoStReservedWorkers = 1

var log = logging.Logger("sectorbuilder")

type SectorSealingStatus = sectorbuilder.SectorSealingStatus
Expand All @@ -38,6 +40,8 @@ const CommLen = sectorbuilder.CommitmentBytesLen

type SectorBuilder struct {
handle unsafe.Pointer

rateLimit chan struct{}
}

type Config struct {
Expand All @@ -53,6 +57,10 @@ type Config struct {
}

func New(cfg *Config) (*SectorBuilder, error) {
if cfg.WorkerThreads <= PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers + 1, cfg.WorkerThreads)
}

proverId := addressToProverID(cfg.Miner)

sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads)
Expand All @@ -62,9 +70,21 @@ func New(cfg *Config) (*SectorBuilder, error) {

return &SectorBuilder{
handle: sbp,
rateLimit: make(chan struct{}, cfg.WorkerThreads - PoStReservedWorkers),
}, nil
}

func (sb *SectorBuilder) rlimit() func() {
if cap(sb.rateLimit) == len(sb.rateLimit) {
log.Warn("rate-limiting sectorbuilder call")
}
sb.rateLimit <- struct{}{}

return func() {
<-sb.rateLimit
}
}

func addressToProverID(a address.Address) [32]byte {
var proverId [32]byte
copy(proverId[:], a.Payload())
Expand All @@ -81,6 +101,9 @@ func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, file io.Rea
return 0, err
}

ret := sb.rlimit()
defer ret()

sectorID, err := sectorbuilder.AddPieceFromFile(sb.handle, pieceKey, pieceSize, f)
if err != nil {
return 0, err
Expand All @@ -91,18 +114,30 @@ func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, file io.Rea

// TODO: should *really really* return an io.ReadCloser
func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) {
ret := sb.rlimit()
defer ret()

return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey)
}

func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket) (SealPreCommitOutput, error) {
ret := sb.rlimit()
defer ret()

return sectorbuilder.SealPreCommit(sb.handle, sectorID, ticket)
}

func (sb *SectorBuilder) SealCommit(sectorID uint64, seed SealSeed) (SealCommitOutput, error) {
ret := sb.rlimit()
defer ret()

return sectorbuilder.SealCommit(sb.handle, sectorID, seed)
}

func (sb *SectorBuilder) ResumeSealCommit(sectorID uint64) (SealCommitOutput, error) {
ret := sb.rlimit()
defer ret()

return sectorbuilder.ResumeSealCommit(sb.handle, sectorID)
}

Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a
node.Repo(r),
node.Test(),

node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(secbpath)),
node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(secbpath, 2)),
node.Override(new(api.FullNode), tnd),
)
require.NoError(t, err)
Expand Down

0 comments on commit c76ce2a

Please sign in to comment.