Skip to content

Commit

Permalink
refactor directory
Browse files Browse the repository at this point in the history
  • Loading branch information
Terry-Mao committed Jun 15, 2016
1 parent 420744f commit 97ce4b8
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 24 deletions.
6 changes: 3 additions & 3 deletions directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,14 @@ func (d *Directory) cookie() (cookie int32) {
}

// GetStores get readable stores for http get
func (d *Directory) GetStores(bucket, filename string) (n *meta.Needle, stores []string, err error) {
func (d *Directory) GetStores(bucket, filename string) (n *meta.Needle, f *meta.File, stores []string, err error) {
var (
store string
svrs []string
storeMeta *meta.Store
ok bool
)
if n, err = d.hBase.Get(bucket, filename); err != nil {
if n, f, err = d.hBase.Get(bucket, filename); err != nil {
log.Errorf("hBase.Get error(%v)", err)
if err != errors.ErrNeedleNotExist {
err = errors.ErrHBase
Expand Down Expand Up @@ -324,7 +324,7 @@ func (d *Directory) DelStores(bucket, filename string) (n *meta.Needle, stores [
svrs []string
storeMeta *meta.Store
)
if n, err = d.hBase.Get(bucket, filename); err != nil {
if n, _, err = d.hBase.Get(bucket, filename); err != nil {
log.Errorf("hBase.Get error(%v)", err)
if err != errors.ErrNeedleNotExist {
err = errors.ErrHBase
Expand Down
4 changes: 2 additions & 2 deletions directory/directory.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ GroupRoot = "/group"
PullInterval = "10s"

[hbase]
# thrift addr localhost:9090
Addr = "localhost:9090"
# addr 172.16.13.90:9090
Addr = "172.16.13.90:9090"

# Note that you must specify a number here.
MaxActive = 100
Expand Down
50 changes: 43 additions & 7 deletions directory/hbase/hbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
_columnSha1 = []byte("sha1")
_columnMine = []byte("mine")
_columnStatus = []byte("status")
// _columnUpdateTime = []byte("update_time")
)

type HBaseClient struct {
Expand All @@ -39,10 +40,7 @@ func NewHBaseClient() *HBaseClient {
}

// Get get needle from hbase
func (h *HBaseClient) Get(bucket, filename string) (n *meta.Needle, err error) {
var (
f *meta.File
)
func (h *HBaseClient) Get(bucket, filename string) (n *meta.Needle, f *meta.File, err error) {
if f, err = h.getFile(bucket, filename); err != nil {
return
}
Expand Down Expand Up @@ -113,6 +111,8 @@ func (h *HBaseClient) getNeedle(key int64) (n *meta.Needle, err error) {
n.Vid = int32(binary.BigEndian.Uint32(cv.Value))
} else if bytes.Equal(cv.Qualifier, _columnCookie) {
n.Cookie = int32(binary.BigEndian.Uint32(cv.Value))
} else if bytes.Equal(cv.Qualifier, _columnUpdateTime) {
n.MTime = int64(binary.BigEndian.Uint64(cv.Value))
}
}
}
Expand All @@ -125,6 +125,7 @@ func (h *HBaseClient) putNeedle(n *meta.Needle) (err error) {
ks []byte
vbuf = make([]byte, 4)
cbuf = make([]byte, 4)
ubuf = make([]byte, 8)
exist bool
c *hbasethrift.THBaseServiceClient
)
Expand All @@ -143,6 +144,7 @@ func (h *HBaseClient) putNeedle(n *meta.Needle) (err error) {
}
binary.BigEndian.PutUint32(vbuf, uint32(n.Vid))
binary.BigEndian.PutUint32(cbuf, uint32(n.Cookie))
binary.BigEndian.PutUint64(ubuf, uint64(time.Now().UnixNano()))
if err = c.Put(_table, &hbasethrift.TPut{
Row: ks,
ColumnValues: []*hbasethrift.TColumnValue{
Expand All @@ -156,6 +158,11 @@ func (h *HBaseClient) putNeedle(n *meta.Needle) (err error) {
Qualifier: _columnCookie,
Value: cbuf,
},
&hbasethrift.TColumnValue{
Family: _familyBasic,
Qualifier: _columnUpdateTime,
Value: ubuf,
},
},
}); err != nil {
hbasePool.Put(c, true)
Expand Down Expand Up @@ -228,11 +235,13 @@ func (h *HBaseClient) getFile(bucket, filename string) (f *meta.File, err error)
if bytes.Equal(cv.Qualifier, _columnKey) {
f.Key = int64(binary.BigEndian.Uint64(cv.Value))
} else if bytes.Equal(cv.Qualifier, _columnSha1) {
f.Sha1 = cv.String()
f.Sha1 = string(cv.GetValue())
} else if bytes.Equal(cv.Qualifier, _columnMine) {
f.Mine = cv.String()
f.Mine = string(cv.GetValue())
} else if bytes.Equal(cv.Qualifier, _columnStatus) {
f.Status = int32(binary.BigEndian.Uint32(cv.Value))
} else if bytes.Equal(cv.Qualifier, _columnUpdateTime) {
f.MTime = int64(binary.BigEndian.Uint64(cv.Value))
}
}
}
Expand All @@ -259,7 +268,8 @@ func (h *HBaseClient) putFile(bucket string, f *meta.File) (err error) {
return
}
if exist {
hbasePool.Put(c, false)
err = h.updateFile(c, bucket, f.Filename, f.Sha1)
hbasePool.Put(c, err != nil)
return errors.ErrNeedleExist
}
binary.BigEndian.PutUint64(kbuf, uint64(f.Key))
Expand Down Expand Up @@ -302,6 +312,32 @@ func (h *HBaseClient) putFile(bucket string, f *meta.File) (err error) {
return
}

// updateFile overwriting is bug, banned
func (h *HBaseClient) updateFile(c *hbasethrift.THBaseServiceClient, bucket, filename, sha1 string) (err error) {
var (
ks []byte
ubuf = make([]byte, 8)
)
ks = []byte(filename)
binary.BigEndian.PutUint64(ubuf, uint64(time.Now().UnixNano()))
err = c.Put(h.tableName(bucket), &hbasethrift.TPut{
Row: ks,
ColumnValues: []*hbasethrift.TColumnValue{
&hbasethrift.TColumnValue{
Family: _familyFile,
Qualifier: _columnSha1,
Value: []byte(sha1),
},
&hbasethrift.TColumnValue{
Family: _familyFile,
Qualifier: _columnUpdateTime,
Value: ubuf,
},
},
})
return
}

// delFile delete file from hbase.bucket_xxx.
func (h *HBaseClient) delFile(bucket, filename string) (err error) {
var (
Expand Down
4 changes: 2 additions & 2 deletions directory/hbase/hbase_b_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func BenchmarkHbasePut(b *testing.B) {
t int64
)
ch := make(chan int64, 1000000)
if err = Init("localhost:9090", 5*time.Second, 200, 200); err != nil {
if err = Init("172.16.13.90:9090", 5*time.Second, 200, 200); err != nil {
b.Errorf("Init failed")
b.FailNow()
}
Expand Down Expand Up @@ -43,7 +43,7 @@ func BenchmarkHbaseGet(b *testing.B) {
t int64
r *rand.Rand
)
if err = Init("localhost:9090", 5*time.Second, 200, 200); err != nil {
if err = Init("172.16.13.90:9090", 5*time.Second, 200, 200); err != nil {
b.Errorf("Init failed")
b.FailNow()
}
Expand Down
2 changes: 1 addition & 1 deletion directory/hbase/hbase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestHbase(t *testing.T) {
err error
m, n *meta.Needle
)
if err = Init("localhost:9090", 5*time.Second, 10, 10); err != nil {
if err = Init("172.16.13.90:9090", 5*time.Second, 10, 10); err != nil {
t.Errorf("Init failed")
t.FailNow()
}
Expand Down
18 changes: 13 additions & 5 deletions directory/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ func StartApi(addr string, d *Directory) {

func (s *server) get(wr http.ResponseWriter, r *http.Request) {
var (
err error
n *meta.Needle
ok bool
bucket string
filename string
res meta.Response
ok bool
n *meta.Needle
f *meta.File
uerr errors.Error
err error
)
if r.Method != "GET" {
http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
Expand All @@ -60,7 +61,7 @@ func (s *server) get(wr http.ResponseWriter, r *http.Request) {
return
}
defer HttpGetWriter(r, wr, time.Now(), &res)
if n, res.Stores, err = s.d.GetStores(bucket, filename); err != nil {
if n, f, res.Stores, err = s.d.GetStores(bucket, filename); err != nil {
log.Errorf("GetStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
Expand All @@ -73,6 +74,13 @@ func (s *server) get(wr http.ResponseWriter, r *http.Request) {
res.Key = n.Key
res.Cookie = n.Cookie
res.Vid = n.Vid
res.Mine = f.Mine
if f.MTime != 0 {
res.MTime = f.MTime
} else {
res.MTime = n.MTime
}
res.Sha1 = f.Sha1
return
}

Expand Down Expand Up @@ -114,7 +122,7 @@ func (s *server) upload(wr http.ResponseWriter, r *http.Request) {
if err == errors.ErrNeedleExist {
// update file data
res.Ret = errors.RetNeedleExist
if n, res.Stores, err = s.d.GetStores(bucket, f.Filename); err != nil {
if n, _, res.Stores, err = s.d.GetStores(bucket, f.Filename); err != nil {
log.Errorf("GetStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
Expand Down
8 changes: 4 additions & 4 deletions directory/http_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestHTTPAPI(t *testing.T) {
t.FailNow()
}

if zk, err = NewZookeeper([]string{"localhost:2181"}, time.Second*15, "/rack", "/volume", "/group"); err != nil {
if zk, err = NewZookeeper([]string{"123.56.108.22:2181"}, time.Second*15, "/rack", "/volume", "/group"); err != nil {
t.Errorf("NewZookeeper() error(%v)", err)
t.FailNow()
}
Expand All @@ -41,7 +41,7 @@ func TestHTTPAPI(t *testing.T) {
time.Sleep(1 * time.Second)
buf.Reset()
buf.WriteString("num=1")
if resp, err = http.Post("http://localhost:6065/upload", "application/x-www-form-urlencoded", buf); err != nil {
if resp, err = http.Post("http://172.16.13.86:6065/upload", "application/x-www-form-urlencoded", buf); err != nil {
t.Errorf("http.Post error(%v)", err)
t.FailNow()
}
Expand All @@ -62,7 +62,7 @@ func TestHTTPAPI(t *testing.T) {
cookie = res.Cookie
fmt.Println("put vid:", res.Vid)
buf.Reset()
url = fmt.Sprintf("http://localhost:6065/get?key=%d&cookie=%d", key, cookie)
url = fmt.Sprintf("http://172.16.13.86:6065/get?key=%d&cookie=%d", key, cookie)
if resp, err = http.Get(url); err != nil {
t.Errorf("http ERROR error(%v)", err)
t.FailNow()
Expand All @@ -82,7 +82,7 @@ func TestHTTPAPI(t *testing.T) {
fmt.Println("get vid:", res.Vid)
buf.Reset()
buf.WriteString(fmt.Sprintf("key=%d&cookie=%d", key, cookie))
if resp, err = http.Post("http://localhost:6065/del", "application/x-www-form-urlencoded", buf); err != nil {
if resp, err = http.Post("http://172.16.13.86:6065/del", "application/x-www-form-urlencoded", buf); err != nil {
t.Errorf("http.Post error(%v)", err)
t.FailNow()
}
Expand Down

0 comments on commit 97ce4b8

Please sign in to comment.