Skip to content

Commit

Permalink
Add support for a handler func that receives notice messages from Pg
Browse files Browse the repository at this point in the history
Fixes #580
  • Loading branch information
cretz authored and autarch committed Jan 16, 2020
1 parent d6fd202 commit dfe11e7
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 4 deletions.
13 changes: 11 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ type conn struct {

// If true this connection is in the middle of a COPY
inCopy bool

// If not nil, notices will be synchronously sent here
noticeHandler func(*Error)
}

// Handle driver-side settings in parsed connection string.
Expand Down Expand Up @@ -971,7 +974,9 @@ func (cn *conn) recv() (t byte, r *readBuf) {
case 'E':
panic(parseError(r))
case 'N':
// ignore
if n := cn.noticeHandler; n != nil {
n(parseError(r))
}
default:
return
}
Expand All @@ -988,8 +993,12 @@ func (cn *conn) recv1Buf(r *readBuf) byte {
}

switch t {
case 'A', 'N':
case 'A':
// ignore
case 'N':
if n := cn.noticeHandler; n != nil {
n(parseError(r))
}
case 'S':
cn.processParameterStatus(r)
default:
Expand Down
4 changes: 3 additions & 1 deletion copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (ci *copyin) resploop() {
case 'C':
// complete
case 'N':
// NoticeResponse
if n := ci.cn.noticeHandler; n != nil {
n(parseError(&r))
}
case 'Z':
ci.cn.processReadyForQuery(&r)
ci.done <- true
Expand Down
71 changes: 71 additions & 0 deletions notice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// +build go1.10

package pq

import (
"context"
"database/sql/driver"
)

// NoticeHandler returns the notice handler on the given connection, if any. A
// runtime panic occurs if c is not a pq connection. This is rarely used
// directly, use ConnectorNoticeHandler and ConnectorWithNoticeHandler instead.
func NoticeHandler(c driver.Conn) func(*Error) {
return c.(*conn).noticeHandler
}

// SetNoticeHandler sets the given notice handler on the given connection. A
// runtime panic occurs if c is not a pq connection. A nil handler may be used
// to unset it. This is rarely used directly, use ConnectorNoticeHandler and
// ConnectorWithNoticeHandler instead.
//
// Note: Notice handlers are executed synchronously by pq meaning commands
// won't continue to be processed until the handler returns.
func SetNoticeHandler(c driver.Conn, handler func(*Error)) {
c.(*conn).noticeHandler = handler
}

// NoticeHandlerConnector wraps a regular connector and sets a notice handler
// on it.
type NoticeHandlerConnector struct {
driver.Connector
noticeHandler func(*Error)
}

// Connect calls the underlying connector's connect method and then sets the
// notice handler.
func (n *NoticeHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
c, err := n.Connector.Connect(ctx)
if err == nil {
SetNoticeHandler(c, n.noticeHandler)
}
return c, err
}

// ConnectorNoticeHandler returns the currently set notice handler, if any. If
// the given connector is not a result of ConnectorWithNoticeHandler, nil is
// returned.
func ConnectorNoticeHandler(c driver.Connector) func(*Error) {
if c, ok := c.(*NoticeHandlerConnector); ok {
return c.noticeHandler
}
return nil
}

// ConnectorWithNoticeHandler creates or sets the given handler for the given
// connector. If the given connector is a result of calling this function
// previously, it is simply set on the given connector and returned. Otherwise,
// this returns a new connector wrapping the given one and setting the notice
// handler. A nil notice handler may be used to unset it.
//
// The returned connector is intended to be used with database/sql.OpenDB.
//
// Note: Notice handlers are executed synchronously by pq meaning commands
// won't continue to be processed until the handler returns.
func ConnectorWithNoticeHandler(c driver.Connector, handler func(*Error)) *NoticeHandlerConnector {
if c, ok := c.(*NoticeHandlerConnector); ok {
c.noticeHandler = handler
return c
}
return &NoticeHandlerConnector{Connector: c, noticeHandler: handler}
}
33 changes: 33 additions & 0 deletions notice_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// +build go1.10

package pq_test

import (
"database/sql"
"fmt"
"log"

"github.com/lib/pq"
)

func ExampleConnectorWithNoticeHandler() {
name := ""
// Base connector to wrap
base, err := pq.NewConnector(name)
if err != nil {
log.Fatal(err)
}
// Wrap the connector to simply print out the message
connector := pq.ConnectorWithNoticeHandler(base, func(notice *pq.Error) {
fmt.Println("Notice sent: " + notice.Message)
})
db := sql.OpenDB(connector)
defer db.Close()
// Raise a notice
sql := "DO language plpgsql $$ BEGIN RAISE NOTICE 'test notice'; END $$"
if _, err := db.Exec(sql); err != nil {
log.Fatal(err)
}
// Output:
// Notice sent: test notice
}
49 changes: 49 additions & 0 deletions notice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// +build go1.10

package pq

import (
"database/sql"
"database/sql/driver"
"testing"
)

func TestConnectorWithNoticeHandler_Simple(t *testing.T) {
b, err := NewConnector("")
if err != nil {
t.Fatal(err)
}
var notice *Error
// Make connector w/ handler to set the local var
c := ConnectorWithNoticeHandler(b, func(n *Error) { notice = n })
raiseNotice(c, t, "Test notice #1")
if notice == nil || notice.Message != "Test notice #1" {
t.Fatalf("Expected notice w/ message, got %v", notice)
}
// Unset the handler on the same connector
prevC := c
if c = ConnectorWithNoticeHandler(c, nil); c != prevC {
t.Fatalf("Expected to not create new connector but did")
}
raiseNotice(c, t, "Test notice #2")
if notice == nil || notice.Message != "Test notice #1" {
t.Fatalf("Expected notice to not change, got %v", notice)
}
// Set it back on the same connector
if c = ConnectorWithNoticeHandler(c, func(n *Error) { notice = n }); c != prevC {
t.Fatal("Expected to not create new connector but did")
}
raiseNotice(c, t, "Test notice #3")
if notice == nil || notice.Message != "Test notice #3" {
t.Fatalf("Expected notice w/ message, got %v", notice)
}
}

func raiseNotice(c driver.Connector, t *testing.T, escapedNotice string) {
db := sql.OpenDB(c)
defer db.Close()
sql := "DO language plpgsql $$ BEGIN RAISE NOTICE '" + escapedNotice + "'; END $$"
if _, err := db.Exec(sql); err != nil {
t.Fatal(err)
}
}
6 changes: 5 additions & 1 deletion notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ func (l *ListenerConn) listenerConnLoop() (err error) {
}
l.replyChan <- message{t, nil}

case 'N', 'S':
case 'S':
// ignore
case 'N':
if n := l.cn.noticeHandler; n != nil {
n(parseError(r))
}
default:
return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
}
Expand Down

0 comments on commit dfe11e7

Please sign in to comment.