-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
125 lines (113 loc) · 2.67 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"bytes"
"io"
"io/ioutil"
"time"
"bfs/libs/errors"
"bfs/libs/meta"
"bfs/proxy/bfs"
"bfs/proxy/cache"
"bfs/proxy/conf"
log "github.com/golang/glog"
"golang.org/x/time/rate"
)
const _mcMaxLength = 1024 * 1024 // memcache max value is 1MB
// Service .
type Service struct {
cache *cache.Cache
bfs *bfs.Bfs
cacheChan chan func()
rl *rate.Limiter
}
// NewService new service
func NewService(c *conf.Config) (s *Service) {
s = &Service{
cache: cache.New(c.Mc, time.Duration(c.ExpireMc)),
bfs: bfs.New(c),
rl: rate.NewLimiter(rate.Limit(c.Limit.Rate), c.Limit.Brust),
cacheChan: make(chan func(), 1024),
}
go s.cacheproc()
return
}
func (s *Service) addCache(f func()) {
select {
case s.cacheChan <- f:
default:
log.Warningf("s.cacheChan is full")
}
}
func (s *Service) cacheproc() {
for f := range s.cacheChan {
f()
}
}
// Get get
func (s *Service) Get(bucket, filename string) (src io.ReadCloser, ctlen int, mtime int64, sha1, mine string, err error) {
var (
mf *meta.File
bs []byte
)
if mf, err = s.cache.Meta(bucket, filename); err == nil && mf != nil {
if bs, err = s.cache.File(bucket, filename); err == nil && len(bs) > 0 {
mtime = mf.MTime
sha1 = mf.Sha1
mine = mf.Mine
ctlen = len(bs)
src = ioutil.NopCloser(bytes.NewReader(bs))
return
}
}
if !s.rl.Allow() {
err = errors.ErrServiceUnavailable
log.Errorf("service.bfs.Get.RateLimit(%s,%s),error(%v)", bucket, filename, err)
return
}
// get from bfs
if src, ctlen, mtime, sha1, mine, err = s.bfs.Get(bucket, filename); err != nil {
log.Errorf("service.bfs.Get(%s,%s),error(%v)", bucket, filename, err)
}
return
}
// Upload upload
func (s *Service) Upload(bucket, filename, mine, sha1 string, buf []byte) (err error) {
var (
mtime = time.Now().UnixNano()
mf *meta.File
)
if err = s.bfs.Upload(bucket, filename, mine, sha1, mtime, buf); err != nil && err != errors.ErrNeedleExist {
log.Errorf("service.bfs.Upload(%s,%s),error(%s)", bucket, filename, err)
return
}
mf = &meta.File{
MTime: mtime,
Sha1: sha1,
Mine: mine,
}
if len(buf) < _mcMaxLength {
s.addCache(func() {
s.cache.SetMeta(bucket, filename, mf)
s.cache.SetFile(bucket, filename, buf)
})
}
return
}
// Delete delete
func (s *Service) Delete(bucket, filename string) (err error) {
if err = s.bfs.Delete(bucket, filename); err != nil {
log.Errorf("service.bfs.Delete(%s,%s),error(%v)", bucket, filename, err)
return
}
s.cache.DelMeta(bucket, filename)
s.cache.DelFile(bucket, filename)
return
}
// Ping .
func (s *Service) Ping() (err error) {
if err = s.bfs.Ping(); err != nil {
return
}
err = s.cache.Ping()
return
}