diff --git a/directory/directory.go b/directory/directory.go index 54d8ec7..cdee562 100644 --- a/directory/directory.go +++ b/directory/directory.go @@ -233,14 +233,14 @@ func (d *Directory) cookie() (cookie int32) { } // GetStores get readable stores for http get -func (d *Directory) GetStores(bucket, filename string) (n *meta.Needle, stores []string, err error) { +func (d *Directory) GetStores(bucket, filename string) (n *meta.Needle, f *meta.File, stores []string, err error) { var ( store string svrs []string storeMeta *meta.Store ok bool ) - if n, err = d.hBase.Get(bucket, filename); err != nil { + if n, f, err = d.hBase.Get(bucket, filename); err != nil { log.Errorf("hBase.Get error(%v)", err) if err != errors.ErrNeedleNotExist { err = errors.ErrHBase @@ -324,7 +324,7 @@ func (d *Directory) DelStores(bucket, filename string) (n *meta.Needle, stores [ svrs []string storeMeta *meta.Store ) - if n, err = d.hBase.Get(bucket, filename); err != nil { + if n, _, err = d.hBase.Get(bucket, filename); err != nil { log.Errorf("hBase.Get error(%v)", err) if err != errors.ErrNeedleNotExist { err = errors.ErrHBase diff --git a/directory/directory.toml b/directory/directory.toml index 3dc9eb4..ef84909 100644 --- a/directory/directory.toml +++ b/directory/directory.toml @@ -47,8 +47,8 @@ GroupRoot = "/group" PullInterval = "10s" [hbase] -# thrift addr localhost:9090 -Addr = "localhost:9090" +# addr 172.16.13.90:9090 +Addr = "172.16.13.90:9090" # Note that you must specify a number here. MaxActive = 100 diff --git a/directory/hbase/hbase.go b/directory/hbase/hbase.go index f9762ef..1f06549 100644 --- a/directory/hbase/hbase.go +++ b/directory/hbase/hbase.go @@ -28,6 +28,7 @@ var ( _columnSha1 = []byte("sha1") _columnMine = []byte("mine") _columnStatus = []byte("status") + // _columnUpdateTime = []byte("update_time") ) type HBaseClient struct { @@ -39,10 +40,7 @@ func NewHBaseClient() *HBaseClient { } // Get get needle from hbase -func (h *HBaseClient) Get(bucket, filename string) (n *meta.Needle, err error) { - var ( - f *meta.File - ) +func (h *HBaseClient) Get(bucket, filename string) (n *meta.Needle, f *meta.File, err error) { if f, err = h.getFile(bucket, filename); err != nil { return } @@ -113,6 +111,8 @@ func (h *HBaseClient) getNeedle(key int64) (n *meta.Needle, err error) { n.Vid = int32(binary.BigEndian.Uint32(cv.Value)) } else if bytes.Equal(cv.Qualifier, _columnCookie) { n.Cookie = int32(binary.BigEndian.Uint32(cv.Value)) + } else if bytes.Equal(cv.Qualifier, _columnUpdateTime) { + n.MTime = int64(binary.BigEndian.Uint64(cv.Value)) } } } @@ -125,6 +125,7 @@ func (h *HBaseClient) putNeedle(n *meta.Needle) (err error) { ks []byte vbuf = make([]byte, 4) cbuf = make([]byte, 4) + ubuf = make([]byte, 8) exist bool c *hbasethrift.THBaseServiceClient ) @@ -143,6 +144,7 @@ func (h *HBaseClient) putNeedle(n *meta.Needle) (err error) { } binary.BigEndian.PutUint32(vbuf, uint32(n.Vid)) binary.BigEndian.PutUint32(cbuf, uint32(n.Cookie)) + binary.BigEndian.PutUint64(ubuf, uint64(time.Now().UnixNano())) if err = c.Put(_table, &hbasethrift.TPut{ Row: ks, ColumnValues: []*hbasethrift.TColumnValue{ @@ -156,6 +158,11 @@ func (h *HBaseClient) putNeedle(n *meta.Needle) (err error) { Qualifier: _columnCookie, Value: cbuf, }, + &hbasethrift.TColumnValue{ + Family: _familyBasic, + Qualifier: _columnUpdateTime, + Value: ubuf, + }, }, }); err != nil { hbasePool.Put(c, true) @@ -228,11 +235,13 @@ func (h *HBaseClient) getFile(bucket, filename string) (f *meta.File, err error) if bytes.Equal(cv.Qualifier, _columnKey) { f.Key = int64(binary.BigEndian.Uint64(cv.Value)) } else if bytes.Equal(cv.Qualifier, _columnSha1) { - f.Sha1 = cv.String() + f.Sha1 = string(cv.GetValue()) } else if bytes.Equal(cv.Qualifier, _columnMine) { - f.Mine = cv.String() + f.Mine = string(cv.GetValue()) } else if bytes.Equal(cv.Qualifier, _columnStatus) { f.Status = int32(binary.BigEndian.Uint32(cv.Value)) + } else if bytes.Equal(cv.Qualifier, _columnUpdateTime) { + f.MTime = int64(binary.BigEndian.Uint64(cv.Value)) } } } @@ -259,7 +268,8 @@ func (h *HBaseClient) putFile(bucket string, f *meta.File) (err error) { return } if exist { - hbasePool.Put(c, false) + err = h.updateFile(c, bucket, f.Filename, f.Sha1) + hbasePool.Put(c, err != nil) return errors.ErrNeedleExist } binary.BigEndian.PutUint64(kbuf, uint64(f.Key)) @@ -302,6 +312,32 @@ func (h *HBaseClient) putFile(bucket string, f *meta.File) (err error) { return } +// updateFile overwriting is bug, banned +func (h *HBaseClient) updateFile(c *hbasethrift.THBaseServiceClient, bucket, filename, sha1 string) (err error) { + var ( + ks []byte + ubuf = make([]byte, 8) + ) + ks = []byte(filename) + binary.BigEndian.PutUint64(ubuf, uint64(time.Now().UnixNano())) + err = c.Put(h.tableName(bucket), &hbasethrift.TPut{ + Row: ks, + ColumnValues: []*hbasethrift.TColumnValue{ + &hbasethrift.TColumnValue{ + Family: _familyFile, + Qualifier: _columnSha1, + Value: []byte(sha1), + }, + &hbasethrift.TColumnValue{ + Family: _familyFile, + Qualifier: _columnUpdateTime, + Value: ubuf, + }, + }, + }) + return +} + // delFile delete file from hbase.bucket_xxx. func (h *HBaseClient) delFile(bucket, filename string) (err error) { var ( diff --git a/directory/hbase/hbase_b_test.go b/directory/hbase/hbase_b_test.go index f7667dd..2eb8fce 100644 --- a/directory/hbase/hbase_b_test.go +++ b/directory/hbase/hbase_b_test.go @@ -15,7 +15,7 @@ func BenchmarkHbasePut(b *testing.B) { t int64 ) ch := make(chan int64, 1000000) - if err = Init("localhost:9090", 5*time.Second, 200, 200); err != nil { + if err = Init("172.16.13.90:9090", 5*time.Second, 200, 200); err != nil { b.Errorf("Init failed") b.FailNow() } @@ -43,7 +43,7 @@ func BenchmarkHbaseGet(b *testing.B) { t int64 r *rand.Rand ) - if err = Init("localhost:9090", 5*time.Second, 200, 200); err != nil { + if err = Init("172.16.13.90:9090", 5*time.Second, 200, 200); err != nil { b.Errorf("Init failed") b.FailNow() } diff --git a/directory/hbase/hbase_test.go b/directory/hbase/hbase_test.go index 8d6d140..6590ffd 100644 --- a/directory/hbase/hbase_test.go +++ b/directory/hbase/hbase_test.go @@ -12,7 +12,7 @@ func TestHbase(t *testing.T) { err error m, n *meta.Needle ) - if err = Init("localhost:9090", 5*time.Second, 10, 10); err != nil { + if err = Init("172.16.13.90:9090", 5*time.Second, 10, 10); err != nil { t.Errorf("Init failed") t.FailNow() } diff --git a/directory/http_api.go b/directory/http_api.go index c4d6b01..405e2ea 100644 --- a/directory/http_api.go +++ b/directory/http_api.go @@ -39,13 +39,14 @@ func StartApi(addr string, d *Directory) { func (s *server) get(wr http.ResponseWriter, r *http.Request) { var ( - err error - n *meta.Needle + ok bool bucket string filename string res meta.Response - ok bool + n *meta.Needle + f *meta.File uerr errors.Error + err error ) if r.Method != "GET" { http.Error(wr, "method not allowed", http.StatusMethodNotAllowed) @@ -60,7 +61,7 @@ func (s *server) get(wr http.ResponseWriter, r *http.Request) { return } defer HttpGetWriter(r, wr, time.Now(), &res) - if n, res.Stores, err = s.d.GetStores(bucket, filename); err != nil { + if n, f, res.Stores, err = s.d.GetStores(bucket, filename); err != nil { log.Errorf("GetStores() error(%v)", err) if uerr, ok = err.(errors.Error); ok { res.Ret = int(uerr) @@ -73,6 +74,13 @@ func (s *server) get(wr http.ResponseWriter, r *http.Request) { res.Key = n.Key res.Cookie = n.Cookie res.Vid = n.Vid + res.Mine = f.Mine + if f.MTime != 0 { + res.MTime = f.MTime + } else { + res.MTime = n.MTime + } + res.Sha1 = f.Sha1 return } @@ -114,7 +122,7 @@ func (s *server) upload(wr http.ResponseWriter, r *http.Request) { if err == errors.ErrNeedleExist { // update file data res.Ret = errors.RetNeedleExist - if n, res.Stores, err = s.d.GetStores(bucket, f.Filename); err != nil { + if n, _, res.Stores, err = s.d.GetStores(bucket, f.Filename); err != nil { log.Errorf("GetStores() error(%v)", err) if uerr, ok = err.(errors.Error); ok { res.Ret = int(uerr) diff --git a/directory/http_api_test.go b/directory/http_api_test.go index a9f569f..b4cbd03 100644 --- a/directory/http_api_test.go +++ b/directory/http_api_test.go @@ -29,7 +29,7 @@ func TestHTTPAPI(t *testing.T) { t.FailNow() } - if zk, err = NewZookeeper([]string{"localhost:2181"}, time.Second*15, "/rack", "/volume", "/group"); err != nil { + if zk, err = NewZookeeper([]string{"123.56.108.22:2181"}, time.Second*15, "/rack", "/volume", "/group"); err != nil { t.Errorf("NewZookeeper() error(%v)", err) t.FailNow() } @@ -41,7 +41,7 @@ func TestHTTPAPI(t *testing.T) { time.Sleep(1 * time.Second) buf.Reset() buf.WriteString("num=1") - if resp, err = http.Post("http://localhost:6065/upload", "application/x-www-form-urlencoded", buf); err != nil { + if resp, err = http.Post("http://172.16.13.86:6065/upload", "application/x-www-form-urlencoded", buf); err != nil { t.Errorf("http.Post error(%v)", err) t.FailNow() } @@ -62,7 +62,7 @@ func TestHTTPAPI(t *testing.T) { cookie = res.Cookie fmt.Println("put vid:", res.Vid) buf.Reset() - url = fmt.Sprintf("http://localhost:6065/get?key=%d&cookie=%d", key, cookie) + url = fmt.Sprintf("http://172.16.13.86:6065/get?key=%d&cookie=%d", key, cookie) if resp, err = http.Get(url); err != nil { t.Errorf("http ERROR error(%v)", err) t.FailNow() @@ -82,7 +82,7 @@ func TestHTTPAPI(t *testing.T) { fmt.Println("get vid:", res.Vid) buf.Reset() buf.WriteString(fmt.Sprintf("key=%d&cookie=%d", key, cookie)) - if resp, err = http.Post("http://localhost:6065/del", "application/x-www-form-urlencoded", buf); err != nil { + if resp, err = http.Post("http://172.16.13.86:6065/del", "application/x-www-form-urlencoded", buf); err != nil { t.Errorf("http.Post error(%v)", err) t.FailNow() }