Skip to content

Commit

Permalink
ListenerConn: Improve implementation of Close
Browse files Browse the repository at this point in the history
The previous coding was both unsafe (since it tried to operate on the
connection without holding senderLock), but also in some cases failed to
wake up ongoing queries.
  • Loading branch information
johto committed May 2, 2015
1 parent f324032 commit 53b185f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
11 changes: 7 additions & 4 deletions notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,13 @@ func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
// We can't know what state the protocol is in, so we need to abandon
// this connection.
l.connectionLock.Lock()
defer l.connectionLock.Unlock()
// Set the error pointer if it hasn't been set already; see
// listenerConnMain.
if l.err == nil {
l.err = err
}
l.cn.Close()
l.connectionLock.Unlock()
l.cn.c.Close()
return false, err
}

Expand Down Expand Up @@ -329,12 +329,15 @@ func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {

func (l *ListenerConn) Close() error {
l.connectionLock.Lock()
defer l.connectionLock.Unlock()
if l.err != nil {
l.connectionLock.Unlock()
return errListenerConnClosed
}
l.err = errListenerConnClosed
return l.cn.Close()
l.connectionLock.Unlock()
// We can't send anything on the connection without holding senderLock.
// Simply close the net.Conn to wake up everyone operating on it.
return l.cn.c.Close()
}

// Err() returns the reason the connection was closed. It is not safe to call
Expand Down
40 changes: 38 additions & 2 deletions notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ func TestConnExecDeadlock(t *testing.T) {
}()
// give the two goroutines some time to get into position
runtime.Gosched()
// simulate network connection failure
l.cn.c.Close()
// calls Close on the net.Conn; equivalent to a network failure
l.Close()

var done int32 = 0
go func() {
Expand All @@ -246,6 +246,42 @@ func TestConnExecDeadlock(t *testing.T) {
atomic.StoreInt32(&done, 1)
}

// Test for ListenerConn being closed while a slow query is executing
func TestListenerConnCloseWhileQueryIsExecuting(t *testing.T) {
l, _ := newTestListenerConn(t)
defer l.Close()

var wg sync.WaitGroup
wg.Add(1)

go func() {
sent, err := l.ExecSimpleQuery("SELECT pg_sleep(60)")
if sent {
panic("expected sent=false")
}
// could be any of a number of errors
if err == nil {
panic("expected error")
}
wg.Done()
}()
// give the above goroutine some time to get into position
runtime.Gosched()
err := l.Close()
if err != nil {
t.Fatal(err)
}
var done int32 = 0
go func() {
time.Sleep(10 * time.Second)
if atomic.LoadInt32(&done) != 1 {
panic("timed out")
}
}()
wg.Wait()
atomic.StoreInt32(&done, 1)
}

func TestNotifyExtra(t *testing.T) {
db := openTestConn(t)
defer db.Close()
Expand Down

0 comments on commit 53b185f

Please sign in to comment.