From fddc443bb7f4075cfd451ddf250294870a98f598 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 3 Sep 2020 19:28:00 +0530 Subject: [PATCH] Move Closer from y to z --- z/z.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++ z/z_test.go | 26 +++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/z/z.go b/z/z.go index 8b284ce2..a25e10d1 100644 --- a/z/z.go +++ b/z/z.go @@ -17,6 +17,9 @@ package z import ( + "context" + "sync" + "github.com/cespare/xxhash" ) @@ -53,3 +56,72 @@ func KeyToHash(key interface{}) (uint64, uint64) { panic("Key type not supported") } } + +var ( + dummyCloserChan <-chan struct{} +) + +// Closer holds the two things we need to close a goroutine and wait for it to +// finish: a chan to tell the goroutine to shut down, and a WaitGroup with +// which to wait for it to finish shutting down. +type Closer struct { + waiting sync.WaitGroup + + ctx context.Context + cancel context.CancelFunc +} + +// NewCloser constructs a new Closer, with an initial count on the WaitGroup. +func NewCloser(initial int) *Closer { + ret := &Closer{} + ret.ctx, ret.cancel = context.WithCancel(context.Background()) + ret.waiting.Add(initial) + return ret +} + +// AddRunning Add()'s delta to the WaitGroup. +func (lc *Closer) AddRunning(delta int) { + lc.waiting.Add(delta) +} + +// Ctx can be used to get a context, which would automatically get cancelled when Signal is called. +func (lc *Closer) Ctx() context.Context { + if lc == nil { + return context.Background() + } + return lc.ctx +} + +// Signal signals the HasBeenClosed signal. +func (lc *Closer) Signal() { + // Todo(ibrahim): Change Signal to return error on next badger breaking change. + lc.cancel() +} + +// HasBeenClosed gets signaled when Signal() is called. +func (lc *Closer) HasBeenClosed() <-chan struct{} { + if lc == nil { + return dummyCloserChan + } + return lc.ctx.Done() +} + +// Done calls Done() on the WaitGroup. +func (lc *Closer) Done() { + if lc == nil { + return + } + lc.waiting.Done() +} + +// Wait waits on the WaitGroup. (It waits for NewCloser's initial value, AddRunning, and Done +// calls to balance out.) +func (lc *Closer) Wait() { + lc.waiting.Wait() +} + +// SignalAndWait calls Signal(), then Wait(). +func (lc *Closer) SignalAndWait() { + lc.Signal() + lc.Wait() +} diff --git a/z/z_test.go b/z/z_test.go index 9570bc41..4eedb640 100644 --- a/z/z_test.go +++ b/z/z_test.go @@ -53,3 +53,29 @@ func TestKeyToHash(t *testing.T) { key, conflict = KeyToHash(int64(3)) verifyHashProduct(t, 3, 0, key, conflict) } + +func TestMulipleSignals(t *testing.T) { + closer := NewCloser(0) + require.NotPanics(t, func() { closer.Signal() }) + // Should not panic. + require.NotPanics(t, func() { closer.Signal() }) + require.NotPanics(t, func() { closer.SignalAndWait() }) + + // Attempt 2. + closer = NewCloser(1) + require.NotPanics(t, func() { closer.Done() }) + + require.NotPanics(t, func() { closer.SignalAndWait() }) + // Should not panic. + require.NotPanics(t, func() { closer.SignalAndWait() }) + require.NotPanics(t, func() { closer.Signal() }) +} + +func TestCloser(t *testing.T) { + closer := NewCloser(1) + go func() { + defer closer.Done() + <-closer.Ctx().Done() + }() + closer.SignalAndWait() +}