Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ADL/single-node-view of a full unixFS file. #14

Merged
merged 10 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions data/builder/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package builder

import (
"bytes"
"context"
"io"
"testing"

"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
"github.com/ipfs/go-unixfsnode/file"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
)
Expand Down Expand Up @@ -39,3 +43,38 @@ func TestBuildUnixFSFile(t *testing.T) {
t.Fatal("expected top of file to be in store.")
}
}

func TestUnixFSFileRoundtrip(t *testing.T) {
buf := make([]byte, 10*1024*1024)
u.NewSeededRand(0xdeadbeef).Read(buf)
r := bytes.NewReader(buf)

ls := cidlink.DefaultLinkSystem()
storage := cidlink.Memory{}
ls.StorageReadOpener = storage.OpenRead
ls.StorageWriteOpener = storage.OpenWrite

f, _, err := BuildUnixFSFile(r, "", &ls)
if err != nil {
t.Fatal(err)
}

// get back the root node substrate from the link at the top of the builder.
fr, err := ls.Load(ipld.LinkContext{}, f, dagpb.Type.PBNode)
if err != nil {
t.Fatal(err)
}

ufn, err := file.NewUnixFSFile(context.Background(), fr, &ls)
if err != nil {
t.Fatal(err)
}
// read back out the file.
out, err := io.ReadAll(ufn)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(out, buf) {
t.Fatal("Not equal")
}
}
59 changes: 59 additions & 0 deletions file/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package file

import (
"context"
"io"

"github.com/ipld/go-ipld-prime"
)

// NewUnixFSFile attempts to construct an ipld node from the base protobuf node representing the
// root of a unixfs File.
// It provides a `bytes` view over the file, along with access to io.Reader streaming access
// to file data.
func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSystem) (StreamableByteNode, error) {
if substrate.Kind() == ipld.Kind_Bytes {
// A raw / single-node file.
return &singleNodeFile{substrate, 0}, nil
}
// see if it's got children.
links, err := substrate.LookupByString("Links")
if err != nil {
return nil, err
}
if links.Length() == 0 {
// no children.
return newWrappedNode(substrate)
}

return &shardNodeFile{
ctx: ctx,
lsys: lsys,
substrate: substrate,
done: false,
rdr: nil}, nil
}

// A StreamableByteNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type.
type StreamableByteNode interface {
ipld.Node
io.Reader
}

type singleNodeFile struct {
ipld.Node
offset int
}

func (f *singleNodeFile) Read(p []byte) (int, error) {
buf, err := f.Node.AsBytes()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we worried about repeated calls to AsBytes? it won't be a problem for basicnode.Bytes, but I don't think AsBytes is guaranteed to be trivially expensive. Perhaps we could call it on the first Read call, and hold onto it in a field until we hit EOF.

if err != nil {
return 0, err
}
if f.offset >= len(buf) {
return 0, io.EOF
}
n := copy(p, buf[f.offset:])
f.offset += n
return n, nil
}
90 changes: 90 additions & 0 deletions file/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package file_test

import (
"bytes"
"context"
"fmt"
"io"
"testing"

"github.com/ipfs/go-unixfsnode"
"github.com/ipfs/go-unixfsnode/directory"
"github.com/ipfs/go-unixfsnode/file"
"github.com/ipld/go-car/v2/blockstore"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
)

func TestRootV0File(t *testing.T) {
baseFile := "./fixtures/QmT78zSuBmuS4z925WZfrqQ1qHaJ56DQaTfyMUF7F8ff5o.car"
root, ls := open(baseFile, t)
file, err := file.NewUnixFSFile(context.Background(), root, ls)
if err != nil {
t.Fatal(err)
}
fc, err := file.AsBytes()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(fc, []byte("hello world\n")) {
t.Errorf("file content does not match: %s", string(fc))
}
}

func TestNamedV0File(t *testing.T) {
baseFile := "./fixtures/QmT8EC9sJq63SkDZ1mWLbWWyVV66PuqyHWpKkH4pcVyY4H.car"
root, ls := open(baseFile, t)
dir, err := unixfsnode.Reify(ipld.LinkContext{}, root, ls)
if err != nil {
t.Fatal(err)
}
dpbn := dir.(directory.UnixFSBasicDir)
name, link := dpbn.Iterator().Next()
if name.String() != "b.txt" {
t.Fatal("unexpected filename")
}
fileNode, err := ls.Load(ipld.LinkContext{}, link.Link(), dagpb.Type.PBNode)
if err != nil {
t.Fatal(err)
}
file, err := file.NewUnixFSFile(context.Background(), fileNode, ls)
if err != nil {
t.Fatal(err)
}
fc, err := file.AsBytes()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(fc, []byte("hello world\n")) {
t.Errorf("file content does not match: %s", string(fc))
}
}

func open(car string, t *testing.T) (ipld.Node, *ipld.LinkSystem) {
baseStore, err := blockstore.OpenReadOnly(car)
if err != nil {
t.Fatal(err)
}
ls := cidlink.DefaultLinkSystem()
ls.StorageReadOpener = func(_ ipld.LinkContext, l ipld.Link) (io.Reader, error) {
cl, ok := l.(cidlink.Link)
if !ok {
return nil, fmt.Errorf("couldn't load link")
}
blk, err := baseStore.Get(cl.Cid)
if err != nil {
return nil, err
}
return bytes.NewBuffer(blk.RawData()), nil
}
carRoots, err := baseStore.Roots()
if err != nil {
t.Fatal(err)
}
root, err := ls.Load(ipld.LinkContext{}, cidlink.Link{Cid: carRoots[0]}, dagpb.Type.PBNode)
if err != nil {
t.Fatal(err)
}
return root, &ls
}
Binary file not shown.
Binary file not shown.
154 changes: 154 additions & 0 deletions file/shard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package file

