From a2125810a994428ca2b95f6f4fc9a5144f8e7ad1 Mon Sep 17 00:00:00 2001 From: "Terry.Mao" <83873308@qq.com> Date: Mon, 27 Jun 2016 17:03:06 +0800 Subject: [PATCH] v1.4.2 --- directory/dispatcher.go | 1 - proxy/bfs/bfs.go | 9 ++ proxy/http_api.go | 2 + proxy/proxy.toml | 8 ++ store/block/supper_block.go | 15 ++- store/http.go | 44 +++++++- store/http_admin.go | 35 +++--- store/http_admin_test.go | 148 ------------------------ store/http_api.go | 33 +++--- store/http_api_test.go | 220 ------------------------------------ store/http_perf.go | 13 +-- store/http_stat.go | 29 +++-- store/main.go | 25 ++-- store/signal.go | 4 +- store/store.go | 8 +- store/store.toml | 6 +- store/test_test.go | 18 +++ store/volume/volume.go | 21 ++-- 18 files changed, 184 insertions(+), 455 deletions(-) delete mode 100644 store/http_admin_test.go delete mode 100644 store/http_api_test.go diff --git a/directory/dispatcher.go b/directory/dispatcher.go index 2b036ea..8937019 100644 --- a/directory/dispatcher.go +++ b/directory/dispatcher.go @@ -70,7 +70,6 @@ func (d *Dispatcher) Update(group map[int][]string, if !write { continue } - // calc score for _, sid = range stores { totalAdd, totalAddDelay, restSpace, minScore = 0, 0, 0, 0 diff --git a/proxy/bfs/bfs.go b/proxy/bfs/bfs.go index a965d85..d5bb22b 100644 --- a/proxy/bfs/bfs.go +++ b/proxy/bfs/bfs.go @@ -109,12 +109,16 @@ func (b *Bfs) Get(bucket, filename string) (src io.ReadCloser, ctlen int, mtime } td.Stop() if resp.StatusCode != http.StatusOK { + resp.Body.Close() continue } src = resp.Body ctlen = int(resp.ContentLength) break } + if err == nil && resp.StatusCode == http.StatusServiceUnavailable { + err = errors.ErrStoreNotAvailable + } return } @@ -140,6 +144,11 @@ func (b *Bfs) Upload(bucket, filename, mine, sha1 string, buf []byte) (err error err = errors.ErrInternal return } + // same sha1sum. + if strings.HasPrefix(filename, sha1) && res.Ret == errors.RetNeedleExist { + err = errors.ErrNeedleExist + return + } params = url.Values{} for _, host = range res.Stores { diff --git a/proxy/http_api.go b/proxy/http_api.go index 4940f30..a995426 100644 --- a/proxy/http_api.go +++ b/proxy/http_api.go @@ -190,6 +190,8 @@ func (s *server) download(item *ibucket.Item, bucket, file string, wr http.Respo } else { if err == errors.ErrNeedleNotExist { status = http.StatusNotFound + } else if err == errors.ErrStoreNotAvailable { + status = http.StatusServiceUnavailable } else { status = http.StatusInternalServerError } diff --git a/proxy/proxy.toml b/proxy/proxy.toml index cc9a9f9..dacb14a 100644 --- a/proxy/proxy.toml +++ b/proxy/proxy.toml @@ -11,3 +11,11 @@ Domain = "http://localhost:2232/" Prefix = "/bfs/" MaxFileSize = 20971520 + +AliyunKeyId = "xxxxx" +AliyunKeySecret = "xxxxx" + +NetUserName = "xxxx" +NetPasswd = "xxxx" + +PurgeMaxSize = 100000 diff --git a/store/block/supper_block.go b/store/block/supper_block.go index df323ba..02ddaf4 100644 --- a/store/block/supper_block.go +++ b/store/block/supper_block.go @@ -319,6 +319,7 @@ func (b *SuperBlock) Scan(r *os.File, offset uint32, fn func(*needle.Needle, uin // Recovery recovery needles map from super block. func (b *SuperBlock) Recovery(offset uint32, fn func(*needle.Needle, uint32, uint32) error) (err error) { + var rsize int64 // WARN block may be no left data, must update block offset first if offset == 0 { offset = needle.NeedleOffset(_headerOffset) @@ -340,9 +341,21 @@ func (b *SuperBlock) Recovery(offset uint32, fn func(*needle.Needle, uint32, uin log.Errorf("block: %s Fadvise() error(%v)", b.File) return } + rsize = needle.BlockOffset(b.Offset) // reset b.w offset, discard left space which can't parse to a needle - if _, err = b.w.Seek(needle.BlockOffset(b.Offset), os.SEEK_SET); err != nil { + if _, err = b.w.Seek(rsize, os.SEEK_SET); err != nil { log.Errorf("block: %s Seek() error(%v)", b.File, err) + return + } + // recheck offset, keep size and offset consistency + if b.Size != rsize { + log.Warningf("block: %s [real size: %d, offset: %d] but [size: %d, offset: %d] not consistency, truncate file for force recovery, this may lost data", + b.File, b.Size, needle.NeedleOffset(b.Size), + rsize, b.Offset) + // truncate file + if err = b.w.Truncate(rsize); err != nil { + log.Errorf("block: %s Truncate() error(%v)", b.File, err) + } } return } diff --git a/store/http.go b/store/http.go index b892431..1a6644a 100644 --- a/store/http.go +++ b/store/http.go @@ -8,6 +8,7 @@ import ( log "github.com/golang/glog" "golang.org/x/time/rate" "mime/multipart" + "net" "net/http" "os" "strconv" @@ -18,21 +19,56 @@ type Server struct { store *Store conf *conf.Config info *stat.Info - + // server + statSvr net.Listener + adminSvr net.Listener + apiSvr net.Listener + // limit rl *rate.Limiter wl *rate.Limiter dl *rate.Limiter } -func NewServer(s *Store, c *conf.Config) *Server { - svr := &Server{ +func NewServer(s *Store, c *conf.Config) (svr *Server, err error) { + svr = &Server{ store: s, conf: c, rl: rate.NewLimiter(rate.Limit(c.Limit.Read.Rate), c.Limit.Read.Brust), wl: rate.NewLimiter(rate.Limit(c.Limit.Write.Rate), c.Limit.Write.Brust), dl: rate.NewLimiter(rate.Limit(c.Limit.Delete.Rate), c.Limit.Delete.Brust), } - return svr + if svr.statSvr, err = net.Listen("tcp", c.StatListen); err != nil { + log.Errorf("net.Listen(%s) error(%v)", c.StatListen, err) + return + } + if svr.apiSvr, err = net.Listen("tcp", c.ApiListen); err != nil { + log.Errorf("net.Listen(%s) error(%v)", c.ApiListen, err) + return + } + if svr.adminSvr, err = net.Listen("tcp", c.AdminListen); err != nil { + log.Errorf("net.Listen(%s) error(%v)", c.AdminListen, err) + return + } + go svr.startStat() + go svr.startApi() + go svr.startAdmin() + if c.Pprof { + go StartPprof(c.PprofListen) + } + return +} + +func (s *Server) Close() { + if s.statSvr != nil { + s.statSvr.Close() + } + if s.adminSvr != nil { + s.adminSvr.Close() + } + if s.apiSvr != nil { + s.apiSvr.Close() + } + return } type sizer interface { diff --git a/store/http_admin.go b/store/http_admin.go index 32e26ee..10f924b 100644 --- a/store/http_admin.go +++ b/store/http_admin.go @@ -9,23 +9,26 @@ import ( "time" ) -// StartAdmin start admin http listen. -func StartAdmin(addr string, s *Server) { - go func() { - var ( - err error - serveMux = http.NewServeMux() - ) - serveMux.HandleFunc("/probe", s.probe) - serveMux.HandleFunc("/bulk_volume", s.bulkVolume) - serveMux.HandleFunc("/compact_volume", s.compactVolume) - serveMux.HandleFunc("/add_volume", s.addVolume) - serveMux.HandleFunc("/add_free_volume", s.addFreeVolume) - if err = http.ListenAndServe(addr, serveMux); err != nil { - log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err) - return +// startAdmin start admin http listen. +func (s *Server) startAdmin() { + var ( + err error + serveMux = http.NewServeMux() + server = &http.Server{ + Addr: s.conf.AdminListen, + Handler: serveMux, + // TODO read/write timeout } - }() + ) + serveMux.HandleFunc("/probe", s.probe) + serveMux.HandleFunc("/bulk_volume", s.bulkVolume) + serveMux.HandleFunc("/compact_volume", s.compactVolume) + serveMux.HandleFunc("/add_volume", s.addVolume) + serveMux.HandleFunc("/add_free_volume", s.addFreeVolume) + if err = server.Serve(s.adminSvr); err != nil { + log.Errorf("server.Serve() error(%v)", err) + } + log.Info("http admin stop") return } diff --git a/store/http_admin_test.go b/store/http_admin_test.go deleted file mode 100644 index ffa9feb..0000000 --- a/store/http_admin_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package main - -import ( - "bfs/store/zk" - "bytes" - "encoding/json" - "io/ioutil" - "net/http" - "os" - "testing" - "time" -) - -func TestHTTPAdmin(t *testing.T) { - var ( - s *Store - z *zk.Zookeeper - resp *http.Response - body []byte - err error - buf = &bytes.Buffer{} - tr = &testRet{} - ) - os.Remove(testConf.Store.VolumeIndex) - os.Remove(testConf.Store.FreeVolumeIndex) - os.Remove("./test/_free_block_1") - os.Remove("./test/_free_block_1.idx") - os.Remove("./test/_free_block_2") - os.Remove("./test/_free_block_2.idx") - os.Remove("./test/_free_block_3") - os.Remove("./test/_free_block_3.idx") - os.Remove("./test/1_0") - os.Remove("./test/1_1") - os.Remove("./test/block_admin_1") - os.Remove("./test/block_admin_1.idx") - defer os.Remove(testConf.Store.VolumeIndex) - defer os.Remove(testConf.Store.FreeVolumeIndex) - defer os.Remove("./test/_free_block_1") - defer os.Remove("./test/_free_block_1.idx") - defer os.Remove("./test/_free_block_2") - defer os.Remove("./test/_free_block_2.idx") - defer os.Remove("./test/_free_block_3") - defer os.Remove("./test/_free_block_3.idx") - defer os.Remove("./test/1_0") - defer os.Remove("./test/1_1") - defer os.Remove("./test/block_admin_1") - defer os.Remove("./test/block_admin_1.idx") - if z, err = zk.NewZookeeper(testConf); err != nil { - t.Errorf("NewZookeeper() error(%v)", err) - t.FailNow() - } - defer z.Close() - z.DelVolume(1) - z.DelVolume(2) - z.DelVolume(3) - defer z.DelVolume(1) - defer z.DelVolume(2) - defer z.DelVolume(3) - if s, err = NewStore(testConf); err != nil { - t.Errorf("NewStore() error(%v)", err) - t.FailNow() - } - defer s.Close() - StartAdmin("localhost:6063", &Server{store: s, conf: testConf}) - time.Sleep(1 * time.Second) - // AddFreeVolume - buf.Reset() - buf.WriteString("n=2&bdir=./test/&idir=./test/") - if resp, err = http.Post("http://localhost:6063/add_free_volume", "application/x-www-form-urlencoded", buf); err != nil { - t.Errorf("http.Post() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.Errorf("add_free_volume: %d", tr.Ret) - t.FailNow() - } - // AddVolume - buf.Reset() - buf.WriteString("vid=1") - if resp, err = http.Post("http://localhost:6063/add_volume", "application/x-www-form-urlencoded", buf); err != nil { - t.Errorf("http.Post() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.Errorf("add_volume: %d", tr.Ret) - t.FailNow() - } - // CompactVolume - buf.Reset() - buf.WriteString("vid=1") - if resp, err = http.Post("http://localhost:6063/compact_volume", "application/x-www-form-urlencoded", buf); err != nil { - t.Errorf("http.Post() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.Errorf("compact_volume: %d", tr.Ret) - t.FailNow() - } - time.Sleep(_compactSleep * 2) - // BulkVolume - buf.Reset() - buf.WriteString("vid=2&bfile=./test/block_admin_1&ifile=./test/block_admin_1.idx") - if resp, err = http.Post("http://localhost:6063/bulk_volume", "application/x-www-form-urlencoded", buf); err != nil { - t.Errorf("http.Post() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.Errorf("bulk_volume: %d", tr.Ret) - t.FailNow() - } - time.Sleep(2 * time.Second) -} diff --git a/store/http_api.go b/store/http_api.go index 8629e24..beb294d 100644 --- a/store/http_api.go +++ b/store/http_api.go @@ -11,22 +11,25 @@ import ( "time" ) -// StartApi start api http listen. -func StartApi(addr string, s *Server) { - go func() { - var ( - err error - serveMux = http.NewServeMux() - ) - serveMux.HandleFunc("/get", s.get) - serveMux.HandleFunc("/upload", s.upload) - serveMux.HandleFunc("/uploads", s.uploads) - serveMux.HandleFunc("/del", s.del) - if err = http.ListenAndServe(addr, serveMux); err != nil { - log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err) - return +// startApi start api http listen. +func (s *Server) startApi() { + var ( + err error + serveMux = http.NewServeMux() + server = &http.Server{ + Addr: s.conf.ApiListen, + Handler: serveMux, + // TODO read/write timeout } - }() + ) + serveMux.HandleFunc("/get", s.get) + serveMux.HandleFunc("/upload", s.upload) + serveMux.HandleFunc("/uploads", s.uploads) + serveMux.HandleFunc("/del", s.del) + if err = server.Serve(s.apiSvr); err != nil { + log.Errorf("server.Serve() error(%v)", err) + } + log.Info("http api stop") return } diff --git a/store/http_api_test.go b/store/http_api_test.go deleted file mode 100644 index 9a059ed..0000000 --- a/store/http_api_test.go +++ /dev/null @@ -1,220 +0,0 @@ -package main - -import ( - "bfs/store/zk" - "bytes" - "encoding/json" - "io" - "io/ioutil" - "mime/multipart" - "net/http" - "os" - "strconv" - "testing" - "time" -) - -func TestHTTPAPI(t *testing.T) { - // get volume - // upload - // uploads - // delete - // deletes - var ( - client http.Client - s *Store - z *zk.Zookeeper - w *multipart.Writer - f *os.File - bw io.Writer - req *http.Request - resp *http.Response - body []byte - err error - buf = &bytes.Buffer{} - tr = &testRet{} - ) - os.Remove(testConf.Store.VolumeIndex) - os.Remove(testConf.Store.FreeVolumeIndex) - os.Remove("./test/_free_block_1") - os.Remove("./test/1_0") - defer os.Remove(testConf.Store.VolumeIndex) - defer os.Remove(testConf.Store.FreeVolumeIndex) - defer os.Remove("./test/_free_block_1") - defer os.Remove("./test/1_0") - if z, err = zk.NewZookeeper(testConf); err != nil { - t.Errorf("NewZookeeper() error(%v)", err) - t.FailNow() - } - defer z.Close() - z.DelVolume(1) - defer z.DelVolume(1) - if s, err = NewStore(testConf); err != nil { - t.Errorf("NewStore() error(%v)", err) - t.FailNow() - - } - defer s.Close() - StartAdmin("localhost:6064", &Server{store: s, conf: testConf}) - time.Sleep(1 * time.Second) - buf.Reset() - buf.WriteString("n=1&bdir=./test/&idir=./test/") - if resp, err = http.Post("http://localhost:6064/add_free_volume", "application/x-www-form-urlencoded", buf); err != nil { - t.Errorf("http.Post() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.Errorf("add_free_volume: %d", tr.Ret) - t.FailNow() - } - buf.Reset() - buf.WriteString("vid=1") - if resp, err = http.Post("http://localhost:6064/add_volume", "application/x-www-form-urlencoded", buf); err != nil { - t.Errorf("http.Post() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.FailNow() - } - StartApi("localhost:6062", &Server{store: s, conf: testConf}) - time.Sleep(1 * time.Second) - buf.Reset() - w = multipart.NewWriter(buf) - if bw, err = w.CreateFormFile("file", "./test/1.jpg"); err != nil { - t.Errorf("w.CreateFormFile() error(%v)", err) - t.FailNow() - } - if f, err = os.Open("./test/1.jpg"); err != nil { - t.Errorf("os.Open() error(%v)", err) - t.FailNow() - } - defer f.Close() - if _, err = io.Copy(bw, f); err != nil { - t.Errorf("io.Copy() error(%v)", err) - t.FailNow() - } - if err = w.WriteField("vid", "1"); err != nil { - t.Errorf("w.WriteField() error(%v)", err) - t.FailNow() - } - if err = w.WriteField("key", "15"); err != nil { - t.Errorf("w.WriteField() error(%v)", err) - t.FailNow() - } - if err = w.WriteField("cookie", "15"); err != nil { - t.Errorf("w.WriteField() error(%v)", err) - t.FailNow() - } - w.Close() - if req, err = http.NewRequest("POST", "http://localhost:6062/upload", buf); err != nil { - t.Errorf("http..NewRequest() error(%v)", err) - t.FailNow() - } - req.Header.Set("Content-Type", w.FormDataContentType()) - if resp, err = client.Do(req); err != nil { - t.Errorf("client.Do() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.Errorf("ret: %d", tr.Ret) - t.FailNow() - } - buf.Reset() - w = multipart.NewWriter(buf) - if err = w.WriteField("vid", "1"); err != nil { - t.Errorf("w.WriteField() error(%v)", err) - t.FailNow() - } - for i := 1; i < 10; i++ { - if bw, err = w.CreateFormFile("file", "./test/"+strconv.Itoa(i)+".jpg"); err != nil { - t.Errorf("w.CreateFormFile() error(%v)", err) - t.FailNow() - } - if f, err = os.Open("./test/" + strconv.Itoa(i) + ".jpg"); err != nil { - t.Errorf("os.Open() error(%v)", err) - t.FailNow() - } - defer f.Close() - if _, err = io.Copy(bw, f); err != nil { - t.Errorf("io.Copy() error(%v)", err) - t.FailNow() - } - if err = w.WriteField("keys", strconv.Itoa(20+i)); err != nil { - t.Errorf("w.WriteField() error(%v)", err) - t.FailNow() - } - if err = w.WriteField("cookies", strconv.Itoa(20+i)); err != nil { - t.Errorf("w.WriteField() error(%v)", err) - t.FailNow() - } - } - w.Close() - if req, err = http.NewRequest("POST", "http://localhost:6062/uploads", buf); err != nil { - t.Errorf("http..NewRequest() error(%v)", err) - t.FailNow() - } - req.Header.Set("Content-Type", w.FormDataContentType()) - if resp, err = client.Do(req); err != nil { - t.Errorf("client.Do() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.Errorf("uploads: %d", tr.Ret) - t.FailNow() - } - buf.Reset() - buf.WriteString("vid=1&key=21") - if resp, err = http.Post("http://localhost:6062/del", "application/x-www-form-urlencoded", buf); err != nil { - t.Errorf("http.Post() error(%v)", err) - t.FailNow() - } - defer resp.Body.Close() - if body, err = ioutil.ReadAll(resp.Body); err != nil { - t.Errorf("ioutil.ReadAll() error(%v)", err) - t.FailNow() - } - if err = json.Unmarshal(body, tr); err != nil { - t.Errorf("json.Unmarshal() error(%v)", err) - t.FailNow() - } - if tr.Ret != 1 { - t.FailNow() - } - // TODO add get -} diff --git a/store/http_perf.go b/store/http_perf.go index 72aa51a..c76e417 100644 --- a/store/http_perf.go +++ b/store/http_perf.go @@ -8,11 +8,10 @@ import ( // StartPprof start a golang pprof. func StartPprof(addr string) { - go func() { - var err error - if err = http.ListenAndServe(addr, nil); err != nil { - log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err) - return - } - }() + var err error + if err = http.ListenAndServe(addr, nil); err != nil { + log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err) + panic(err) + return + } } diff --git a/store/http_stat.go b/store/http_stat.go index af3e743..b12eba7 100644 --- a/store/http_stat.go +++ b/store/http_stat.go @@ -14,19 +14,28 @@ const ( statDuration = 1 * time.Second ) -func StartStat(addr string, s *Server) { - var info = &stat.Info{ +func (s *Server) startStat() { + var ( + err error + serveMux = http.NewServeMux() + server = &http.Server{ + Addr: s.conf.StatListen, + Handler: serveMux, + // TODO read/write timeout + } + ) + s.info = &stat.Info{ Ver: Ver, GitSHA1: GitSHA1, StartTime: time.Now(), Stats: &stat.Stats{}, } - s.info = info - go s.startStat() - http.HandleFunc("/info", s.stat) - go func() { - http.ListenAndServe(addr, nil) - }() + go s.statproc() + serveMux.HandleFunc("/info", s.stat) + if err = server.Serve(s.statSvr); err != nil { + log.Errorf("server.Serve() error(%v)", err) + } + log.Info("http stat stop") return } @@ -58,8 +67,8 @@ func (s *Server) stat(wr http.ResponseWriter, r *http.Request) { return } -// startStat stat the store. -func (s *Server) startStat() { +// statproc stat the store. +func (s *Server) statproc() { var ( v *volume.Volume olds *stat.Stats diff --git a/store/main.go b/store/main.go index 5e8655c..c2460ab 100644 --- a/store/main.go +++ b/store/main.go @@ -16,34 +16,29 @@ func init() { func main() { var ( - c *conf.Config - s *Store - svr *Server - err error + c *conf.Config + store *Store + server *Server + err error ) flag.Parse() defer log.Flush() log.Infof("bfs store[%s] start", Ver) + defer log.Infof("bfs store[%s] stop", Ver) if c, err = conf.NewConfig(configFile); err != nil { log.Errorf("NewConfig(\"%s\") error(%v)", configFile, err) return } - log.Infof("init store...") - if s, err = NewStore(c); err != nil { + if store, err = NewStore(c); err != nil { return } - log.Infof("init http...") - svr = NewServer(s, c) - StartStat(c.StatListen, svr) - StartApi(c.ApiListen, svr) - StartAdmin(c.AdminListen, svr) - if c.Pprof { - StartPprof(c.PprofListen) + if server, err = NewServer(store, c); err != nil { + return } - if err = s.SetZookeeper(); err != nil { + if err = store.SetZookeeper(); err != nil { return } log.Infof("wait signal...") - StartSignal() + StartSignal(store, server) return } diff --git a/store/signal.go b/store/signal.go index 32c6795..6d0a7ce 100644 --- a/store/signal.go +++ b/store/signal.go @@ -8,7 +8,7 @@ import ( ) // StartSignal register signals handler. -func StartSignal() { +func StartSignal(store *Store, server *Server) { var ( c chan os.Signal s os.Signal @@ -22,6 +22,8 @@ func StartSignal() { log.Infof("get a signal %s", s.String()) switch s { case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP, syscall.SIGINT: + server.Close() + store.Close() return case syscall.SIGHUP: // TODO reload diff --git a/store/store.go b/store/store.go index f35c59b..c25ec71 100644 --- a/store/store.go +++ b/store/store.go @@ -581,6 +581,7 @@ func (s *Store) CompactVolume(id int32) (err error) { // WARN the global variable store must first set nil and reject any other // requests then safty close. func (s *Store) Close() { + log.Info("store close") var v *volume.Volume if s.vf != nil { s.vf.Close() @@ -588,10 +589,9 @@ func (s *Store) Close() { if s.fvf != nil { s.fvf.Close() } - if s.Volumes != nil { - for _, v = range s.Volumes { - v.Close() - } + for _, v = range s.Volumes { + log.Infof("volume[%d] close", v.Id) + v.Close() } if s.zk != nil { s.zk.Close() diff --git a/store/store.toml b/store/store.toml index 9f9b2f9..976467d 100644 --- a/store/store.toml +++ b/store/store.toml @@ -64,15 +64,15 @@ Syncfilerange = true # # limit read iops speed [Limit.Read] -Rate = 150 +Rate = 150.0 Brust = 50 # limit write iops speed [Limit.Write] -Rate = 150 +Rate = 150.0 Brust = 50 # limit delete iops speed [Limit.Delete] -Rate = 150 +Rate = 150.0 Brust = 50 [Zookeeper] diff --git a/store/test_test.go b/store/test_test.go index fd7d404..d9f24d4 100644 --- a/store/test_test.go +++ b/store/test_test.go @@ -8,6 +8,10 @@ import ( var ( testConf = &conf.Config{ + Pprof: false, + AdminListen: "localhost:6063", + ApiListen: "localhost:6064", + StatListen: "localhost:6065", NeedleMaxSize: 4 * 1024 * 1024, BlockMaxSize: needle.Size(4 * 1024 * 1024), BatchMaxNum: 16, @@ -39,6 +43,20 @@ var ( SyncWrite: 10, Syncfilerange: true, }, + Limit: &conf.Limit{ + Read: &conf.Rate{ + Rate: 150.0, + Brust: 200, + }, + Write: &conf.Rate{ + Rate: 150.0, + Brust: 200, + }, + Delete: &conf.Rate{ + Rate: 150.0, + Brust: 200, + }, + }, } ) diff --git a/store/volume/volume.go b/store/volume/volume.go index 3cd65cc..21ca4f4 100644 --- a/store/volume/volume.go +++ b/store/volume/volume.go @@ -123,13 +123,6 @@ func (v *Volume) init() (err error) { }); err != nil { return } - // recheck offset, keep size and offset consistency - if v.Block.Size != needle.BlockOffset(v.Block.Offset) { - log.Errorf("block: %s [real size: %d, offset: %d] but [size: %d, offset: %d] not consistency", - v.Block.File, v.Block.Size, needle.NeedleOffset(v.Block.Size), - needle.BlockOffset(v.Block.Offset), v.Block.Offset) - return errors.ErrSuperBlockOffset - } // flush index err = v.Indexer.Flush() return @@ -172,7 +165,6 @@ func (v *Volume) read(n *needle.Needle) (err error) { size = n.TotalSize now = time.Now().UnixNano() ) - // TODO iops limit // pread syscall is atomic, no lock if err = v.Block.ReadAt(n); err != nil { return @@ -404,6 +396,9 @@ func (v *Volume) delproc() { sort.Sort(uint32Slice(offsets)) for _, offset = range offsets { now = time.Now().UnixNano() + // NOTE Modify no lock here, canuse only a atomic write + // operation but when Compact must finish the job, the cached + // offset is a old block owner. if err = v.Block.Delete(offset); err != nil { break } @@ -415,12 +410,11 @@ func (v *Volume) delproc() { } // signal exit if exit { - log.Warningf("signal volume: %d del job exit", v.Id) break } } v.wg.Done() - log.Warningf("volume: %d del job exit", v.Id) + log.Warningf("volume[%d] del job exit", v.Id) return } @@ -477,10 +471,17 @@ func (v *Volume) StopCompact(nv *Volume) (err error) { goto free } } + // NOTE MUST wait old block finish async delete operations. + v.ch <- _finish + v.wg.Wait() + // then replace old & new block/index/needles variables v.Block, nv.Block = nv.Block, v.Block v.Indexer, nv.Indexer = nv.Indexer, v.Indexer v.needles, nv.needles = nv.needles, v.needles atomic.AddUint64(&v.Stats.TotalCompactDelay, uint64(time.Now().UnixNano()-v.CompactTime)) + // NOTE MUST restart delproc job + v.wg.Add(1) + go v.delproc() } free: v.Compact = false