diff --git a/store/http.go b/store/http.go index 4ee2275..9d7db24 100644 --- a/store/http.go +++ b/store/http.go @@ -2,6 +2,8 @@ package main import ( "bfs/libs/errors" + "bfs/libs/stat" + "bfs/store/conf" "encoding/json" log "github.com/golang/glog" "mime/multipart" @@ -11,6 +13,12 @@ import ( "time" ) +type Server struct { + store *Store + conf *conf.Config + info *stat.Info +} + type sizer interface { Size() int64 } diff --git a/store/http_admin.go b/store/http_admin.go index 8d75f4e..d5427e9 100644 --- a/store/http_admin.go +++ b/store/http_admin.go @@ -11,17 +11,17 @@ import ( ) // StartAdmin start admin http listen. -func StartAdmin(addr string, s *Store) { +func StartAdmin(addr string, s *Server) { go func() { var ( err error serveMux = http.NewServeMux() ) - serveMux.Handle("/probe", httpProbeHandler{s: s}) - serveMux.Handle("/bulk_volume", httpBulkVolumeHandler{s: s}) - serveMux.Handle("/compact_volume", httpCompactVolumeHandler{s: s}) - serveMux.Handle("/add_volume", httpAddVolumeHandler{s: s}) - serveMux.Handle("/add_free_volume", httpAddFreeVolumeHandler{s: s}) + serveMux.HandleFunc("/probe", s.probe) + serveMux.HandleFunc("/bulk_volume", s.bulkVolume) + serveMux.HandleFunc("/compact_volume", s.compactVolume) + serveMux.HandleFunc("/add_volume", s.addVolume) + serveMux.HandleFunc("/add_free_volume", s.addFreeVolume) if err = http.ListenAndServe(addr, serveMux); err != nil { log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err) return @@ -30,12 +30,7 @@ func StartAdmin(addr string, s *Store) { return } -// httpProbeHandler http upload a file. -type httpProbeHandler struct { - s *Store -} - -func (h httpProbeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) probe(wr http.ResponseWriter, r *http.Request) { var ( v *volume.Volume n *needle.Needle @@ -56,8 +51,8 @@ func (h httpProbeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { ret = http.StatusBadRequest return } - n = h.s.Needle() - if v = h.s.Volumes[int32(vid)]; v != nil { + n = s.store.Needle() + if v = s.store.Volumes[int32(vid)]; v != nil { if err = v.Probe(n); err != nil { if err == errors.ErrNeedleDeleted || err == errors.ErrNeedleNotExist { ret = http.StatusNotFound @@ -80,16 +75,11 @@ func (h httpProbeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { log.Infof("get a needle: %v", n) } } - h.s.FreeNeedle(n) + s.store.FreeNeedle(n) return } -// httpBulkVolumeHandler http bulk block. -type httpBulkVolumeHandler struct { - s *Store -} - -func (h httpBulkVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) bulkVolume(wr http.ResponseWriter, r *http.Request) { var ( err error vid int64 @@ -110,18 +100,13 @@ func (h httpBulkVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request } go func() { log.Infof("bulk volume: %d start", vid) - err = h.s.BulkVolume(int32(vid), bfile, ifile) + err = s.store.BulkVolume(int32(vid), bfile, ifile) log.Infof("bulk volume: %d stop", vid) }() return } -// httpCompactVolumeHandler http compact block. -type httpCompactVolumeHandler struct { - s *Store -} - -func (h httpCompactVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) compactVolume(wr http.ResponseWriter, r *http.Request) { var ( err error vid int64 @@ -140,7 +125,7 @@ func (h httpCompactVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Requ // long time processing, not block, we can from info stat api get status. go func() { log.Infof("compact volume: %d start", vid) - if err = h.s.CompactVolume(int32(vid)); err != nil { + if err = s.store.CompactVolume(int32(vid)); err != nil { log.Errorf("s.CompactVolume() error(%v)", err) } log.Infof("compact volume: %d stop", vid) @@ -148,12 +133,7 @@ func (h httpCompactVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Requ return } -// httpAddVolumeHandler http compact block. -type httpAddVolumeHandler struct { - s *Store -} - -func (h httpAddVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) addVolume(wr http.ResponseWriter, r *http.Request) { var ( err error vid int64 @@ -170,16 +150,11 @@ func (h httpAddVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) return } log.Infof("add volume: %d", vid) - _, err = h.s.AddVolume(int32(vid)) + _, err = s.store.AddVolume(int32(vid)) return } -// httpAddFreeVolumeHandler http compact block. -type httpAddFreeVolumeHandler struct { - s *Store -} - -func (h httpAddFreeVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) addFreeVolume(wr http.ResponseWriter, r *http.Request) { var ( err error sn int @@ -199,7 +174,7 @@ func (h httpAddFreeVolumeHandler) ServeHTTP(wr http.ResponseWriter, r *http.Requ return } log.Infof("add free volume: %d", n) - sn, err = h.s.AddFreeVolume(int(n), bdir, idir) + sn, err = s.store.AddFreeVolume(int(n), bdir, idir) res["succeed"] = sn return } diff --git a/store/http_api.go b/store/http_api.go index 6e0a4cc..32c8d7e 100644 --- a/store/http_api.go +++ b/store/http_api.go @@ -2,7 +2,6 @@ package main import ( "bfs/libs/errors" - "bfs/store/conf" "bfs/store/needle" "bfs/store/volume" log "github.com/golang/glog" @@ -13,16 +12,16 @@ import ( ) // StartApi start api http listen. -func StartApi(addr string, s *Store, c *conf.Config) { +func StartApi(addr string, s *Server) { go func() { var ( err error serveMux = http.NewServeMux() ) - serveMux.Handle("/get", httpGetHandler{s: s}) - serveMux.Handle("/upload", httpUploadHandler{s: s, c: c}) - serveMux.Handle("/uploads", httpUploadsHandler{s: s, c: c}) - serveMux.Handle("/del", httpDelHandler{s: s}) + serveMux.HandleFunc("/get", s.get) + serveMux.HandleFunc("/upload", s.upload) + serveMux.HandleFunc("/uploads", s.uploads) + serveMux.HandleFunc("/del", s.del) if err = http.ListenAndServe(addr, serveMux); err != nil { log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err) return @@ -31,12 +30,7 @@ func StartApi(addr string, s *Store, c *conf.Config) { return } -// httpGetHandler http upload a file. -type httpGetHandler struct { - s *Store -} - -func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) get(wr http.ResponseWriter, r *http.Request) { var ( v *volume.Volume n *needle.Needle @@ -67,10 +61,10 @@ func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { ret = http.StatusBadRequest return } - n = h.s.Needle() + n = s.store.Needle() n.Key = key n.Cookie = int32(cookie) - if v = h.s.Volumes[int32(vid)]; v != nil { + if v = s.store.Volumes[int32(vid)]; v != nil { if err = v.Get(n); err != nil { if err == errors.ErrNeedleDeleted || err == errors.ErrNeedleNotExist { ret = http.StatusNotFound @@ -93,17 +87,11 @@ func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { log.Infof("get a needle: %v", n) } } - h.s.FreeNeedle(n) + s.store.FreeNeedle(n) return } -// httpUploadHandler http upload a file. -type httpUploadHandler struct { - s *Store - c *conf.Config -} - -func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) upload(wr http.ResponseWriter, r *http.Request) { var ( vid int64 key int64 @@ -121,7 +109,7 @@ func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { return } defer HttpPostWriter(r, wr, time.Now(), &err, res) - if err = checkContentLength(r, h.c.NeedleMaxSize); err != nil { + if err = checkContentLength(r, s.conf.NeedleMaxSize); err != nil { return } str = r.FormValue("vid") @@ -147,28 +135,22 @@ func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { err = errors.ErrInternal return } - if size, err = checkFileSize(file, h.c.NeedleMaxSize); err == nil { - n = h.s.Needle() + if size, err = checkFileSize(file, s.conf.NeedleMaxSize); err == nil { + n = s.store.Needle() if err = n.WriteFrom(key, int32(cookie), int32(size), file); err == nil { - if v = h.s.Volumes[int32(vid)]; v != nil { + if v = s.store.Volumes[int32(vid)]; v != nil { err = v.Write(n) } else { err = errors.ErrVolumeNotExist } } - h.s.FreeNeedle(n) + s.store.FreeNeedle(n) } file.Close() return } -// httpUploads http upload files. -type httpUploadsHandler struct { - s *Store - c *conf.Config -} - -func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) uploads(wr http.ResponseWriter, r *http.Request) { var ( i, nn int err error @@ -191,7 +173,7 @@ func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { return } defer HttpPostWriter(r, wr, time.Now(), &err, res) - if err = checkContentLength(r, h.c.NeedleMaxSize*h.c.BatchMaxNum); err != nil { + if err = checkContentLength(r, s.conf.NeedleMaxSize*s.conf.BatchMaxNum); err != nil { return } str = r.FormValue("vid") @@ -214,7 +196,7 @@ func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { err = errors.ErrParam return } - ns = h.s.Needles(nn) + ns = s.store.Needles(nn) for i, fh = range fhs { if key, err = strconv.ParseInt(keys[i], 10, 64); err != nil { log.Errorf("strconv.ParseInt(\"%s\") error(%v)", keys[i], err) @@ -230,7 +212,7 @@ func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { log.Errorf("fh.Open() error(%v)", err) break } - if size, err = checkFileSize(file, h.c.NeedleMaxSize); err == nil { + if size, err = checkFileSize(file, s.conf.NeedleMaxSize); err == nil { err = ns.WriteFrom(key, int32(cookie), int32(size), file) } file.Close() @@ -239,21 +221,17 @@ func (h httpUploadsHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { } } if err == nil { - if v = h.s.Volumes[int32(vid)]; v != nil { + if v = s.store.Volumes[int32(vid)]; v != nil { err = v.Writes(ns) } else { err = errors.ErrVolumeNotExist } } - h.s.FreeNeedles(nn, ns) + s.store.FreeNeedles(nn, ns) return } -type httpDelHandler struct { - s *Store -} - -func (h httpDelHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { +func (s *Server) del(wr http.ResponseWriter, r *http.Request) { var ( err error key, vid int64 @@ -278,7 +256,7 @@ func (h httpDelHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) { err = errors.ErrParam return } - if v = h.s.Volumes[int32(vid)]; v != nil { + if v = s.store.Volumes[int32(vid)]; v != nil { err = v.Delete(key) } else { err = errors.ErrVolumeNotExist diff --git a/store/http_stat.go b/store/http_stat.go index 08ec5ae..ae30582 100644 --- a/store/http_stat.go +++ b/store/http_stat.go @@ -14,65 +14,68 @@ const ( statDuration = 1 * time.Second ) -func StartStat(addr string, s *Store) { +func StartStat(addr string, s *Server) { var info = &stat.Info{ Ver: Ver, GitSHA1: GitSHA1, StartTime: time.Now(), Stats: &stat.Stats{}, } - go startStat(s, info) - http.HandleFunc("/info", func(wr http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { - http.Error(wr, "Method Not Allowed", http.StatusMethodNotAllowed) - return - } - var ( - err error - data []byte - v *volume.Volume - volumes = make([]*volume.Volume, 0, len(s.Volumes)) - res = map[string]interface{}{"ret": errors.RetOK} - ) - for _, v = range s.Volumes { - volumes = append(volumes, v) - } - res["server"] = info - res["volumes"] = volumes - res["free_volumes"] = s.FreeVolumes - if data, err = json.Marshal(res); err == nil { - if _, err = wr.Write(data); err != nil { - log.Errorf("wr.Write() error(%v)", err) - } - } else { - log.Errorf("json.Marshal() error(%v)", err) - } - return - }) + s.info = info + go s.startStat() + http.HandleFunc("/info", s.stat) go func() { http.ListenAndServe(addr, nil) }() return } +func (s *Server) stat(wr http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + http.Error(wr, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + var ( + err error + data []byte + v *volume.Volume + volumes = make([]*volume.Volume, 0, len(s.store.Volumes)) + res = map[string]interface{}{"ret": errors.RetOK} + ) + for _, v = range s.store.Volumes { + volumes = append(volumes, v) + } + res["server"] = s.info + res["volumes"] = volumes + res["free_volumes"] = s.store.FreeVolumes + if data, err = json.Marshal(res); err == nil { + if _, err = wr.Write(data); err != nil { + log.Errorf("wr.Write() error(%v)", err) + } + } else { + log.Errorf("json.Marshal() error(%v)", err) + } + return +} + // startStat stat the store. -func startStat(s *Store, info *stat.Info) { +func (s *Server) startStat() { var ( v *volume.Volume stat1 *stat.Stats stat = new(stat.Stats) ) for { - *stat = *(info.Stats) - stat1 = info.Stats - info.Stats = stat + *stat = *(s.info.Stats) + stat1 = s.info.Stats + s.info.Stats = stat stat1.Reset() - for _, v = range s.Volumes { + for _, v = range s.store.Volumes { v.Stats.Calc() stat1.Merge(v.Stats) } stat1.Calc() - info.Stats = stat1 + s.info.Stats = stat1 time.Sleep(statDuration) } } diff --git a/store/main.go b/store/main.go index 1e27f53..ce56710 100644 --- a/store/main.go +++ b/store/main.go @@ -18,6 +18,7 @@ func main() { var ( c *conf.Config s *Store + svr *Server err error ) flag.Parse() @@ -32,9 +33,10 @@ func main() { return } log.Infof("init http...") - StartStat(c.StatListen, s) - StartApi(c.ApiListen, s, c) - StartAdmin(c.AdminListen, s) + svr = &Server{store: s, conf: c} + StartStat(c.StatListen, svr) + StartApi(c.ApiListen, svr) + StartAdmin(c.AdminListen, svr) if c.Pprof { StartPprof(c.PprofListen) }