import (
"context"
"io"

dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/multiformats/go-multicodec"
)

type shardNodeFile struct {
ctx context.Context
lsys *ipld.LinkSystem
substrate ipld.Node
done bool
rdr io.Reader
}

var _ ipld.Node = (*shardNodeFile)(nil)

func (s *shardNodeFile) Read(p []byte) (int, error) {
if s.done {
return 0, io.EOF
}
// collect the sub-nodes on first use
if s.rdr == nil {
links, err := s.substrate.LookupByString("Links")
if err != nil {
return 0, err
}
readers := make([]io.Reader, 0)
lnki := links.ListIterator()
for !lnki.Done() {
_, lnk, err := lnki.Next()
if err != nil {
return 0, err
}
lnkhash, err := lnk.LookupByString("Hash")
if err != nil {
return 0, err
}
lnklnk, err := lnkhash.AsLink()
if err != nil {
return 0, err
}
target, err := s.lsys.Load(ipld.LinkContext{Ctx: s.ctx}, lnklnk, protoFor(lnklnk))
if err != nil {
return 0, err
}

asFSNode, err := NewUnixFSFile(s.ctx, target, s.lsys)
if err != nil {
return 0, err
}
readers = append(readers, asFSNode)
}
s.rdr = io.MultiReader(readers...)
}
n, err := s.rdr.Read(p)
if err == io.EOF {
s.rdr = nil
s.done = true
}
return n, err
}

func protoFor(link ipld.Link) ipld.NodePrototype {
if lc, ok := link.(cidlink.Link); ok {
if lc.Cid.Prefix().Codec == uint64(multicodec.DagPb) {
return dagpb.Type.PBNode
}
}
return basicnode.Prototype.Any
}

func (s *shardNodeFile) Kind() ipld.Kind {
return ipld.Kind_Bytes
}

func (s *shardNodeFile) AsBytes() ([]byte, error) {
return io.ReadAll(s)
}

func (s *shardNodeFile) AsBool() (bool, error) {
return false, ipld.ErrWrongKind{TypeName: "bool", MethodName: "AsBool", AppropriateKind: ipld.KindSet_JustBytes}
}

func (s *shardNodeFile) AsInt() (int64, error) {
return 0, ipld.ErrWrongKind{TypeName: "int", MethodName: "AsInt", AppropriateKind: ipld.KindSet_JustBytes}
}

func (s *shardNodeFile) AsFloat() (float64, error) {
return 0, ipld.ErrWrongKind{TypeName: "float", MethodName: "AsFloat", AppropriateKind: ipld.KindSet_JustBytes}
}

func (s *shardNodeFile) AsString() (string, error) {
return "", ipld.ErrWrongKind{TypeName: "string", MethodName: "AsString", AppropriateKind: ipld.KindSet_JustBytes}
}

func (s *shardNodeFile) AsLink() (ipld.Link, error) {
return nil, ipld.ErrWrongKind{TypeName: "link", MethodName: "AsLink", AppropriateKind: ipld.KindSet_JustBytes}
}

func (s *shardNodeFile) AsNode() (ipld.Node, error) {
return nil, nil
}

func (s *shardNodeFile) Size() int {
return 0
}

func (s *shardNodeFile) IsAbsent() bool {
return false
}

func (s *shardNodeFile) IsNull() bool {
return s.substrate.IsNull()
}

func (s *shardNodeFile) Length() int64 {
return 0
}

func (s *shardNodeFile) ListIterator() ipld.ListIterator {
return nil
}

func (s *shardNodeFile) MapIterator() ipld.MapIterator {
return nil
}

func (s *shardNodeFile) LookupByIndex(idx int64) (ipld.Node, error) {
return nil, ipld.ErrWrongKind{}
}

func (s *shardNodeFile) LookupByString(key string) (ipld.Node, error) {
return nil, ipld.ErrWrongKind{}
}

func (s *shardNodeFile) LookupByNode(key ipld.Node) (ipld.Node, error) {
return nil, ipld.ErrWrongKind{}
}

func (s *shardNodeFile) LookupBySegment(seg ipld.PathSegment) (ipld.Node, error) {
return nil, ipld.ErrWrongKind{}
}

// shardded files / nodes look like dagpb nodes.
func (s *shardNodeFile) Prototype() ipld.NodePrototype {
return dagpb.Type.PBNode
}
34 changes: 34 additions & 0 deletions file/wrapped.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package file

import (
"github.com/ipfs/go-unixfsnode/data"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/basicnode"
)

func newWrappedNode(substrate ipld.Node) (StreamableByteNode, error) {
dataField, err := substrate.LookupByString("Data")
if err != nil {
return nil, err
}
// unpack as unixfs proto.
dfb, err := dataField.AsBytes()
if err != nil {
return nil, err
}
ufd, err := data.DecodeUnixFSData(dfb)
if err != nil {
return nil, err
}

if ufd.Data.Exists() {
return &singleNodeFile{
Node: ufd.Data.Must(),
}, nil
}

// an empty degenerate one.
return &singleNodeFile{
Node: basicnode.NewBytes(nil),
}, nil
}
Loading