Skip to content

Commit

Permalink
clean up code, make it have a nicer interface
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Sep 15, 2014
1 parent 1a7c083 commit 7845488
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 114 deletions.
4 changes: 2 additions & 2 deletions importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
// NewDagFromReader constructs a Merkle DAG from the data in r,
// chunking it with a fixed-size splitter of 512 KiB blocks.
func NewDagFromReader(r io.Reader) (*dag.Node, error) {
	return NewDagFromReaderWithSplitter(r, &SizeSplitter{1024 * 512})
}

func NewDagFromReaderWithSplitter(r io.Reader, spl BlockSplitter) (*dag.Node, error) {
blkChan := spl(r)
blkChan := spl.Split(r)
root := &dag.Node{Data: dag.FilePBData()}

for blk := range blkChan {
Expand Down
43 changes: 3 additions & 40 deletions importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,15 @@ func TestBuildDag(t *testing.T) {

//Test where calls to read are smaller than the chunk size
func TestSizeBasedSplit(t *testing.T) {
bs := SplitterBySize(512)
bs := &SizeSplitter{512}
testFileConsistency(t, bs, 32*512)
bs = SplitterBySize(4096)
bs = &SizeSplitter{4096}
testFileConsistency(t, bs, 32*4096)

// Uneven offset
testFileConsistency(t, bs, 31*4095)
}

// TestOtherSplit is a disabled placeholder for an alternative
// (rabin-fingerprint) splitting strategy.
func TestOtherSplit(t *testing.T) {
	// Left intentionally empty until a working splitter exists:
	//split := WhyrusleepingCantImplementRabin
	//testFileConsistency(t, split, 4096*64)
}

// testData is a deterministic io.Reader that emits the repeating
// byte sequence 0,1,...,255,0,1,... starting at offset n.
type testData struct{ n uint64 }

// Read fills b with the next bytes of the repeating 0-255 pattern.
// It always fills the whole slice and never returns an error.
func (t *testData) Read(b []byte) (int, error) {
	for i := range b { // `for i, _ :=` is non-idiomatic; vet/gofmt prefer this
		b[i] = byte(t.n % 256)
		t.n++
	}
	return len(b), nil
}

func testFileConsistency(t *testing.T, bs BlockSplitter, nbytes int) {
buf := new(bytes.Buffer)
io.CopyN(buf, rand.Reader, int64(nbytes))
Expand Down Expand Up @@ -95,27 +80,5 @@ func arrComp(a, b []byte) error {
}

// TestMaybeRabinConsistency verifies that concatenating the blocks
// produced by the MaybeRabin splitter reproduces the input exactly.
func TestMaybeRabinConsistency(t *testing.T) {
	testFileConsistency(t, NewMaybeRabin(4096), 256*4096)
}
88 changes: 37 additions & 51 deletions importer/rabin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,93 +5,79 @@ import (
"bytes"
"fmt"
"io"
"math"
)

// rollhash runs a toy Rabin-Karp style rolling hash over S and
// returns how many window positions hit the chunk-boundary mask
// (it also prints "match" for each, as before).
// Pseudocode originally adapted from the internet.
//
// The window is windowSize bytes; a "match" occurs whenever the low
// 12 bits of the rolling hash are all ones.
func rollhash(S []byte) int {
	a := 10
	mask := 0xfff
	MOD := 33554383 // randomly chosen
	windowSize := 16
	an := 1
	rollingHash := 0
	matches := 0

	// Prime the hash with the first full window.
	for i := 0; i < windowSize; i++ {
		rollingHash = (rollingHash*a + int(S[i])) % MOD
		an = (an * a) % MOD
	}
	if rollingHash&mask == mask {
		fmt.Println("match")
		matches++
	}
	// Slide the window one byte at a time.
	for i := 1; i < len(S)-windowSize; i++ {
		rollingHash = (rollingHash*a + int(S[i+windowSize-1]) - an*int(S[i-1])) % MOD
		// Go's % preserves the sign of the dividend, so the
		// subtraction above can leave rollingHash negative and
		// silently defeat the mask test. Renormalize into [0, MOD).
		if rollingHash < 0 {
			rollingHash += MOD
		}
		if rollingHash&mask == mask {
			fmt.Println("match")
			matches++
		}
	}
	return matches
}
// MaybeRabin is a content-aware block splitter that cuts a block
// wherever a rolling hash over a small window matches its mask.
type MaybeRabin struct {
	mask       int // low-bit mask that marks a block boundary
	windowSize int // rolling-hash window, in bytes
}

// NewMaybeRabin builds a splitter tuned so that emitted blocks
// average roughly avgBlkSize bytes.
func NewMaybeRabin(avgBlkSize int) *MaybeRabin {
	blkbits := uint(math.Log2(float64(avgBlkSize)))
	return &MaybeRabin{
		mask:       (1 << blkbits) - 1,
		windowSize: 16, // probably a good number...
	}
}

func ThisMightBeRabin(r io.Reader) chan []byte {
out := make(chan []byte)
func (mr *MaybeRabin) Split(r io.Reader) chan []byte {
out := make(chan []byte, 16)
go func() {
inbuf := bufio.NewReader(r)
blkbuf := new(bytes.Buffer)

// some bullshit numbers
a := 10
mask := 0xfff //make this smaller for smaller blocks
MOD := 33554383 //randomly chosen
windowSize := 16
// some bullshit numbers i made up
a := 10 // honestly, no idea what this is
MOD := 33554383 // randomly chosen (seriously)
an := 1
rollingHash := 0

window := make([]byte, windowSize)
get := func(i int) int { return int(window[i%len(window)]) }
set := func(i int, val byte) { window[i%len(window)] = val }
// Window is a circular buffer
window := make([]byte, mr.windowSize)
push := func(i int, val byte) (outval int) {
outval = int(window[i%len(window)])
window[i%len(window)] = val
return
}

// Duplicate byte slice
dup := func(b []byte) []byte {
d := make([]byte, len(b))
copy(d, b)
return d
}

// Fill up the window
i := 0
for ; i < windowSize; i++ {
for ; i < mr.windowSize; i++ {
b, err := inbuf.ReadByte()
if err != nil {
fmt.Println(err)
return
}
blkbuf.WriteByte(b)
window[i] = b
push(i, b)
rollingHash = (rollingHash*a + int(b)) % MOD
an = (an * a) % MOD
}
/* This is too short for a block
if rollingHash&mask == mask {
// "match"
fmt.Println("match")
}
*/

for ; true; i++ {
b, err := inbuf.ReadByte()
if err != nil {
break
}
outval := get(i)
set(i, b)
outval := push(i, b)
blkbuf.WriteByte(b)
rollingHash = (rollingHash*a + get(i) - an*outval) % MOD
if rollingHash&mask == mask {
//print "match"
rollingHash = (rollingHash*a + int(b) - an*outval) % MOD
if rollingHash&mr.mask == mr.mask {
out <- dup(blkbuf.Bytes())
blkbuf.Reset()
}
peek, err := inbuf.Peek(windowSize)
if err != nil {
break
}
if len(peek) != windowSize {

// Check if there are enough remaining
peek, err := inbuf.Peek(mr.windowSize)
if err != nil || len(peek) != mr.windowSize {
break
}
}
Expand Down
46 changes: 25 additions & 21 deletions importer/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,33 @@ import (
u "github.com/jbenet/go-ipfs/util"
)

// BlockSplitter chunks the data read from r into blocks, delivering
// them on the returned channel. The channel is closed when the
// input is exhausted.
type BlockSplitter interface {
	Split(r io.Reader) chan []byte
}

type SizeSplitter struct {
Size int
}

func SplitterBySize(n int) BlockSplitter {
return func(r io.Reader) chan []byte {
out := make(chan []byte)
go func(n int) {
defer close(out)
for {
chunk := make([]byte, n)
nread, err := r.Read(chunk)
if err != nil {
if err == io.EOF {
return
}
u.PErr("block split error: %v\n", err)
func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
out := make(chan []byte)
go func() {
defer close(out)
for {
chunk := make([]byte, ss.Size)
nread, err := r.Read(chunk)
if err != nil {
if err == io.EOF {
return
}
if nread < n {
chunk = chunk[:nread]
}
out <- chunk
u.PErr("block split error: %v\n", err)
return
}
if nread < ss.Size {
chunk = chunk[:nread]
}
}(n)
return out
}
out <- chunk
}
}()
return out
}

0 comments on commit 7845488

Please sign in to comment.