Skip to content

Commit

Permalink
Merge pull request #5340 from ipfs/feat/coreapi/dagbatch
Browse files Browse the repository at this point in the history
 coreapi: dag: Batching interface
  • Loading branch information
whyrusleeping committed Aug 7, 2018
2 parents 8d3a350 + d6ee955 commit b0d9dfc
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 22 deletions.
87 changes: 68 additions & 19 deletions core/coreapi/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"

gopath "path"

Expand All @@ -17,34 +18,25 @@ import (

type DagAPI CoreAPI

type dagBatch struct {
api *DagAPI
toPut []ipld.Node

lk sync.Mutex
}

// Put inserts data using specified format and input encoding. Unless used with
// `WithCodes` or `WithHash`, the defaults "dag-cbor" and "sha256" are used.
// Returns the path of the inserted data.
func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) {
settings, err := caopts.DagPutOptions(opts...)
if err != nil {
return nil, err
}

codec, ok := cid.CodecToStr[settings.Codec]
if !ok {
return nil, fmt.Errorf("invalid codec %d", settings.Codec)
}

nds, err := coredag.ParseInputs(settings.InputEnc, codec, src, settings.MhType, settings.MhLength)
if err != nil {
return nil, err
}
if len(nds) == 0 {
return nil, fmt.Errorf("no node returned from ParseInputs")
}
nd, err := getNode(src, opts...)

err = api.node.DAG.Add(ctx, nds[0])
err = api.node.DAG.Add(ctx, nd)
if err != nil {
return nil, err
}

return coreiface.IpldPath(nds[0].Cid()), nil
return coreiface.IpldPath(nd.Cid()), nil
}

// Get resolves `path` using Unixfs resolver, returns the resolved Node.
Expand Down Expand Up @@ -75,6 +67,63 @@ func (api *DagAPI) Tree(ctx context.Context, p coreiface.Path, opts ...caopts.Da
return out, nil
}

// Batch creates new DagBatch
func (api *DagAPI) Batch(ctx context.Context) coreiface.DagBatch {
return &dagBatch{api: api}
}

// Put inserts data using specified format and input encoding. Unless used with
// `WithCodes` or `WithHash`, the defaults "dag-cbor" and "sha256" are used.
// Returns the path of the inserted data.
func (b *dagBatch) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) {
nd, err := getNode(src, opts...)
if err != nil {
return nil, err
}

b.lk.Lock()
b.toPut = append(b.toPut, nd)
b.lk.Unlock()

return coreiface.IpldPath(nd.Cid()), nil
}

// Commit commits nodes to the datastore and announces them to the network
func (b *dagBatch) Commit(ctx context.Context) error {
b.lk.Lock()
defer b.lk.Unlock()
defer func() {
b.toPut = nil
}()

return b.api.node.DAG.AddMany(ctx, b.toPut)
}

func getNode(src io.Reader, opts ...caopts.DagPutOption) (ipld.Node, error) {
settings, err := caopts.DagPutOptions(opts...)
if err != nil {
return nil, err
}

codec, ok := cid.CodecToStr[settings.Codec]
if !ok {
return nil, fmt.Errorf("invalid codec %d", settings.Codec)
}

nds, err := coredag.ParseInputs(settings.InputEnc, codec, src, settings.MhType, settings.MhLength)
if err != nil {
return nil, err
}
if len(nds) == 0 {
return nil, fmt.Errorf("no node returned from ParseInputs")
}
if len(nds) != 1 {
return nil, fmt.Errorf("got more that one node from ParseInputs")
}

return nds[0], nil
}

func (api *DagAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}
33 changes: 33 additions & 0 deletions core/coreapi/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,36 @@ func TestTree(t *testing.T) {
}
}
}

func TestBatch(t *testing.T) {
ctx := context.Background()
_, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}

batch := api.Dag().Batch(ctx)

c, err := batch.Put(ctx, strings.NewReader(`"Hello"`))
if err != nil {
t.Error(err)
}

if c.Cid().String() != "zdpuAqckYF3ToF3gcJNxPZXmnmGuXd3gxHCXhq81HGxBejEvv" {
t.Errorf("got wrong cid: %s", c.Cid().String())
}

_, err = api.Dag().Get(ctx, c)
if err == nil || err.Error() != "merkledag: not found" {
t.Error(err)
}

if err := batch.Commit(ctx); err != nil {
t.Error(err)
}

_, err = api.Dag().Get(ctx, c)
if err != nil {
t.Error(err)
}
}
23 changes: 20 additions & 3 deletions core/coreapi/interface/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,38 @@ import (
"context"
"io"

options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"

ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
)

// DagAPI specifies the interface to IPLD
type DagAPI interface {
// DagOps groups operations that can be batched together
type DagOps interface {
// Put inserts data using specified format and input encoding.
// Unless used with WithCodec or WithHash, the defaults "dag-cbor" and
// "sha256" are used.
Put(ctx context.Context, src io.Reader, opts ...options.DagPutOption) (ResolvedPath, error)
}

// DagBatch is the batching version of DagAPI. All implementations of DagBatch
// should be threadsafe
type DagBatch interface {
DagOps

// Commit commits nodes to the datastore and announces them to the network
Commit(ctx context.Context) error
}

// DagAPI specifies the interface to IPLD
type DagAPI interface {
DagOps

// Get attempts to resolve and get the node specified by the path
Get(ctx context.Context, path Path) (ipld.Node, error)

// Tree returns list of paths within a node specified by the path.
Tree(ctx context.Context, path Path, opts ...options.DagTreeOption) ([]Path, error)

// Batch creates new DagBatch
Batch(ctx context.Context) DagBatch
}

0 comments on commit b0d9dfc

Please sign in to comment.