Skip to content

Commit

Permalink
Add support for LISTEN/NOTIFY
Browse files Browse the repository at this point in the history
This commit adds support for the PostgreSQL LISTEN/NOTIFY mechanism by
adding two new user-exposed interfaces:

  - ListenerConn is a low level interface which establishes a connection
    to a PostgreSQL server and only knows how to execute simple queries
    and wait for notifications.  This interface does not know how to
    reconnect, and does not maintain a list of the channels it is
    listening on.  Its use is discouraged.
  - Listener is the higher-level interface which uses ListenerConn
    underneath to provide a simpler to use interface, supporting
    automatic reconnection.  In order to do that, it also maintains a
    list of channels it is already listening on.

Significant contributions and ideas from Tommie Gannert.  Ideas and
valuable feedback from Kamil Kisiel, Maciek Sakrejda and Paul Hammond.
  • Loading branch information
johto committed Feb 8, 2014
1 parent b7b3322 commit 224d938
Show file tree
Hide file tree
Showing 6 changed files with 1,404 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Optionally, a benchmark suite can be run as part of the tests:
* pq.ParseURL for converting urls to connection strings for sql.Open.
* Many libpq compatible environment variables
* Unix socket support
* Notifications: `LISTEN`/`NOTIFY`

## Future / Things you can help with

Expand Down
8 changes: 7 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,13 @@ func (cn *conn) Prepare(q string) (driver.Stmt, error) {

func (cn *conn) Close() (err error) {
defer errRecover(&err)
cn.send(cn.writeBuf('X'))

// Don't go through send(); ListenerConn relies on us not scribbling on the
// scratch buffer of this connection.
_, err = cn.c.Write([]byte("X\x00\x00\x00\x04"))
if err != nil {
return err
}

return cn.c.Close()
}
Expand Down
35 changes: 35 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,40 @@ Usage example:
log.Fatal(err)
}
Notifications
PostgreSQL supports a simple publish/subscribe model over database
connections. See http://www.postgresql.org/docs/current/static/sql-notify.html
for more information about the general mechanism.
To start listening for notifications, you first have to open a new connection
to the database by calling NewListener. This connection can not be used for
anything other than LISTEN / NOTIFY. Calling Listen will open a "notification
channel"; once a notification channel is open, a notification generated on that
channel will effect a send on the Listener.Notify channel. A notification
channel will remain open until Unlisten is called, though connection loss might
result in some notifications being lost. To solve this problem, Listener sends
a nil pointer over the Notify channel any time the connection is re-established
following a connection loss. The application can get information about the
state of the underlying connection by setting an event callback in the call to
NewListener.
A single Listener can safely be used from concurrent goroutines, which means
that there is often no need to create more than one Listener in your
application. However, a Listener is always connected to a single database, so
you will need to create a new Listener instance for every database you want to
receive notifications in.
The channel name in both Listen and Unlisten is case sensitive, and can contain
any characters legal in an identifier (see
http://www.postgresql.org/docs/current/static/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
for more information). Note that the channel name will be truncated to 63
bytes by the PostgreSQL server.
You can find a complete, working example of Listener usage at
http://godoc.org/github.com/lib/pq/listen_example.
*/
package pq
102 changes: 102 additions & 0 deletions listen_example/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Below you will find a self-contained Go program which uses the LISTEN / NOTIFY
mechanism to avoid polling the database while waiting for more work to arrive.
//
// You can see the program in action by defining a function similar to
// the following:
//
// CREATE OR REPLACE FUNCTION public.get_work()
// RETURNS bigint
// LANGUAGE sql
// AS $$
// SELECT CASE WHEN random() >= 0.2 THEN int8 '1' END
// $$
// ;
package main
import (
"github.com/lib/pq"
"database/sql"
"fmt"
"time"
)
func doWork(db *sql.DB, work int64) {
// work here
}
func getWork(db *sql.DB) {
for {
// get work from the database here
var work sql.NullInt64
err := db.QueryRow("SELECT get_work()").Scan(&work)
if err != nil {
fmt.Println("call to get_work() failed: ", err)
time.Sleep(10 * time.Second)
continue
}
if !work.Valid {
// no more work to do
fmt.Println("ran out of work")
return
}
fmt.Println("starting work on ", work.Int64)
go doWork(db, work.Int64)
}
}
func waitForNotification(l *pq.Listener) {
for {
select {
case <-l.Notify:
fmt.Println("received notification, new work available")
return
case <-time.After(90 * time.Second):
go func() {
l.Ping()
}()
// Check if there's more work available, just in case it takes
// a while for the Listener to notice connection loss and
// reconnect.
fmt.Println("received no work for 90 seconds, checking for new work")
return
}
}
}
func main() {
var conninfo string = ""
db, err := sql.Open("postgres", conninfo)
if err != nil {
panic(err)
}
reportProblem := func(ev pq.ListenerEventType, err error) {
if err != nil {
fmt.Println(err.Error())
}
}
listener := pq.NewListener(conninfo, 10 * time.Second, time.Minute, reportProblem)
err = listener.Listen("getwork")
if err != nil {
panic(err)
}
fmt.Println("entering main loop")
for {
// process all available work before waiting for notifications
getWork(db)
waitForNotification(listener)
}
}
*/
package listen_example
Loading

0 comments on commit 224d938

Please sign in to comment.