Skip to content

Commit

Permalink
enhance: Fast close msgdispatcher target (#34803)
Browse files Browse the repository at this point in the history
/kind improvement

issue: #34075

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper authored Jul 28, 2024
1 parent f77f536 commit 4a3d98d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/mq/msgdispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/lifetime"
)

func TestDispatcher(t *testing.T) {
Expand Down Expand Up @@ -77,11 +78,13 @@ func TestDispatcher(t *testing.T) {
vchannel: "mock_vchannel_0",
pos: nil,
ch: output,
cancelCh: lifetime.NewSafeChan(),
})
d.AddTarget(&target{
vchannel: "mock_vchannel_1",
pos: nil,
ch: nil,
cancelCh: lifetime.NewSafeChan(),
})
num := d.TargetNum()
assert.Equal(t, 2, num)
Expand Down Expand Up @@ -110,6 +113,7 @@ func TestDispatcher(t *testing.T) {
vchannel: "mock_vchannel_0",
pos: nil,
ch: output,
cancelCh: lifetime.NewSafeChan(),
}
assert.Equal(t, cap(output), cap(target.ch))
wg := &sync.WaitGroup{}
Expand Down
11 changes: 11 additions & 0 deletions pkg/mq/msgdispatcher/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand All @@ -32,19 +36,23 @@ type target struct {
closeMu sync.Mutex
closeOnce sync.Once
closed bool

cancelCh lifetime.SafeChan
}

func newTarget(vchannel string, pos *Pos) *target {
t := &target{
vchannel: vchannel,
ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()),
pos: pos,
cancelCh: lifetime.NewSafeChan(),
}
t.closed = false
return t
}

func (t *target) close() {
t.cancelCh.Close()
t.closeMu.Lock()
defer t.closeMu.Unlock()
t.closeOnce.Do(func() {
Expand All @@ -61,6 +69,9 @@ func (t *target) send(pack *MsgPack) error {
}
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
select {
case <-t.cancelCh.CloseCh():
log.Info("target closed", zap.String("vchannel", t.vchannel))
return nil
case <-time.After(maxTolerantLag):
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, maxTolerantLag)
case t.ch <- pack:
Expand Down

0 comments on commit 4a3d98d

Please sign in to comment.