Skip to content

Commit

Permalink
Fix unsealing, sector based data refs
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Dec 1, 2019
1 parent c92b9d5 commit a59d0f0
Show file tree
Hide file tree
Showing 18 changed files with 168 additions and 205 deletions.
6 changes: 3 additions & 3 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ type SectorInfo struct {
}

type SealedRef struct {
Piece string
Offset uint64
Size uint64
SectorID uint64
Offset uint64
Size uint64
}

type SealedRefs struct {
Expand Down
23 changes: 10 additions & 13 deletions api/cbor_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,8 @@ func (t *SealedRef) MarshalCBOR(w io.Writer) error {
return err
}

// t.t.Piece (string) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Piece)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Piece)); err != nil {
// t.t.SectorID (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.SectorID))); err != nil {
return err
}

Expand Down Expand Up @@ -172,16 +169,16 @@ func (t *SealedRef) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input had wrong number of fields")
}

// t.t.Piece (string) (string)

{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
// t.t.SectorID (uint64) (uint64)

t.Piece = string(sval)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = uint64(extra)
// t.t.Offset (uint64) (uint64)

maj, extra, err = cbg.CborReadHeader(br)
Expand Down
2 changes: 1 addition & 1 deletion chain/blocksync/cbor_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"io"

"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
Expand Down
2 changes: 1 addition & 1 deletion chain/deals/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size")
}

sectorID, err := p.secb.AddUnixfsPiece(ctx, deal.Ref, uf, deal.DealID)
sectorID, err := p.secb.AddUnixfsPiece(ctx, uf, deal.DealID)
if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion chain/types/cbor_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"io"
"math"

"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-seed/seed/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func PreSeal(maddr address.Address, ssize uint64, sectors int, sbroot string, pr
CacheDir: filepath.Join(sbroot, "cache"),
SealedDir: filepath.Join(sbroot, "sealed"),
StagedDir: filepath.Join(sbroot, "staging"),
MetadataDir: filepath.Join(sbroot, "meta"),
UnsealedDir: filepath.Join(sbroot, "unsealed"),
WorkerThreads: 2,
}

for _, d := range []string{cfg.CacheDir, cfg.SealedDir, cfg.StagedDir, cfg.MetadataDir} {
for _, d := range []string{cfg.CacheDir, cfg.SealedDir, cfg.StagedDir, cfg.UnsealedDir} {
if err := os.MkdirAll(d, 0775); err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/lotus-storage-miner/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ var initCmd = &cli.Command{
SealedDir: filepath.Join(pssb, "sealed"),
CacheDir: filepath.Join(pssb, "cache"),
StagedDir: filepath.Join(pssb, "staging"),
MetadataDir: filepath.Join(pssb, "meta"),
UnsealedDir: filepath.Join(pssb, "unsealed"),
}, oldmds)
if err != nil {
return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err)
Expand All @@ -156,7 +156,7 @@ var initCmd = &cli.Command{
SealedDir: filepath.Join(lr.Path(), "sealed"),
CacheDir: filepath.Join(lr.Path(), "cache"),
StagedDir: filepath.Join(lr.Path(), "staging"),
MetadataDir: filepath.Join(lr.Path(), "meta"),
UnsealedDir: filepath.Join(lr.Path(), "unsealed"),
}, mds)
if err != nil {
return xerrors.Errorf("failed to open up sectorbuilder: %w", err)
Expand Down Expand Up @@ -221,7 +221,6 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin
Pieces: []storage.Piece{
{
DealID: dealID,
Ref: fmt.Sprintf("preseal-%d", sector.SectorID),
Size: meta.SectorSize,
CommP: sector.CommD[:],
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-storage-miner/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var sectorsRefsCmd = &cli.Command{
for name, refs := range refs {
fmt.Printf("Block %s:\n", name)
for _, ref := range refs {
fmt.Printf("\t%s+%d %d bytes\n", ref.Piece, ref.Offset, ref.Size)
fmt.Printf("\t%d+%d %d bytes\n", ref.SectorID, ref.Offset, ref.Size)
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions lib/sectorbuilder/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (sb *SectorBuilder) stagedSectorPath(sectorID uint64) string {
return filepath.Join(sb.stagedDir, sb.sectorName(sectorID))
}

func (sb *SectorBuilder) unsealedSectorPath(sectorID uint64) string {
return filepath.Join(sb.unsealedDir, sb.sectorName(sectorID))
}

func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) {
return os.OpenFile(sb.stagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/sectorbuilder/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TempSectorbuilderDir(dir string, sectorSize uint64, ds dtypes.MetadataDS) (
return nil, err
}

metadata := filepath.Join(dir, "meta")
unsealed := filepath.Join(dir, "unsealed")
sealed := filepath.Join(dir, "sealed")
staging := filepath.Join(dir, "staging")
cache := filepath.Join(dir, "cache")
Expand All @@ -39,7 +39,7 @@ func TempSectorbuilderDir(dir string, sectorSize uint64, ds dtypes.MetadataDS) (

SealedDir: sealed,
StagedDir: staging,
MetadataDir: metadata,
UnsealedDir: unsealed,
CacheDir: cache,

WorkerThreads: 2,
Expand Down
82 changes: 72 additions & 10 deletions lib/sectorbuilder/sectorbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ type SectorBuilder struct {

Miner address.Address

stagedDir string
sealedDir string
cacheDir string
stagedDir string
sealedDir string
cacheDir string
unsealedDir string

unsealLk sync.Mutex

rateLimit chan struct{}
}
Expand All @@ -71,15 +74,15 @@ type Config struct {
CacheDir string
SealedDir string
StagedDir string
MetadataDir string
UnsealedDir string
}

func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
if cfg.WorkerThreads <= PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads)
}

for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} {
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.UnsealedDir} {
if err := os.Mkdir(dir, 0755); err != nil {
if os.IsExist(err) {
continue
Expand Down Expand Up @@ -187,13 +190,72 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
}, werr()
}

// TODO: should *really really* return an io.ReadCloser
func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) {
ret := sb.RateLimit()
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker
defer ret()

panic("fixme")
//return sectorbuilder.Unseal(sb.handle, pieceKey)
sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel
defer sb.unsealLk.Lock()

cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}

sealedPath, err := sb.sealedSectorPath(sectorID)
if err != nil {
return nil, err
}

unsealedPath := sb.unsealedSectorPath(sectorID)

// TODO: GC for those
// (Probably configurable count of sectors to be kept unsealed, and just
// remove last used one (or use whatever other cache policy makes sense))
f, err := os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}

var commd [CommLen]byte
copy(commd[:], commD)

var tkt [CommLen]byte
copy(tkt[:], ticket)

err = sectorbuilder.Unseal(sb.ssize,
PoRepProofPartitions,
cacheDir,
sealedPath,
unsealedPath,
sectorID,
addressToProverID(sb.Miner),
tkt,
commd)
if err != nil {
return nil, xerrors.Errorf("unseal failed: %w", err)
}

f, err = os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
}

if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
}

lr := io.LimitReader(f, int64(size))

return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
}

func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
Expand Down
4 changes: 2 additions & 2 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataD
}

