Skip to content

Commit

Permalink
Base grpc2 (#77)
Browse files Browse the repository at this point in the history
* update grpc

* add fs block
  • Loading branch information
AstaFrode authored Sep 26, 2023
1 parent 5a47120 commit 95ccbc9
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 622 deletions.
186 changes: 186 additions & 0 deletions core/fileblock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
Copyright (C) CESS. All rights reserved.
Copyright (C) Cumulus Encrypted Storage System. All rights reserved.
SPDX-License-Identifier: Apache-2.0
*/

package core

import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-multihash"
)

var BlocksDir = "blocks"

// Datastore uses a uses a file per key to store values.
type Datastore struct {
path string
}

var _ ds.Datastore = (*Datastore)(nil)
var _ ds.Batching = (*Datastore)(nil)
var _ ds.PersistentDatastore = (*Datastore)(nil)

// NewDatastore returns a new fs Datastore at given `path`
func NewDatastore(path string) (ds.Datastore, error) {
if !isDir(path) {
return nil, fmt.Errorf("failed to find directory at: %v (file? perms?)", path)
}

return &Datastore{path: path}, nil
}

// KeyFilename returns the filename associated with `key`
func (d *Datastore) KeyFilename(key ds.Key) string {
return filepath.Join(d.path, BlocksDir)
}

// Put stores the given value.
func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) {
fn := d.KeyFilename(key)
hash, err := CalcSHA256(value)
if err != nil {
return err
}

mhash, err := multihash.FromHexString("1220" + hash)
if err != nil {
return err
}

fn = filepath.Join(fn, cid.NewCidV0(mhash).String())
// mkdirall above.
err = os.MkdirAll(filepath.Dir(fn), 0755)
if err != nil {
return err
}

return os.WriteFile(fn, value, 0666)
}

// Sync would ensure that any previous Puts under the prefix are written to disk.
// However, they already are.
func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
return nil
}

// Get returns the value for given key
func (d *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
fn := d.KeyFilename(key)
if !isFile(fn) {
return nil, ds.ErrNotFound
}

return os.ReadFile(fn)
}

// Has returns whether the datastore has a value for a given key
func (d *Datastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) {
return ds.GetBackedHas(ctx, d, key)
}

func (d *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
return ds.GetBackedSize(ctx, d, key)
}

// Delete removes the value for given key
func (d *Datastore) Delete(ctx context.Context, key ds.Key) (err error) {
fn := d.KeyFilename(key)
if !isFile(fn) {
return nil
}

err = os.Remove(fn)
if os.IsNotExist(err) {
err = nil // idempotent
}
return err
}

// Query implements Datastore.Query
func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error) {
results := make(chan query.Result)

walkFn := func(path string, info os.FileInfo, _ error) error {
// remove ds path prefix
relPath, err := filepath.Rel(d.path, path)
if err == nil {
path = filepath.ToSlash(relPath)
}

if !info.IsDir() {
path = strings.TrimSuffix(path, BlocksDir)
var result query.Result
key := ds.NewKey(path)
result.Entry.Key = key.String()
if !q.KeysOnly {
result.Entry.Value, result.Error = d.Get(ctx, key)
}
results <- result
}
return nil
}

go func() {
filepath.Walk(d.path, walkFn)
close(results)
}()
r := query.ResultsWithChan(q, results)
r = query.NaiveQueryApply(q, r)
return r, nil
}

// isDir returns whether given path is a directory
func isDir(path string) bool {
finfo, err := os.Stat(path)
if err != nil {
return false
}

return finfo.IsDir()
}

// isFile returns whether given path is a file
func isFile(path string) bool {
finfo, err := os.Stat(path)
if err != nil {
return false
}

return !finfo.IsDir()
}

func (d *Datastore) Close() error {
return nil
}

func (d *Datastore) Batch(ctx context.Context) (ds.Batch, error) {
return ds.NewBasicBatch(d), nil
}

