Skip to content

Commit

Permalink
modify directory
Browse files Browse the repository at this point in the history
  • Loading branch information
Terry-Mao committed May 12, 2016
1 parent fe1481f commit 6474257
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 112 deletions.
143 changes: 71 additions & 72 deletions directory/conf/config.go
Original file line number Diff line number Diff line change
@@ -1,72 +1,71 @@
package conf

import (
"github.com/BurntSushi/toml"
"io/ioutil"
"os"
"time"
)

type Config struct {
Snowflake *Snowflake
Zookeeper *Zookeeper
HBase *HBase

MaxNum int
ApiListen string
PprofEnable bool
PprofListen string
}

type Snowflake struct {
ZkAddrs []string
ZkTimeout duration
ZkPath string
WorkId int64
}

type Zookeeper struct {
Addrs []string
Timeout duration
PullInterval duration
VolumeRoot string
StoreRoot string
GroupRoot string
}

type HBase struct {
Addr string
MaxActive int
MaxIdle int
Timeout duration
LvsTimeout duration
Framed bool
}

// Code to implement the TextUnmarshaler interface for `duration`:
type duration struct {
time.Duration
}

func (d *duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}

// NewConfig new a config.
func NewConfig(conf string) (c *Config, err error) {
var (
file *os.File
blob []byte
)
c = new(Config)
if file, err = os.Open(conf); err != nil {
return
}
if blob, err = ioutil.ReadAll(file); err != nil {
return
}
err = toml.Unmarshal(blob, c)
return
}
package conf

import (
"github.com/BurntSushi/toml"
"io/ioutil"
"os"
"time"
)

type Config struct {
Snowflake *Snowflake
Zookeeper *Zookeeper
HBase *HBase

MaxNum int
ApiListen string
PprofEnable bool
PprofListen string
}

type Snowflake struct {
ZkAddrs []string
ZkTimeout duration
ZkPath string
WorkId int64
}

type Zookeeper struct {
Addrs []string
Timeout duration
PullInterval duration
VolumeRoot string
StoreRoot string
GroupRoot string
}

type HBase struct {
Addr string
MaxActive int
MaxIdle int
Timeout duration
LvsTimeout duration
}

// Code to implement the TextUnmarshaler interface for `duration`:
type duration struct {
time.Duration
}

func (d *duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}

// NewConfig new a config.
func NewConfig(conf string) (c *Config, err error) {
var (
file *os.File
blob []byte
)
c = new(Config)
if file, err = os.Open(conf); err != nil {
return
}
if blob, err = ioutil.ReadAll(file); err != nil {
return
}
err = toml.Unmarshal(blob, c)
return
}
Binary file removed directory/directory
Binary file not shown.
2 changes: 0 additions & 2 deletions directory/directory.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,3 @@ MaxIdle = 100

# Note that you must specify a number here.
Timeout = "1s"

Framed = false
10 changes: 8 additions & 2 deletions directory/hbase/hbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func (h *HBaseClient) Get(bucket, filename string) (n *meta.Needle, err error) {
if f, err = h.getFile(bucket, filename); err != nil {
return
}
n, err = h.getNeedle(f.Key)
if n, err = h.getNeedle(f.Key); err == errors.ErrNeedleNotExist {
log.Warningf("table not match: bucket: %s filename: %s", bucket, filename)
h.delFile(bucket, filename)
}
return
}

Expand All @@ -55,7 +58,10 @@ func (h *HBaseClient) Put(bucket string, f *meta.File, n *meta.Needle) (err erro
if err = h.putFile(bucket, f); err != nil {
return
}
err = h.putNeedle(n)
if err = h.putNeedle(n); err != errors.ErrNeedleExist && err != nil {
log.Warningf("table not match: bucket: %s filename: %s", bucket, f.Filename)
h.delFile(bucket, f.Filename)
}
return
}

Expand Down
6 changes: 1 addition & 5 deletions directory/hbase/hpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ func Init(config *conf.Config) error {
log.Error("thrift.NewTSocketTimeout error(%v)", err)
return
}