cache := filepath.Join(sp, "cache")
metadata := filepath.Join(sp, "meta")
unsealed := filepath.Join(sp, "unsealed")
sealed := filepath.Join(sp, "sealed")
staging := filepath.Join(sp, "staging")

Expand All @@ -76,7 +76,7 @@ func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataD
WorkerThreads: uint8(threads),

CacheDir: cache,
MetadataDir: metadata,
UnsealedDir: unsealed,
SealedDir: sealed,
StagedDir: staging,
}
Expand Down
22 changes: 2 additions & 20 deletions storage/cbor_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (t *Piece) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{132}); err != nil {
if _, err := w.Write([]byte{131}); err != nil {
return err
}

Expand All @@ -164,14 +164,6 @@ func (t *Piece) MarshalCBOR(w io.Writer) error {
return err
}

// t.t.Ref (string) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Ref)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Ref)); err != nil {
return err
}

// t.t.Size (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Size))); err != nil {
return err
Expand All @@ -198,7 +190,7 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array")
}

if extra != 4 {
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}

Expand All @@ -212,16 +204,6 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("wrong type for uint64 field")
}
t.DealID = uint64(extra)
// t.t.Ref (string) (string)

{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}

t.Ref = string(sval)
}
// t.t.Size (uint64) (uint64)

maj, extra, err = cbg.CborReadHeader(br)
Expand Down
5 changes: 1 addition & 4 deletions storage/garbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"math"
"math/rand"
Expand Down Expand Up @@ -95,7 +94,6 @@ func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, existingPiece
out := make([]Piece, len(sizes))

for i, size := range sizes {
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
if err != nil {
return nil, err
Expand All @@ -105,7 +103,6 @@ func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, existingPiece

out[i] = Piece{
DealID: resp.DealIDs[i],
Ref: name,
Size: ppi.Size,
CommP: ppi.CommP[:],
}
Expand Down Expand Up @@ -134,7 +131,7 @@ func (m *Miner) StoreGarbageData() error {
return
}

if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].Ref, pieces[0].ppi()); err != nil {
if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].ppi()); err != nil {
log.Errorf("%+v", err)
return
}
Expand Down
Loading

0 comments on commit a59d0f0

Please sign in to comment.