Skip to content

Commit

Permalink
Merge remote-tracking branch 'gitlab/feature/fix_discovery'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dot-Liu committed Mar 31, 2023
2 parents e2947f1 + e80cfcd commit c98d591
Show file tree
Hide file tree
Showing 78 changed files with 1,551 additions and 1,159 deletions.
124 changes: 42 additions & 82 deletions discovery/app.go
Original file line number Diff line number Diff line change
@@ -1,106 +1,66 @@
package discovery

import (
"github.com/google/uuid"
"github.com/eolinker/eosc/eocontext"
"sync"
"sync/atomic"
)

type app struct {
id string
nodes map[string]INode
healthChecker IHealthChecker
attrs Attrs
locker sync.RWMutex
container IAppContainer
}

// Reset 重置app的节点列表
func (s *app) Reset(nodes Nodes) {

tmp := make(map[string]INode)
var (
_ IAppAgent = (*_AppAgent)(nil)
_ IApp = (*_App)(nil)
)

for _, node := range nodes {
type IAppAgent interface {
reset(nodes []eocontext.INode)
Agent() IApp
}

if n, has := s.nodes[node.ID()]; has {
n.Leave()
}
tmp[node.ID()] = node
type IApp interface {
Nodes() []eocontext.INode
Close()
}

}
s.locker.Lock()
s.nodes = tmp
s.locker.Unlock()
type _AppAgent struct {
locker sync.RWMutex
nodes []eocontext.INode
use int64
}

// GetAttrs 获取app的属性集合
func (s *app) GetAttrs() Attrs {
s.locker.RLock()
defer s.locker.RUnlock()
return s.attrs
func (a *_AppAgent) Agent() IApp {
atomic.AddInt64(&a.use, 1)
return &_App{_AppAgent: a, isClose: 0}
}

// GetAttrByName 通过属性名获取app对应属性
func (s *app) GetAttrByName(name string) (string, bool) {
s.locker.RLock()
defer s.locker.RUnlock()
attr, ok := s.attrs[name]
return attr, ok
type _App struct {
*_AppAgent

isClose int32
}

// NewApp 创建服务发现app
func NewApp(checker IHealthChecker, container IAppContainer, attrs Attrs, nodes Nodes) IApp {
if attrs == nil {
attrs = make(Attrs)
func (a *_App) Close() {
if atomic.SwapInt32(&a.isClose, 1) == 0 {
atomic.AddInt64(&a.use, -1)
}
return &app{
attrs: attrs,
nodes: nodes,
locker: sync.RWMutex{},
healthChecker: checker,
id: uuid.NewString(),
container: container,
}
}

// ID 返回app的id
func (s *app) ID() string {
return s.id
}

// Nodes 将运行中的节点列表返回
func (s *app) Nodes() []INode {
s.locker.RLock()
defer s.locker.RUnlock()
nodes := make([]INode, 0, len(s.nodes))
for _, node := range s.nodes {
if node.Status() != Running {
continue
}
nodes = append(nodes, node)
}
return nodes
func newApp(nodes []eocontext.INode) *_AppAgent {

return &_AppAgent{nodes: nodes}
}

// NodeError 定时检查节点,当节点失败时,则返回错误
func (s *app) NodeError(id string) error {
s.locker.Lock()
defer s.locker.Unlock()
if n, ok := s.nodes[id]; ok {
n.Down()
if s.healthChecker != nil {
err := s.healthChecker.AddToCheck(n)
return err
}
}
return nil
func (a *_AppAgent) reset(nodes []eocontext.INode) {

a.locker.Lock()
defer a.locker.Unlock()
a.nodes = nodes
}

// Close 关闭服务发现的app
func (s *app) Close() error {
//
s.container.Remove(s.id)
if s.healthChecker != nil {
return s.healthChecker.Stop()
}
return nil
func (a *_AppAgent) Nodes() []eocontext.INode {
a.locker.RLock()
defer a.locker.RUnlock()
l := make([]eocontext.INode, len(a.nodes))
copy(l, a.nodes)
return l
}
8 changes: 8 additions & 0 deletions discovery/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package discovery

// IHealthChecker 健康检查接口
type IHealthChecker interface {
Check(nodes INodes)
Reset(conf interface{}) error
Stop()
}
14 changes: 0 additions & 14 deletions discovery/checker.go

This file was deleted.

181 changes: 150 additions & 31 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,174 @@ package discovery

import (
"errors"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/eocontext"
"sync"
"sync/atomic"
"time"
)

var (
ErrDiscoveryDown = errors.New("discovery down")
)

//CheckSkill 检查目标技能是否符合
// IDiscovery 服务发现接口
type IDiscovery interface {
GetApp(config string) (IApp, error)
}

// CheckSkill 检查目标技能是否符合
func CheckSkill(skill string) bool {
return skill == "github.com/eolinker/apinto/discovery.discovery.IDiscovery"
}

//IDiscovery 服务发现接口
type IDiscovery interface {
GetApp(config string) (IApp, error)
type NodeInfo struct {
Ip string
Port int
Labels map[string]string
}

//IApp app接口
type IApp interface {
IAttributes
ID() string
Nodes() []INode
Reset(nodes Nodes)
NodeError(id string) error
Close() error
type IAppContainer interface {
INodes
Set(id string, info []NodeInfo) (app IAppAgent)
Reset(info map[string][]NodeInfo)
GetApp(id string) (IAppAgent, bool)
Keys() []string
}

//IAppContainer app容器接口
type IAppContainer interface {
Remove(id string) error
type appContainer struct {
lock sync.RWMutex
nodes eosc.Untyped[string, INode]
apps map[string]*_AppAgent
isHealthCheck int32
}

func (ac *appContainer) status(status NodeStatus) NodeStatus {

if atomic.LoadInt32(&ac.isHealthCheck) > 0 {
return status
}
return Running
}

//INode 节点接口
type INode = eocontext.INode
func (ac *appContainer) SetHealthCheck(isHealthCheck bool) {
if isHealthCheck {
atomic.StoreInt32(&ac.isHealthCheck, 1)
} else {
atomic.StoreInt32(&ac.isHealthCheck, 0)

//Attrs 属性集合
type Attrs = eocontext.Attrs
}
}

//IAttributes 属性接口
type IAttributes = eocontext.IAttributes
func NewAppContainer() IAppContainer {

//NodeStatus 节点状态类型
type NodeStatus = eocontext.NodeStatus
return &appContainer{
apps: make(map[string]*_AppAgent),
nodes: eosc.BuildUntyped[string, INode](),
}
}

const (
//Running 节点运行中状态
Running = eocontext.Running
//Down 节点不可用状态
Down = eocontext.Down
//Leave 节点离开状态
Leave = eocontext.Leave
)
func (ac *appContainer) Keys() []string {

ac.lock.RLock()
defer ac.lock.RUnlock()
if ac.apps == nil {
return nil
}
keys := make([]string, 0, len(ac.apps))
for k := range ac.apps {
keys = append(keys, k)
}
return keys
}

func (ac *appContainer) create(infos []NodeInfo) []eocontext.INode {
ns := make([]eocontext.INode, 0, len(infos))
for _, i := range infos {

n := ac.Get(i.Ip, i.Port)
ns = append(ns, NewNode(n, i.Labels))
}
return ns
}
func (ac *appContainer) Set(name string, infos []NodeInfo) IAppAgent {

ns := ac.create(infos)
ac.lock.RLock()
app, has := ac.apps[name]
ac.lock.RUnlock()
if has {
app.reset(ns)
return app
}

ac.lock.Lock()

app, has = ac.apps[name]
needCheck := false
if !has {
if len(ac.apps) == 0 {
needCheck = true
}
app = newApp(ns)
ac.apps[name] = app
}
ac.lock.Unlock()
if needCheck {
go ac.doCheck()
}
return app
}
func (ac *appContainer) doCheck() {
t := time.NewTicker(time.Second * 10)
defer t.Stop()
for range t.C {

ac.lock.Lock()
if len(ac.apps) == 0 {
ac.lock.Unlock()
return
}
for key, app := range ac.apps {
if atomic.LoadInt64(&app.use) <= 0 {
delete(ac.apps, key)
}
}

nodeUse := make(map[string]int)

for _, app := range ac.apps {
for _, n := range app.nodes {
nodeUse[n.ID()] += 1
}
}
nodeList := ac.nodes.List()
for _, n := range nodeList {
if nodeUse[n.ID()] == 0 {
ac.nodes.Del(n.ID())
}
}
ac.lock.Unlock()
}

}

func (ac *appContainer) Reset(infos map[string][]NodeInfo) {
nm := make(map[string]*_AppAgent)
for name, info := range infos {
nm[name] = newApp(ac.create(info))
}
ac.lock.Lock()
ac.apps = nm
ac.lock.Unlock()

}

func (ac *appContainer) GetApp(name string) (IAppAgent, bool) {
ac.lock.RLock()
defer ac.lock.RUnlock()
app, has := ac.apps[name]

return app, has

}
Loading

0 comments on commit c98d591

Please sign in to comment.