Skip to content

Commit

Permalink
refactor store
Browse files Browse the repository at this point in the history
  • Loading branch information
Terry-Mao committed Jun 15, 2016
1 parent 568a6c7 commit 420744f
Show file tree
Hide file tree
Showing 19 changed files with 406 additions and 410 deletions.
40 changes: 13 additions & 27 deletions store/block/supper_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,26 +178,6 @@ func (b *SuperBlock) Write(n *needle.Needle) (err error) {
return
}

// Writes write needles to the block.
func (b *SuperBlock) Writes(ns *needle.Needles) (err error) {
if b.LastErr != nil {
return b.LastErr
}
if _maxOffset-ns.IncrOffset < b.Offset {
err = errors.ErrSuperBlockNoSpace
return
}
if _, err = b.w.Write(ns.Buffer()); err == nil {
err = b.flush(false)
} else {
b.LastErr = err
return
}
b.Offset += ns.IncrOffset
b.Size += int64(ns.TotalSize)
return
}

// flush flush writer buffer.
func (b *SuperBlock) flush(force bool) (err error) {
var (
Expand Down Expand Up @@ -239,19 +219,23 @@ func (b *SuperBlock) WriteAt(offset uint32, n *needle.Needle) (err error) {
if b.LastErr != nil {
return b.LastErr
}
err = n.WriteAt(offset, b.w)
b.LastErr = err
if _, err = b.w.WriteAt(n.Buffer(), needle.BlockOffset(offset)); err != nil {
b.LastErr = err
}
return
}

// ReadAt read a needle by specified offset, before call it, must set needle
// TotalSize.
func (b *SuperBlock) ReadAt(offset uint32, n *needle.Needle) (err error) {
func (b *SuperBlock) ReadAt(n *needle.Needle) (err error) {
if b.LastErr != nil {
return b.LastErr
}
err = n.ReadAt(offset, b.r)
b.LastErr = err
if _, err = b.r.ReadAt(n.Buffer(), needle.BlockOffset(n.Offset)); err == nil {
err = n.Parse()
} else {
b.LastErr = err
}
return
}

Expand All @@ -261,8 +245,10 @@ func (b *SuperBlock) Delete(offset uint32) (err error) {
return b.LastErr
}
// WriteAt won't update the file offset.
_, err = b.w.WriteAt(needle.FlagDelBytes, needle.BlockOffset(offset)+needle.FlagOffset)
b.LastErr = err
if _, err = b.w.WriteAt(needle.FlagDelBytes,
needle.BlockOffset(offset)+needle.FlagOffset); err != nil {
b.LastErr = err
}
return
}

Expand Down
65 changes: 32 additions & 33 deletions store/block/supper_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ var (
func TestSuperBlock(t *testing.T) {
var (
b *SuperBlock
n *needle.Needle
offset, v2, v3, v4 uint32
err error
buf = &bytes.Buffer{}
needles = make(map[int64]int64)
data = []byte("test")
n = needle.NewBufferNeedle(4)
file = "../test/test.block"
ifile = "../test/test.idx"
//indexer *Indexer
Expand Down Expand Up @@ -61,10 +61,9 @@ func TestSuperBlock(t *testing.T) {
t.Errorf("buf.Write() error(%v)", err)
t.FailNow()
}
if err = n.WriteFrom(1, 1, 4, buf); err != nil {
t.Errorf("n.Write() error(%v)", err)
t.FailNow()
}
n = needle.NewWriter(1, 1, 4)
defer n.Close()
n.ReadFrom(buf)
if err = b.Write(n); err != nil {
t.Errorf("b.Write() error(%v)", err)
t.FailNow()
Expand All @@ -76,7 +75,8 @@ func TestSuperBlock(t *testing.T) {
offset = b.Offset
v2 = b.Offset
// test get
if err = b.ReadAt(1, n); err != nil {
n.Offset = 1
if err = b.ReadAt(n); err != nil {
t.Errorf("b.ReadAt() error(%v)", err)
t.FailNow()
}
Expand All @@ -89,10 +89,9 @@ func TestSuperBlock(t *testing.T) {
t.Errorf("buf.Write() error(%v)", err)
t.FailNow()
}
if err = n.WriteFrom(2, 2, 4, buf); err != nil {
t.Errorf("n.Write() error(%v)", err)
t.FailNow()
}
n = needle.NewWriter(2, 2, 4)
defer n.Close()
n.ReadFrom(buf)
if err = b.Write(n); err != nil {
t.Errorf("b.Write() error(%v)", err)
t.FailNow()
Expand All @@ -103,7 +102,8 @@ func TestSuperBlock(t *testing.T) {
}
offset = b.Offset
v3 = b.Offset
if err = b.ReadAt(6, n); err != nil {
n.Offset = 6
if err = b.ReadAt(n); err != nil {
t.Errorf("b.ReadAt() error(%v)", err)
t.FailNow()
}
Expand All @@ -116,10 +116,9 @@ func TestSuperBlock(t *testing.T) {
t.Errorf("buf.Write() error(%v)", err)
t.FailNow()
}
if err = n.WriteFrom(3, 3, 4, buf); err != nil {
t.Errorf("n.Write() error(%v)", err)
t.FailNow()
}
n = needle.NewWriter(3, 3, 4)
defer n.Close()
n.ReadFrom(buf)
if err = b.Write(n); err != nil {
t.Errorf("b.Write() error(%v)", err)
t.FailNow()
Expand All @@ -131,10 +130,9 @@ func TestSuperBlock(t *testing.T) {
t.Errorf("buf.Write() error(%v)", err)
t.FailNow()
}
if err = n.WriteFrom(4, 4, 4, buf); err != nil {
t.Errorf("n.Write() error(%v)", err)
t.FailNow()
}
n = needle.NewWriter(4, 4, 4)
defer n.Close()
n.ReadFrom(buf)
if err = b.Write(n); err != nil {
t.Errorf("b.Write() error(%v)", err)
t.FailNow()
Expand All @@ -147,15 +145,17 @@ func TestSuperBlock(t *testing.T) {
t.Errorf("compareTestOffset() error(%v)", err)
t.FailNow()
}
if err = b.ReadAt(11, n); err != nil {
n.Offset = 11
if err = b.ReadAt(n); err != nil {
t.Errorf("Get() error(%v)", err)
t.FailNow()
}
if err = compareTestNeedle(t, 3, 3, needle.FlagOK, n, data); err != nil {
t.Error("compareTestNeedle(3)")
t.FailNow()
}
if err = b.ReadAt(16, n); err != nil {
n.Offset = 16
if err = b.ReadAt(n); err != nil {
t.Errorf("Get() error(%v)", err)
t.FailNow()
}
Expand All @@ -169,21 +169,24 @@ func TestSuperBlock(t *testing.T) {
t.FailNow()
}
// test get
if err = b.ReadAt(1, n); err != nil {
n.Offset = 1
if err = b.ReadAt(n); err != nil {
t.Errorf("Get() error(%v)", err)
t.FailNow()
}
if err = compareTestNeedle(t, 1, 1, needle.FlagDel, n, data); err != nil {
t.FailNow()
}
if err = b.ReadAt(11, n); err != nil {
n.Offset = 11
if err = b.ReadAt(n); err != nil {
t.Errorf("Get() error(%v)", err)
t.FailNow()
}
if err = compareTestNeedle(t, 3, 3, needle.FlagOK, n, data); err != nil {
t.FailNow()
}
if err = b.ReadAt(16, n); err != nil {
n.Offset = 16
if err = b.ReadAt(n); err != nil {
t.Errorf("b.Get() error(%v)", err)
t.FailNow()
}
Expand Down Expand Up @@ -256,15 +259,15 @@ func TestSuperBlock(t *testing.T) {
t.Errorf("buf.Write() error(%v)", err)
t.FailNow()
}
if err = n.WriteFrom(3, 3, 4, buf); err != nil {
t.Errorf("n.Write() error(%v)", err)
t.FailNow()
}
n = needle.NewWriter(3, 3, 4)
defer n.Close()
n.ReadFrom(buf)
if err = b.WriteAt(v3, n); err != nil {
t.Errorf("b.Repair(3) error(%v)", err)
t.FailNow()
}
if err = b.ReadAt(v3, n); err != nil {
n.Offset = v3
if err = b.ReadAt(n); err != nil {
t.Errorf("b.Get() error(%v)", err)
t.FailNow()
}
Expand Down Expand Up @@ -303,10 +306,6 @@ func TestSuperBlock(t *testing.T) {
}

func compareTestNeedle(t *testing.T, key int64, cookie int32, flag byte, n *needle.Needle, data []byte) (err error) {
if err = n.Parse(); err != nil {
t.Error(err)
return
}
if !bytes.Equal(n.Data, data) {
err = fmt.Errorf("data: %s not match", n.Data)
t.Error(err)
Expand Down
12 changes: 12 additions & 0 deletions store/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
Volume *Volume
Block *Block
Index *Index
Limit *Limit
Zookeeper *Zookeeper
}

Expand Down Expand Up @@ -59,6 +60,17 @@ type Zookeeper struct {
Timeout Duration
}

type Rate struct {
Rate float64
Brust int
}

type Limit struct {
Read *Rate
Write *Rate
Delete *Rate
}

// Code to implement the TextUnmarshaler interface for `Duration`:
//
type Duration struct {
Expand Down
16 changes: 16 additions & 0 deletions store/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bfs/store/conf"
"encoding/json"
log "github.com/golang/glog"
"golang.org/x/time/rate"
"mime/multipart"
"net/http"
"os"
Expand All @@ -17,6 +18,21 @@ type Server struct {
store *Store
conf *conf.Config
info *stat.Info

rl *rate.Limiter
wl *rate.Limiter
dl *rate.Limiter
}

func NewServer(s *Store, c *conf.Config) *Server {
svr := &Server{
store: s,
conf: c,
rl: rate.NewLimiter(rate.Limit(c.Limit.Read.Rate), c.Limit.Read.Brust),
wl: rate.NewLimiter(rate.Limit(c.Limit.Write.Rate), c.Limit.Write.Brust),
dl: rate.NewLimiter(rate.Limit(c.Limit.Delete.Rate), c.Limit.Delete.Brust),
}
return svr
}

type sizer interface {
Expand Down
19 changes: 2 additions & 17 deletions store/http_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"bfs/libs/errors"
"bfs/store/needle"
"bfs/store/volume"
log "github.com/golang/glog"
"net/http"
Expand Down Expand Up @@ -33,14 +32,13 @@ func StartAdmin(addr string, s *Server) {
func (s *Server) probe(wr http.ResponseWriter, r *http.Request) {
var (
v *volume.Volume
n *needle.Needle
err error
vid int64
ret = http.StatusOK
params = r.URL.Query()
now = time.Now()
)
if r.Method != "GET" && r.Method != "HEAD" {
if r.Method != "HEAD" {
ret = http.StatusMethodNotAllowed
http.Error(wr, "method not allowed", ret)
return
Expand All @@ -51,9 +49,8 @@ func (s *Server) probe(wr http.ResponseWriter, r *http.Request) {
ret = http.StatusBadRequest
return
}
n = s.store.Needle()
if v = s.store.Volumes[int32(vid)]; v != nil {
if err = v.Probe(n); err != nil {
if err = v.Probe(); err != nil {
if err == errors.ErrNeedleDeleted || err == errors.ErrNeedleNotExist {
ret = http.StatusNotFound
} else {
Expand All @@ -64,18 +61,6 @@ func (s *Server) probe(wr http.ResponseWriter, r *http.Request) {
ret = http.StatusNotFound
err = errors.ErrVolumeNotExist
}
if err == nil {
if r.Method == "GET" {
if _, err = wr.Write(n.Data); err != nil {
log.Errorf("wr.Write() error(%v)", err)
ret = http.StatusInternalServerError
}
}
if log.V(1) {
log.Infof("get a needle: %v", n)
}
}
s.store.FreeNeedle(n)
return
}

Expand Down
3 changes: 2 additions & 1 deletion store/http_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestHTTPAdmin(t *testing.T) {
t.FailNow()
}
defer s.Close()
StartAdmin("localhost:6063", s)
StartAdmin("localhost:6063", &Server{store: s, conf: testConf})
time.Sleep(1 * time.Second)
// AddFreeVolume
buf.Reset()
Expand Down Expand Up @@ -123,6 +123,7 @@ func TestHTTPAdmin(t *testing.T) {
t.Errorf("compact_volume: %d", tr.Ret)
t.FailNow()
}
time.Sleep(_compactSleep * 2)
// BulkVolume
buf.Reset()
buf.WriteString("vid=2&bfile=./test/block_admin_1&ifile=./test/block_admin_1.idx")
Expand Down
Loading

0 comments on commit 420744f

Please sign in to comment.