Skip to content

Commit

Permalink
Mod: DataListener move to remoting
Browse files Browse the repository at this point in the history
  • Loading branch information
hxmhlt committed Jun 10, 2019
1 parent cfc316c commit 0039ac6
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 31 deletions.
8 changes: 3 additions & 5 deletions config_center/dynamic_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package config_center

import (
"github.com/apache/dubbo-go/remoting"
"time"
)
import (
"github.com/apache/dubbo-go/common"
)

//////////////////////////////////////////
// DynamicConfiguration
Expand All @@ -31,8 +29,8 @@ const DEFAULT_GROUP = "dubbo"
const DEFAULT_CONFIG_TIMEOUT = "10s"

type DynamicConfiguration interface {
AddListener(string, common.ConfigurationListener, ...Option)
RemoveListener(string, common.ConfigurationListener, ...Option)
AddListener(string, remoting.ConfigurationListener, ...Option)
RemoveListener(string, remoting.ConfigurationListener, ...Option)
GetConfig(string, ...Option) string
GetConfigs(string, ...Option) string
}
Expand Down
5 changes: 3 additions & 2 deletions config_center/zookeeper/dynamic_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package zookeeper

import (
"github.com/apache/dubbo-go/remoting"
"sync"
)
import (
Expand Down Expand Up @@ -61,11 +62,11 @@ func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConf

}

func (*ZookeeperDynamicConfiguration) AddListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) {
func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {

}

func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) {
func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {

}

Expand Down
5 changes: 3 additions & 2 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package directory

import (
"github.com/apache/dubbo-go/remoting"
"sync"
"time"
)
Expand Down Expand Up @@ -130,10 +131,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {

switch res.Action {
case common.Add:
case remoting.Add:
//dir.cacheService.Add(res.Path, dir.serviceTTL)
dir.cacheInvoker(res.Service)
case common.Del:
case remoting.Del:
//dir.cacheService.Del(res.Path, dir.serviceTTL)
dir.uncacheInvoker(res.Service)
logger.Infof("selector delete service url{%s}", res.Service)
Expand Down
9 changes: 5 additions & 4 deletions registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package directory

import (
"context"
"github.com/apache/dubbo-go/remoting"
"net/url"
"strconv"
"testing"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: common.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
Expand Down Expand Up @@ -80,15 +81,15 @@ func TestSubscribe_Group(t *testing.T) {
urlmap.Set(constant.GROUP_KEY, "group1")
urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap))})
}
//for group2
urlmap2 := url.Values{}
urlmap2.Set(constant.GROUP_KEY, "group2")
urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap2))})
}

Expand Down Expand Up @@ -128,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {

go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
}
return registryDirectory, mockRegistry.(*registry.MockRegistry)
}
3 changes: 2 additions & 1 deletion registry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package registry

import (
"fmt"
"github.com/apache/dubbo-go/remoting"
"math/rand"
"time"
)
Expand All @@ -36,7 +37,7 @@ func init() {
//////////////////////////////////////////

type ServiceEvent struct {
Action common.EventType
Action remoting.EventType
Service common.URL
}

Expand Down
13 changes: 7 additions & 6 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package zookeeper

import (
"context"
"github.com/apache/dubbo-go/remoting"
)
import (
perrors "github.com/pkg/errors"
Expand All @@ -42,15 +43,15 @@ func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}

func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&common.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
return true
}
}
Expand All @@ -61,14 +62,14 @@ func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *common.ConfigChangeEvent
events chan *remoting.ConfigChangeEvent
}

func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.wg.Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *common.ConfigChangeEvent, 32)}
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
}
func (l *RegistryConfigurationListener) Process(configType *common.ConfigChangeEvent) {
func (l *RegistryConfigurationListener) Process(configType *remoting.ConfigChangeEvent) {
l.events <- configType
}

Expand All @@ -85,7 +86,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {

case e := <-l.events:
logger.Debugf("got zk event %s", e)
if e.ConfigType == common.Del && !l.valid() {
if e.ConfigType == remoting.Del && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion common/configuration_listener.go → remoting/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package common
package remoting

import "fmt"

Expand Down
21 changes: 11 additions & 10 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package zookeeper

import (
"github.com/apache/dubbo-go/remoting"
"path"
"sync"
"time"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
return false
}

func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener common.DataListener) {
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
Expand Down Expand Up @@ -111,15 +112,15 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li

newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
continue
}
// listen l service node
go func(node string) {
logger.Infof("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
logger.Infof("delete content{%s}", n)
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n})
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
Expand All @@ -134,19 +135,19 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li

oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
continue
}
logger.Warnf("delete content{%s}", n)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n})
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
}
}

func (l *ZkEventListener) listenDirEvent(zkPath string, listener common.DataListener) {
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()

Expand Down Expand Up @@ -216,7 +217,7 @@ func timeSecondDuration(sec int) time.Duration {
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.DataListener) {
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
Expand Down Expand Up @@ -244,7 +245,7 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.Data
}

for _, c := range children {
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: c}) {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: c}) {
continue
}

Expand All @@ -254,14 +255,14 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.Data
go func(zkPath string, serviceURL common.URL) {
if l.listenServiceNodeEvent(dubboPath) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: c})
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: c})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
}

logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener common.DataListener) {
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
Expand Down

0 comments on commit 0039ac6

Please sign in to comment.