Skip to content

Commit

Permalink
fix: send to closed chan in discovery (#768)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Sep 18, 2023
1 parent df70525 commit 4657262
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ require (
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v20.10.11+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/dubbogo/tools v1.0.9 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-errors/errors v1.0.1 // indirect
Expand Down
7 changes: 6 additions & 1 deletion pkg/boot/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ import (
)

func TestFileProvider(t *testing.T) {
os.Setenv(constants.EnvConfigPath, testdata.Path("fake_config.yaml"))
prevEnv := os.Getenv(constants.EnvConfigPath)
defer func() {
_ = os.Setenv(constants.EnvConfigPath, prevEnv)
}()
_ = os.Setenv(constants.EnvConfigPath, testdata.Path("fake_config.yaml"))

provider := NewDiscovery(testdata.Path("fake_bootstrap.yaml"))

err := Boot(context.Background(), provider)
Expand Down
48 changes: 22 additions & 26 deletions pkg/boot/discovery_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ func (fp *discovery) WatchTenants(ctx context.Context) (<-chan config.TenantsEve
ch := make(chan config.TenantsEvent)

cancel := fp.tenantOp.Subscribe(ctx, func(e config.Event) {
ch <- *e.(*config.TenantsEvent)
safeSendCh(ch, *e.(*config.TenantsEvent))
})

return ch, wrapWatchCancel(cancel, func() {
close(ch)
}), nil
return ch, wrapWatchCancel(cancel, ch), nil
}

func (fp *discovery) WatchNodes(ctx context.Context, tenant string) (<-chan config.NodesEvent, context.CancelFunc, error) {
Expand All @@ -50,15 +48,13 @@ func (fp *discovery) WatchNodes(ctx context.Context, tenant string) (<-chan conf
ch := make(chan config.NodesEvent)

cancel, err := op.Subscribe(ctx, config.EventTypeNodes, func(e config.Event) {
ch <- *e.(*config.NodesEvent)
safeSendCh(ch, *e.(*config.NodesEvent))
})
if err != nil {
return nil, nil, err
}

return ch, wrapWatchCancel(cancel, func() {
close(ch)
}), nil
return ch, wrapWatchCancel(cancel, ch), nil
}

func (fp *discovery) WatchUsers(ctx context.Context, tenant string) (<-chan config.UsersEvent, context.CancelFunc, error) {
Expand All @@ -70,15 +66,13 @@ func (fp *discovery) WatchUsers(ctx context.Context, tenant string) (<-chan conf
ch := make(chan config.UsersEvent)

cancel, err := op.Subscribe(ctx, config.EventTypeUsers, func(e config.Event) {
ch <- *e.(*config.UsersEvent)
safeSendCh(ch, *e.(*config.UsersEvent))
})
if err != nil {
return nil, nil, err
}

return ch, wrapWatchCancel(cancel, func() {
close(ch)
}), nil
return ch, wrapWatchCancel(cancel, ch), nil
}

func (fp *discovery) WatchClusters(ctx context.Context, tenant string) (<-chan config.ClustersEvent, context.CancelFunc, error) {
Expand All @@ -90,15 +84,13 @@ func (fp *discovery) WatchClusters(ctx context.Context, tenant string) (<-chan c
ch := make(chan config.ClustersEvent)

cancel, err := op.Subscribe(ctx, config.EventTypeClusters, func(e config.Event) {
ch <- *e.(*config.ClustersEvent)
safeSendCh(ch, *e.(*config.ClustersEvent))
})
if err != nil {
return nil, nil, err
}

return ch, wrapWatchCancel(cancel, func() {
close(ch)
}), nil
return ch, wrapWatchCancel(cancel, ch), nil
}

func (fp *discovery) WatchShardingRule(ctx context.Context, tenant string) (<-chan config.ShardingRuleEvent, context.CancelFunc, error) {
Expand All @@ -110,15 +102,13 @@ func (fp *discovery) WatchShardingRule(ctx context.Context, tenant string) (<-ch
ch := make(chan config.ShardingRuleEvent)

cancel, err := op.Subscribe(ctx, config.EventTypeShardingRule, func(e config.Event) {
ch <- *e.(*config.ShardingRuleEvent)
safeSendCh(ch, *e.(*config.ShardingRuleEvent))
})
if err != nil {
return nil, nil, err
}

return ch, wrapWatchCancel(cancel, func() {
close(ch)
}), nil
return ch, wrapWatchCancel(cancel, ch), nil
}

func (fp *discovery) WatchShadowRule(ctx context.Context, tenant string) (<-chan config.ShadowRuleEvent, context.CancelFunc, error) {
Expand All @@ -130,23 +120,29 @@ func (fp *discovery) WatchShadowRule(ctx context.Context, tenant string) (<-chan
ch := make(chan config.ShadowRuleEvent)

cancel, err := op.Subscribe(ctx, config.EventTypeShadowRule, func(e config.Event) {
ch <- *e.(*config.ShadowRuleEvent)
safeSendCh(ch, *e.(*config.ShadowRuleEvent))
})
if err != nil {
return nil, nil, err
}

return ch, wrapWatchCancel(cancel, func() {
close(ch)
}), nil
return ch, wrapWatchCancel(cancel, ch), nil
}

func safeSendCh[T any](ch chan<- T, t T) {
// recover if chan is closed
defer func() {
_ = recover()
}()
ch <- t
}

func wrapWatchCancel(cancel context.CancelFunc, closeChan func()) context.CancelFunc {
func wrapWatchCancel[T any](cancel context.CancelFunc, ch chan T) context.CancelFunc {
return func() {
timer := time.NewTimer(100 * time.Millisecond)
defer timer.Stop()
cancel()
<-timer.C
closeChan()
close(ch)
}
}

0 comments on commit 4657262

Please sign in to comment.