Skip to content

Commit

Permalink
v1.4.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Terry-Mao committed Jun 27, 2016
1 parent 7facb45 commit a212581
Show file tree
Hide file tree
Showing 18 changed files with 184 additions and 455 deletions.
1 change: 0 additions & 1 deletion directory/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (d *Dispatcher) Update(group map[int][]string,
if !write {
continue
}

// calc score
for _, sid = range stores {
totalAdd, totalAddDelay, restSpace, minScore = 0, 0, 0, 0
Expand Down
9 changes: 9 additions & 0 deletions proxy/bfs/bfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,16 @@ func (b *Bfs) Get(bucket, filename string) (src io.ReadCloser, ctlen int, mtime
}
td.Stop()
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
continue
}
src = resp.Body
ctlen = int(resp.ContentLength)
break
}
if err == nil && resp.StatusCode == http.StatusServiceUnavailable {
err = errors.ErrStoreNotAvailable
}
return
}

Expand All @@ -140,6 +144,11 @@ func (b *Bfs) Upload(bucket, filename, mine, sha1 string, buf []byte) (err error
err = errors.ErrInternal
return
}
// same sha1sum.
if strings.HasPrefix(filename, sha1) && res.Ret == errors.RetNeedleExist {
err = errors.ErrNeedleExist
return
}

