Skip to content

Commit

Permalink
refactor store
Browse files Browse the repository at this point in the history
  • Loading branch information
Terry-Mao committed May 12, 2016
1 parent 6474257 commit 77ba840
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 126 deletions.
8 changes: 8 additions & 0 deletions store/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"bfs/libs/errors"
"bfs/libs/stat"
"bfs/store/conf"
"encoding/json"
log "github.com/golang/glog"
"mime/multipart"
Expand All @@ -11,6 +13,12 @@ import (
"time"
)

type Server struct {
store *Store
conf *conf.Config
info *stat.Info
}

type sizer interface {
Size() int64
}
Expand Down
61 changes: 18 additions & 43 deletions store/http_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ import (
)

// StartAdmin start admin http listen.
func StartAdmin(addr string, s *Store) {
func StartAdmin(addr string, s *Server) {
go func() {
var (
err error
serveMux = http.NewServeMux()
)
serveMux.Handle("/probe", httpProbeHandler{s: s})
serveMux.Handle("/bulk_volume", httpBulkVolumeHandler{s: s})
serveMux.Handle("/compact_volume", httpCompactVolumeHandler{s: s})
serveMux.Handle("/add_volume", httpAddVolumeHandler{s: s})
serveMux.Handle("/add_free_volume", httpAddFreeVolumeHandler{s: s})
serveMux.HandleFunc("/probe", s.probe)
serveMux.HandleFunc("/bulk_volume", s.bulkVolume)
serveMux.HandleFunc("/compact_volume", s.compactVolume)
serveMux.HandleFunc("/add_volume", s.addVolume)
serveMux.HandleFunc("/add_free_volume", s.addFreeVolume)
if err = http.ListenAndServe(addr, serveMux); err != nil {
log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
return
Expand All @@ -30,12 +30,7 @@ func StartAdmin(addr string, s *Store) {
return
}

// httpProbeHandler http upload a file.
type httpProbeHandler struct {
s *Store
}

func (h httpProbeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) probe(wr http.ResponseWriter, r *http.Request) {
var (
v *volume.Volume
n *needle.Needle
Expand All @@ -56,8 +51,8 @@ func (h httpProbeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
ret = http.StatusBadRequest
return
}
n = h.s.Needle()
if v = h.s.Volumes[int32(vid)]; v != nil {
n = s.store.Needle()
if v = s.store.Volumes[int32(vid)]; v != nil {
if err = v.Probe(n); err != nil {
if err == errors.ErrNeedleDeleted || err == errors.ErrNeedleNotExist {
ret = http.StatusNotFound
Expand All @@ -80,16 +75,11 @@ func (h httpProbeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
log.Infof("get a needle: %v", n)
}
}
h.s.FreeNeedle(n)
s.store.FreeNeedle(n)
return
}

// httpBulkVolumeHandler http bulk block.
type httpBulkVolumeHandler struct {
s *Store
}

func (h httpBulkVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) bulkVolume(wr http.ResponseWriter, r *http.Request) {
var (
err error
vid int64
Expand All @@ -110,18 +100,13 @@ func (h httpBulkVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request
}
go func() {
log.Infof("bulk volume: %d start", vid)
err = h.s.BulkVolume(int32(vid), bfile, ifile)
err = s.store.BulkVolume(int32(vid), bfile, ifile)
log.Infof("bulk volume: %d stop", vid)
}()
return
}

// httpCompactVolumeHandler http compact block.
type httpCompactVolumeHandler struct {
s *Store
}

func (h httpCompactVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) compactVolume(wr http.ResponseWriter, r *http.Request) {
var (
err error
vid int64
Expand All @@ -140,20 +125,15 @@ func (h httpCompactVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Requ
// long time processing, not block, we can from info stat api get status.
go func() {
log.Infof("compact volume: %d start", vid)
if err = h.s.CompactVolume(int32(vid)); err != nil {
if err = s.store.CompactVolume(int32(vid)); err != nil {
log.Errorf("s.CompactVolume() error(%v)", err)
}
log.Infof("compact volume: %d stop", vid)
}()
return
}

// httpAddVolumeHandler http compact block.
type httpAddVolumeHandler struct {
s *Store
}

func (h httpAddVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) addVolume(wr http.ResponseWriter, r *http.Request) {
var (
err error
vid int64
Expand All @@ -170,16 +150,11 @@ func (h httpAddVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request)
return
}
log.Infof("add volume: %d", vid)
_, err = h.s.AddVolume(int32(vid))
_, err = s.store.AddVolume(int32(vid))
return
}

// httpAddFreeVolumeHandler http compact block.
type httpAddFreeVolumeHandler struct {
s *Store
}

func (h httpAddFreeVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) addFreeVolume(wr http.ResponseWriter, r *http.Request) {
var (
err error
sn int
Expand All @@ -199,7 +174,7 @@ func (h httpAddFreeVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Requ
return
}
log.Infof("add free volume: %d", n)
sn, err = h.s.AddFreeVolume(int(n), bdir, idir)
sn, err = s.store.AddFreeVolume(int(n), bdir, idir)
res["succeed"] = sn
return
}
68 changes: 23 additions & 45 deletions store/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"bfs/libs/errors"
"bfs/store/conf"
"bfs/store/needle"
"bfs/store/volume"
log "github.com/golang/glog"
Expand All @@ -13,16 +12,16 @@ import (
)

// StartApi start api http listen.
func StartApi(addr string, s *Store, c *conf.Config) {
func StartApi(addr string, s *Server) {
go func() {
var (
err error
serveMux = http.NewServeMux()
)
serveMux.Handle("/get", httpGetHandler{s: s})
serveMux.Handle("/upload", httpUploadHandler{s: s, c: c})
serveMux.Handle("/uploads", httpUploadsHandler{s: s, c: c})
serveMux.Handle("/del", httpDelHandler{s: s})
serveMux.HandleFunc("/get", s.get)
serveMux.HandleFunc("/upload", s.upload)
serveMux.HandleFunc("/uploads", s.uploads)
serveMux.HandleFunc("/del", s.del)
if err = http.ListenAndServe(addr, serveMux); err != nil {
log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
return
Expand All @@ -31,12 +30,7 @@ func StartApi(addr string, s *Store, c *conf.Config) {
return
}

// httpGetHandler http upload a file.
type httpGetHandler struct {
s *Store
}

func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) get(wr http.ResponseWriter, r *http.Request) {
var (
v *volume.Volume
n *needle.Needle
Expand Down Expand Up @@ -67,10 +61,10 @@ func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
ret = http.StatusBadRequest
return
}
n = h.s.Needle()
n = s.store.Needle()
n.Key = key
n.Cookie = int32(cookie)
if v = h.s.Volumes[int32(vid)]; v != nil {
if v = s.store.Volumes[int32(vid)]; v != nil {
if err = v.Get(n); err != nil {
if err == errors.ErrNeedleDeleted || err == errors.ErrNeedleNotExist {
ret = http.StatusNotFound
Expand All @@ -93,17 +87,11 @@ func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
log.Infof("get a needle: %v", n)
}
}
h.s.FreeNeedle(n)
s.store.FreeNeedle(n)
return
}

// httpUploadHandler http upload a file.
type httpUploadHandler struct {
s *Store
c *conf.Config
}

func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) upload(wr http.ResponseWriter, r *http.Request) {
var (
vid int64
key int64
Expand All @@ -121,7 +109,7 @@ func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
return
}
defer HttpPostWriter(r, wr, time.Now(), &err, res)
if err = checkContentLength(r, h.c.NeedleMaxSize); err != nil {
if err = checkContentLength(r, s.conf.NeedleMaxSize); err != nil {
return
}
str = r.FormValue("vid")
Expand All @@ -147,28 +135,22 @@ func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
err = errors.ErrInternal
return
}
if size, err = checkFileSize(file, h.c.NeedleMaxSize); err == nil {
n = h.s.Needle()
if size, err = checkFileSize(file, s.conf.NeedleMaxSize); err == nil {
n = s.store.Needle()
if err = n.WriteFrom(key, int32(cookie), int32(size), file); err == nil {
if v = h.s.Volumes[int32(vid)]; v != nil {
if v = s.store.Volumes[int32(vid)]; v != nil {
err = v.Write(n)
} else {
err = errors.ErrVolumeNotExist
}
}
h.s.FreeNeedle(n)
s.store.FreeNeedle(n)
}
file.Close()
return
}

// httpUploads http upload files.
type httpUploadsHandler struct {
s *Store
c *conf.Config
}

func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) uploads(wr http.ResponseWriter, r *http.Request) {
var (
i, nn int
err error
Expand All @@ -191,7 +173,7 @@ func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
return
}
defer HttpPostWriter(r, wr, time.Now(), &err, res)
if err = checkContentLength(r, h.c.NeedleMaxSize*h.c.BatchMaxNum); err != nil {
if err = checkContentLength(r, s.conf.NeedleMaxSize*s.conf.BatchMaxNum); err != nil {
return
}
str = r.FormValue("vid")
Expand All @@ -214,7 +196,7 @@ func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
err = errors.ErrParam
return
}
ns = h.s.Needles(nn)
ns = s.store.Needles(nn)
for i, fh = range fhs {
if key, err = strconv.ParseInt(keys[i], 10, 64); err != nil {
log.Errorf("strconv.ParseInt(\"%s\") error(%v)", keys[i], err)
Expand All @@ -230,7 +212,7 @@ func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
log.Errorf("fh.Open() error(%v)", err)
break
}
if size, err = checkFileSize(file, h.c.NeedleMaxSize); err == nil {
if size, err = checkFileSize(file, s.conf.NeedleMaxSize); err == nil {
err = ns.WriteFrom(key, int32(cookie), int32(size), file)
}
file.Close()
Expand All @@ -239,21 +221,17 @@ func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
}
}
if err == nil {
if v = h.s.Volumes[int32(vid)]; v != nil {
if v = s.store.Volumes[int32(vid)]; v != nil {
err = v.Writes(ns)
} else {
err = errors.ErrVolumeNotExist
}
}
h.s.FreeNeedles(nn, ns)
s.store.FreeNeedles(nn, ns)
return
}

type httpDelHandler struct {
s *Store
}

func (h httpDelHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *Server) del(wr http.ResponseWriter, r *http.Request) {
var (
err error
key, vid int64
Expand All @@ -278,7 +256,7 @@ func (h httpDelHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
err = errors.ErrParam
return
}
if v = h.s.Volumes[int32(vid)]; v != nil {
if v = s.store.Volumes[int32(vid)]; v != nil {
err = v.Delete(key)
} else {
err = errors.ErrVolumeNotExist
Expand Down
Loading

0 comments on commit 77ba840

Please sign in to comment.