// DiskUsage returns the disk size used by the datastore in bytes.
func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) {
var du uint64
err := filepath.Walk(d.path, func(p string, f os.FileInfo, err error) error {
if err != nil {
log.Println(err)
return err
}
if f != nil && f.Mode().IsRegular() {
du += uint64(f.Size())
}
return nil
})
return du, err
}
90 changes: 34 additions & 56 deletions core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"github.com/gogo/protobuf/proto"
bitswap "github.com/ipfs/boxo/bitswap"
bsnet "github.com/ipfs/boxo/bitswap/network"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-libipfs/blocks"
u "github.com/ipfs/go-ipfs-util"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/connmgr"
Expand Down Expand Up @@ -124,21 +124,18 @@ type P2P interface {
// Close p2p
Close() error

// NewBitSwapBlock creates block data of data
NewBitSwapBlock(data []byte) (cid.Cid, error)
//
NewCidFromFid(fid string) (cid.Cid, error)

//
SaveAndNotifyDataBlock(buf []byte) (cid.Cid, error)

// SaveDataToBlock saves data to block data
SaveDataToBlock(data []byte) (cid.Cid, error)
//
NotifyData(buf []byte) error

// GetDataFromBlock get data from block
GetDataFromBlock(wantCid string) ([]byte, error)

// NewBitSwapBlockWithCid creates block data of data and cidstr
NewBitSwapBlockWithCid(data []byte, cidstr cid.Cid) (*blocks.BasicBlock, error)

// SaveAndNotifyBlock save and notify block
SaveAndNotifyBlock(block *blocks.BasicBlock) error

//
GetDiscoveredPeers() <-chan *routing.QueryEvent

Expand Down Expand Up @@ -197,12 +194,7 @@ type P2P interface {
//
PoisRequestVerifyDeletionProof(
addr string,
roots [][]byte,
witChain *pb.AccWitnessNode,
accPath [][]byte,
minerId []byte,
minerPoisInfo *pb.MinerPoisInfo,
minerSign []byte,
RequestVerifyDeletionProof *pb.RequestVerifyDeletionProof,
timeout time.Duration,
) (*pb.ResponseVerifyCommitOrDeletionProof, error)

Expand Down Expand Up @@ -268,12 +260,7 @@ type P2P interface {

PoisRequestVerifyDeletionProofP2P(
peerid peer.ID,
roots [][]byte,
witChain *pb.AccWitnessNode,
accPath [][]byte,
minerId []byte,
minerPoisInfo *pb.MinerPoisInfo,
minerSign []byte,
requestVerifyDeletionProof *pb.RequestVerifyDeletionProof,
timeout time.Duration,
) (*pb.ResponseVerifyCommitOrDeletionProof, error)

Expand Down Expand Up @@ -468,52 +455,43 @@ func NewBasicNode(
}

network := bsnet.NewFromIpfsHost(n.host, n.RoutingDiscovery)
// blockData := blocks.NewBlock([]byte("123456"))
// fmt.Println("Generate a cid: ", blockData.Cid().String())
n.bstore = blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
fsdatastore, err := NewDatastore(n.workspace)
if err != nil {
return nil, err
}

n.bstore = blockstore.NewBlockstore(ds_sync.MutexWrap(fsdatastore))
n.bswap = bitswap.New(n.ctxQueryFromCtxCancel, network, n.bstore)

n.initProtocol(protocolPrefix)

return n, nil
}

// NewBitSwapBlock creates block data of data
func (n *Node) NewBitSwapBlock(data []byte) (cid.Cid, error) {
if len(data) <= 0 {
return cid.Cid{}, errors.New("[NewBitSwapBlock] empty data")
}
blockData := blocks.NewBlock(data)
return blockData.Cid(), nil
}

// NewBitSwapBlockWithCid creates block data of data and cidstr
func (n *Node) NewBitSwapBlockWithCid(data []byte, cidstr cid.Cid) (*blocks.BasicBlock, error) {
if len(data) <= 0 {
return nil, errors.New("[NewBitSwapBlockWithCid] empty data")
// NewCidFromFid creates block data of data
func (n *Node) NewCidFromFid(fid string) (cid.Cid, error) {
if fid == "" {
return cid.Cid{}, errors.New("empty fid")
}
return blocks.NewBlockWithCid(data, cidstr)
newCid := cid.NewCidV0(u.Hash([]byte(fid)))
return newCid, nil
}

// SaveAndNotifyBlock save and notify block
func (n *Node) SaveAndNotifyBlock(block *blocks.BasicBlock) error {
err := n.bstore.Put(n.ctxQueryFromCtxCancel, block)
// SaveAndNotifyDataBlock
func (n *Node) SaveAndNotifyDataBlock(buf []byte) (cid.Cid, error) {
blockData := blocks.NewBlock(buf)
err := n.bstore.Put(n.ctxQueryFromCtxCancel, blockData)
if err != nil {
return err
return blockData.Cid(), err
}
err = n.bswap.NotifyNewBlocks(n.ctxQueryFromCtxCancel, block)
n.bswap.GetWantHaves()
return err
err = n.bswap.NotifyNewBlocks(n.ctxQueryFromCtxCancel, blockData)
return blockData.Cid(), err
}

// SaveDataToBlock saves data to block data
func (n *Node) SaveDataToBlock(data []byte) (cid.Cid, error) {
if len(data) <= 0 {
return cid.Cid{}, errors.New("[SaveDataToBlock] empty data")
}
blockData := blocks.NewBlock(data)
err := n.bstore.Put(n.ctxQueryFromCtxCancel, blockData)
return blockData.Cid(), err
// NotifyData notify data
func (n *Node) NotifyData(buf []byte) error {
blockData := blocks.NewBlock(buf)
return n.bswap.NotifyNewBlocks(n.ctxQueryFromCtxCancel, blockData)
}

// GetDataFromBlock get data from block
Expand Down
16 changes: 2 additions & 14 deletions core/pois.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,7 @@ func (n *Node) PoisRequestVerifySpaceTotal(

func (n *Node) PoisRequestVerifyDeletionProof(
addr string,
roots [][]byte,
witChain *pb.AccWitnessNode,
accPath [][]byte,
minerId []byte,
minerPoisInfo *pb.MinerPoisInfo,
minerSign []byte,
RequestVerifyDeletionProof *pb.RequestVerifyDeletionProof,
timeout time.Duration,
) (*pb.ResponseVerifyCommitOrDeletionProof, error) {
conn, err := grpc.Dial(
Expand All @@ -176,13 +171,6 @@ func (n *Node) PoisRequestVerifyDeletionProof(
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

result, err := c.RequestVerifyDeletionProof(ctx, &pb.RequestVerifyDeletionProof{
Roots: roots,
WitChain: witChain,
AccPath: accPath,
MinerId: minerId,
PoisInfo: minerPoisInfo,
MinerSign: minerSign,
})
result, err := c.RequestVerifyDeletionProof(ctx, RequestVerifyDeletionProof)
return result, err
}
16 changes: 2 additions & 14 deletions core/pois_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,7 @@ func (n *Node) PoisRequestVerifySpaceTotalP2P(

func (n *Node) PoisRequestVerifyDeletionProofP2P(
peerid peer.ID,
roots [][]byte,
witChain *pb.AccWitnessNode,
accPath [][]byte,
minerId []byte,
minerPoisInfo *pb.MinerPoisInfo,
minerSign []byte,
requestVerifyDeletionProof *pb.RequestVerifyDeletionProof,
timeout time.Duration,
) (*pb.ResponseVerifyCommitOrDeletionProof, error) {
opts := []grpc.DialOption{grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())}
Expand All @@ -145,13 +140,6 @@ func (n *Node) PoisRequestVerifyDeletionProofP2P(
defer conn.Close()
c := pb.NewPoisApiClient(conn)

result, err := c.RequestVerifyDeletionProof(ctx, &pb.RequestVerifyDeletionProof{
Roots: roots,
WitChain: witChain,
AccPath: accPath,
MinerId: minerId,
PoisInfo: minerPoisInfo,
MinerSign: minerSign,
})
result, err := c.RequestVerifyDeletionProof(ctx, requestVerifyDeletionProof)
return result, err
}
Loading

0 comments on commit 95ccbc9

Please sign in to comment.