params = url.Values{}
for _, host = range res.Stores {
Expand Down
2 changes: 2 additions & 0 deletions proxy/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ func (s *server) download(item *ibucket.Item, bucket, file string, wr http.Respo
} else {
if err == errors.ErrNeedleNotExist {
status = http.StatusNotFound
} else if err == errors.ErrStoreNotAvailable {
status = http.StatusServiceUnavailable
} else {
status = http.StatusInternalServerError
}
Expand Down
8 changes: 8 additions & 0 deletions proxy/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,11 @@ Domain = "http://localhost:2232/"
Prefix = "/bfs/"

MaxFileSize = 20971520

AliyunKeyId = "xxxxx"
AliyunKeySecret = "xxxxx"

NetUserName = "xxxx"
NetPasswd = "xxxx"

PurgeMaxSize = 100000
15 changes: 14 additions & 1 deletion store/block/supper_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (b *SuperBlock) Scan(r *os.File, offset uint32, fn func(*needle.Needle, uin

// Recovery recovery needles map from super block.
func (b *SuperBlock) Recovery(offset uint32, fn func(*needle.Needle, uint32, uint32) error) (err error) {
var rsize int64
// WARN block may be no left data, must update block offset first
if offset == 0 {
offset = needle.NeedleOffset(_headerOffset)
Expand All @@ -340,9 +341,21 @@ func (b *SuperBlock) Recovery(offset uint32, fn func(*needle.Needle, uint32, uin
log.Errorf("block: %s Fadvise() error(%v)", b.File)
return
}
rsize = needle.BlockOffset(b.Offset)
// reset b.w offset, discard left space which can't parse to a needle
if _, err = b.w.Seek(needle.BlockOffset(b.Offset), os.SEEK_SET); err != nil {
if _, err = b.w.Seek(rsize, os.SEEK_SET); err != nil {
log.Errorf("block: %s Seek() error(%v)", b.File, err)
return
}
// recheck offset, keep size and offset consistency
if b.Size != rsize {
log.Warningf("block: %s [real size: %d, offset: %d] but [size: %d, offset: %d] not consistency, truncate file for force recovery, this may lost data",
b.File, b.Size, needle.NeedleOffset(b.Size),
rsize, b.Offset)
// truncate file
if err = b.w.Truncate(rsize); err != nil {
log.Errorf("block: %s Truncate() error(%v)", b.File, err)
}
}
return
}
Expand Down
44 changes: 40 additions & 4 deletions store/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/golang/glog"
"golang.org/x/time/rate"
"mime/multipart"
"net"
"net/http"
"os"
"strconv"
Expand All @@ -18,21 +19,56 @@ type Server struct {
store *Store
conf *conf.Config
info *stat.Info

// server
statSvr net.Listener
adminSvr net.Listener
apiSvr net.Listener
// limit
rl *rate.Limiter
wl *rate.Limiter
dl *rate.Limiter
}

func NewServer(s *Store, c *conf.Config) *Server {
svr := &Server{
func NewServer(s *Store, c *conf.Config) (svr *Server, err error) {
svr = &Server{
store: s,
conf: c,
rl: rate.NewLimiter(rate.Limit(c.Limit.Read.Rate), c.Limit.Read.Brust),
wl: rate.NewLimiter(rate.Limit(c.Limit.Write.Rate), c.Limit.Write.Brust),
dl: rate.NewLimiter(rate.Limit(c.Limit.Delete.Rate), c.Limit.Delete.Brust),
}
return svr
if svr.statSvr, err = net.Listen("tcp", c.StatListen); err != nil {
log.Errorf("net.Listen(%s) error(%v)", c.StatListen, err)
return
}
if svr.apiSvr, err = net.Listen("tcp", c.ApiListen); err != nil {
log.Errorf("net.Listen(%s) error(%v)", c.ApiListen, err)
return
}
if svr.adminSvr, err = net.Listen("tcp", c.AdminListen); err != nil {
log.Errorf("net.Listen(%s) error(%v)", c.AdminListen, err)
return
}
go svr.startStat()
go svr.startApi()
go svr.startAdmin()
if c.Pprof {
go StartPprof(c.PprofListen)
}
return
}

func (s *Server) Close() {
if s.statSvr != nil {
s.statSvr.Close()
}
if s.adminSvr != nil {
s.adminSvr.Close()
}
if s.apiSvr != nil {
s.apiSvr.Close()
}
return
}

type sizer interface {
Expand Down
35 changes: 19 additions & 16 deletions store/http_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,26 @@ import (
"time"
)

// StartAdmin start admin http listen.
func StartAdmin(addr string, s *Server) {
go func() {
var (
err error
serveMux = http.NewServeMux()
)
serveMux.HandleFunc("/probe", s.probe)
serveMux.HandleFunc("/bulk_volume", s.bulkVolume)
serveMux.HandleFunc("/compact_volume", s.compactVolume)
serveMux.HandleFunc("/add_volume", s.addVolume)
serveMux.HandleFunc("/add_free_volume", s.addFreeVolume)
if err = http.ListenAndServe(addr, serveMux); err != nil {
log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
return
// startAdmin start admin http listen.
func (s *Server) startAdmin() {
var (
err error
serveMux = http.NewServeMux()
server = &http.Server{
Addr: s.conf.AdminListen,
Handler: serveMux,
// TODO read/write timeout
}
}()
)
serveMux.HandleFunc("/probe", s.probe)
serveMux.HandleFunc("/bulk_volume", s.bulkVolume)
serveMux.HandleFunc("/compact_volume", s.compactVolume)
serveMux.HandleFunc("/add_volume", s.addVolume)
serveMux.HandleFunc("/add_free_volume", s.addFreeVolume)
if err = server.Serve(s.adminSvr); err != nil {
log.Errorf("server.Serve() error(%v)", err)
}
log.Info("http admin stop")
return
}

Expand Down
148 changes: 0 additions & 148 deletions store/http_admin_test.go

This file was deleted.

33 changes: 18 additions & 15 deletions store/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,25 @@ import (
"time"
)

// StartApi start api http listen.
func StartApi(addr string, s *Server) {
go func() {
var (
err error
serveMux = http.NewServeMux()
)
serveMux.HandleFunc("/get", s.get)
serveMux.HandleFunc("/upload", s.upload)
serveMux.HandleFunc("/uploads", s.uploads)
serveMux.HandleFunc("/del", s.del)
if err = http.ListenAndServe(addr, serveMux); err != nil {
log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
return
// startApi start api http listen.
func (s *Server) startApi() {
var (
err error
serveMux = http.NewServeMux()
server = &http.Server{
Addr: s.conf.ApiListen,
Handler: serveMux,
// TODO read/write timeout
}
}()
)
serveMux.HandleFunc("/get", s.get)
serveMux.HandleFunc("/upload", s.upload)
serveMux.HandleFunc("/uploads", s.uploads)
serveMux.HandleFunc("/del", s.del)
if err = server.Serve(s.apiSvr); err != nil {
log.Errorf("server.Serve() error(%v)", err)
}
log.Info("http api stop")
return
}

Expand Down
Loading

0 comments on commit a212581

Please sign in to comment.