if config.HBase.Framed {
trans = thrift.NewTFramedTransport(trans)
}

trans = thrift.NewTFramedTransport(trans)
c = hbasethrift.NewTHBaseServiceClientFactory(trans, thrift.NewTBinaryProtocolFactoryDefault())
if err = trans.Open(); err != nil {
log.Error("trans.Open error(%v)", err)
Expand Down
48 changes: 17 additions & 31 deletions directory/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ const (
_pingOk = 0
)

type server struct {
d *Directory
}

// StartApi start api http listen.
func StartApi(addr string, d *Directory) {
var s = &server{d: d}
go func() {
var (
err error
serveMux = http.NewServeMux()
)
serveMux.Handle("/get", httpGetHandler{d: d})
serveMux.Handle("/upload", httpUploadHandler{d: d})
serveMux.Handle("/del", httpDelHandler{d: d})
serveMux.Handle("/ping", httpPingHandler{})
serveMux.HandleFunc("/get", s.get)
serveMux.HandleFunc("/upload", s.upload)
serveMux.HandleFunc("/del", s.del)
serveMux.HandleFunc("/ping", s.ping)
if err = http.ListenAndServe(addr, serveMux); err != nil {
log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
return
Expand All @@ -32,12 +37,7 @@ func StartApi(addr string, d *Directory) {
return
}

// httpGetHandler http upload a file.
type httpGetHandler struct {
d *Directory
}

func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *server) get(wr http.ResponseWriter, r *http.Request) {
var (
err error
n *meta.Needle
Expand All @@ -60,7 +60,7 @@ func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
return
}
defer HttpGetWriter(r, wr, time.Now(), &res)
if n, res.Stores, err = h.d.GetStores(bucket, filename); err != nil {
if n, res.Stores, err = s.d.GetStores(bucket, filename); err != nil {
log.Errorf("GetStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
Expand All @@ -76,12 +76,7 @@ func (h httpGetHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
return
}

// httpUploadHandler http upload a file.
type httpUploadHandler struct {
d *Directory
}

func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *server) upload(wr http.ResponseWriter, r *http.Request) {
var (
err error
n *meta.Needle
Expand Down Expand Up @@ -115,11 +110,11 @@ func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
defer HttpUploadWriter(r, wr, time.Now(), &res)

res.Ret = errors.RetOK
if n, res.Stores, err = h.d.UploadStores(bucket, f); err != nil {
if n, res.Stores, err = s.d.UploadStores(bucket, f); err != nil {
if err == errors.ErrNeedleExist {
// update file data
res.Ret = errors.RetNeedleExist
if n, res.Stores, err = h.d.GetStores(bucket, f.Filename); err != nil {
if n, res.Stores, err = s.d.GetStores(bucket, f.Filename); err != nil {
log.Errorf("GetStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
Expand All @@ -144,12 +139,7 @@ func (h httpUploadHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
return
}

// httpDelHandler
type httpDelHandler struct {
d *Directory
}

func (h httpDelHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *server) del(wr http.ResponseWriter, r *http.Request) {
var (
err error
n *meta.Needle
Expand All @@ -172,7 +162,7 @@ func (h httpDelHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
return
}
defer HttpDelWriter(r, wr, time.Now(), &res)
if n, res.Stores, err = h.d.DelStores(bucket, filename); err != nil {
if n, res.Stores, err = s.d.DelStores(bucket, filename); err != nil {
log.Errorf("DelStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
Expand All @@ -188,11 +178,7 @@ func (h httpDelHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
return
}

// httpPingHandler http ping health
type httpPingHandler struct {
}

func (h httpPingHandler) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
func (s *server) ping(wr http.ResponseWriter, r *http.Request) {
var (
byteJson []byte
res = map[string]interface{}{"code": _pingOk}
Expand Down
Binary file removed doc/bfs.graffle
Binary file not shown.

0 comments on commit 6474257

Please sign in to comment.