858 lines
		
	
	
	
		
			25 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			858 lines
		
	
	
	
		
			25 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
| package pq
 | |
| 
 | |
| // Package pq is a pure Go Postgres driver for the database/sql package.
 | |
| // This module contains support for Postgres LISTEN/NOTIFY.
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"database/sql/driver"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| // Notification represents a single notification from the database.
 | |
| type Notification struct {
 | |
| 	// Process ID (PID) of the notifying postgres backend.
 | |
| 	BePid int
 | |
| 	// Name of the channel the notification was sent on.
 | |
| 	Channel string
 | |
| 	// Payload, or the empty string if unspecified.
 | |
| 	Extra string
 | |
| }
 | |
| 
 | |
| func recvNotification(r *readBuf) *Notification {
 | |
| 	bePid := r.int32()
 | |
| 	channel := r.string()
 | |
| 	extra := r.string()
 | |
| 
 | |
| 	return &Notification{bePid, channel, extra}
 | |
| }
 | |
| 
 | |
| // SetNotificationHandler sets the given notification handler on the given
 | |
| // connection. A runtime panic occurs if c is not a pq connection. A nil handler
 | |
| // may be used to unset it.
 | |
| //
 | |
| // Note: Notification handlers are executed synchronously by pq meaning commands
 | |
| // won't continue to be processed until the handler returns.
 | |
| func SetNotificationHandler(c driver.Conn, handler func(*Notification)) {
 | |
| 	c.(*conn).notificationHandler = handler
 | |
| }
 | |
| 
 | |
| // NotificationHandlerConnector wraps a regular connector and sets a notification handler
 | |
| // on it.
 | |
| type NotificationHandlerConnector struct {
 | |
| 	driver.Connector
 | |
| 	notificationHandler func(*Notification)
 | |
| }
 | |
| 
 | |
| // Connect calls the underlying connector's connect method and then sets the
 | |
| // notification handler.
 | |
| func (n *NotificationHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
 | |
| 	c, err := n.Connector.Connect(ctx)
 | |
| 	if err == nil {
 | |
| 		SetNotificationHandler(c, n.notificationHandler)
 | |
| 	}
 | |
| 	return c, err
 | |
| }
 | |
| 
 | |
| // ConnectorNotificationHandler returns the currently set notification handler, if any. If
 | |
| // the given connector is not a result of ConnectorWithNotificationHandler, nil is
 | |
| // returned.
 | |
| func ConnectorNotificationHandler(c driver.Connector) func(*Notification) {
 | |
| 	if c, ok := c.(*NotificationHandlerConnector); ok {
 | |
| 		return c.notificationHandler
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ConnectorWithNotificationHandler creates or sets the given handler for the given
 | |
| // connector. If the given connector is a result of calling this function
 | |
| // previously, it is simply set on the given connector and returned. Otherwise,
 | |
| // this returns a new connector wrapping the given one and setting the notification
 | |
| // handler. A nil notification handler may be used to unset it.
 | |
| //
 | |
| // The returned connector is intended to be used with database/sql.OpenDB.
 | |
| //
 | |
| // Note: Notification handlers are executed synchronously by pq meaning commands
 | |
| // won't continue to be processed until the handler returns.
 | |
| func ConnectorWithNotificationHandler(c driver.Connector, handler func(*Notification)) *NotificationHandlerConnector {
 | |
| 	if c, ok := c.(*NotificationHandlerConnector); ok {
 | |
| 		c.notificationHandler = handler
 | |
| 		return c
 | |
| 	}
 | |
| 	return &NotificationHandlerConnector{Connector: c, notificationHandler: handler}
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	connStateIdle int32 = iota
 | |
| 	connStateExpectResponse
 | |
| 	connStateExpectReadyForQuery
 | |
| )
 | |
| 
 | |
| type message struct {
 | |
| 	typ byte
 | |
| 	err error
 | |
| }
 | |
| 
 | |
| var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
 | |
| 
 | |
| // ListenerConn is a low-level interface for waiting for notifications.  You
 | |
| // should use Listener instead.
 | |
| type ListenerConn struct {
 | |
| 	// guards cn and err
 | |
| 	connectionLock sync.Mutex
 | |
| 	cn             *conn
 | |
| 	err            error
 | |
| 
 | |
| 	connState int32
 | |
| 
 | |
| 	// the sending goroutine will be holding this lock
 | |
| 	senderLock sync.Mutex
 | |
| 
 | |
| 	notificationChan chan<- *Notification
 | |
| 
 | |
| 	replyChan chan message
 | |
| }
 | |
| 
 | |
| // NewListenerConn creates a new ListenerConn. Use NewListener instead.
 | |
| func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
 | |
| 	return newDialListenerConn(defaultDialer{}, name, notificationChan)
 | |
| }
 | |
| 
 | |
| func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
 | |
| 	cn, err := DialOpen(d, name)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	l := &ListenerConn{
 | |
| 		cn:               cn.(*conn),
 | |
| 		notificationChan: c,
 | |
| 		connState:        connStateIdle,
 | |
| 		replyChan:        make(chan message, 2),
 | |
| 	}
 | |
| 
 | |
| 	go l.listenerConnMain()
 | |
| 
 | |
| 	return l, nil
 | |
| }
 | |
| 
 | |
| // We can only allow one goroutine at a time to be running a query on the
 | |
| // connection for various reasons, so the goroutine sending on the connection
 | |
| // must be holding senderLock.
 | |
| //
 | |
| // Returns an error if an unrecoverable error has occurred and the ListenerConn
 | |
| // should be abandoned.
 | |
| func (l *ListenerConn) acquireSenderLock() error {
 | |
| 	// we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
 | |
| 	l.senderLock.Lock()
 | |
| 
 | |
| 	l.connectionLock.Lock()
 | |
| 	err := l.err
 | |
| 	l.connectionLock.Unlock()
 | |
| 	if err != nil {
 | |
| 		l.senderLock.Unlock()
 | |
| 		return err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (l *ListenerConn) releaseSenderLock() {
 | |
| 	l.senderLock.Unlock()
 | |
| }
 | |
| 
 | |
| // setState advances the protocol state to newState.  Returns false if moving
 | |
| // to that state from the current state is not allowed.
 | |
| func (l *ListenerConn) setState(newState int32) bool {
 | |
| 	var expectedState int32
 | |
| 
 | |
| 	switch newState {
 | |
| 	case connStateIdle:
 | |
| 		expectedState = connStateExpectReadyForQuery
 | |
| 	case connStateExpectResponse:
 | |
| 		expectedState = connStateIdle
 | |
| 	case connStateExpectReadyForQuery:
 | |
| 		expectedState = connStateExpectResponse
 | |
| 	default:
 | |
| 		panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
 | |
| 	}
 | |
| 
 | |
| 	return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
 | |
| }
 | |
| 
 | |
| // Main logic is here: receive messages from the postgres backend, forward
 | |
| // notifications and query replies and keep the internal state in sync with the
 | |
| // protocol state.  Returns when the connection has been lost, is about to go
 | |
| // away or should be discarded because we couldn't agree on the state with the
 | |
| // server backend.
 | |
| func (l *ListenerConn) listenerConnLoop() (err error) {
 | |
| 	defer errRecoverNoErrBadConn(&err)
 | |
| 
 | |
| 	r := &readBuf{}
 | |
| 	for {
 | |
| 		t, err := l.cn.recvMessage(r)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		switch t {
 | |
| 		case 'A':
 | |
| 			// recvNotification copies all the data so we don't need to worry
 | |
| 			// about the scratch buffer being overwritten.
 | |
| 			l.notificationChan <- recvNotification(r)
 | |
| 
 | |
| 		case 'T', 'D':
 | |
| 			// only used by tests; ignore
 | |
| 
 | |
| 		case 'E':
 | |
| 			// We might receive an ErrorResponse even when not in a query; it
 | |
| 			// is expected that the server will close the connection after
 | |
| 			// that, but we should make sure that the error we display is the
 | |
| 			// one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
 | |
| 			if !l.setState(connStateExpectReadyForQuery) {
 | |
| 				return parseError(r)
 | |
| 			}
 | |
| 			l.replyChan <- message{t, parseError(r)}
 | |
| 
 | |
| 		case 'C', 'I':
 | |
| 			if !l.setState(connStateExpectReadyForQuery) {
 | |
| 				// protocol out of sync
 | |
| 				return fmt.Errorf("unexpected CommandComplete")
 | |
| 			}
 | |
| 			// ExecSimpleQuery doesn't need to know about this message
 | |
| 
 | |
| 		case 'Z':
 | |
| 			if !l.setState(connStateIdle) {
 | |
| 				// protocol out of sync
 | |
| 				return fmt.Errorf("unexpected ReadyForQuery")
 | |
| 			}
 | |
| 			l.replyChan <- message{t, nil}
 | |
| 
 | |
| 		case 'S':
 | |
| 			// ignore
 | |
| 		case 'N':
 | |
| 			if n := l.cn.noticeHandler; n != nil {
 | |
| 				n(parseError(r))
 | |
| 			}
 | |
| 		default:
 | |
| 			return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // This is the main routine for the goroutine receiving on the database
 | |
| // connection.  Most of the main logic is in listenerConnLoop.
 | |
| func (l *ListenerConn) listenerConnMain() {
 | |
| 	err := l.listenerConnLoop()
 | |
| 
 | |
| 	// listenerConnLoop terminated; we're done, but we still have to clean up.
 | |
| 	// Make sure nobody tries to start any new queries by making sure the err
 | |
| 	// pointer is set.  It is important that we do not overwrite its value; a
 | |
| 	// connection could be closed by either this goroutine or one sending on
 | |
| 	// the connection -- whoever closes the connection is assumed to have the
 | |
| 	// more meaningful error message (as the other one will probably get
 | |
| 	// net.errClosed), so that goroutine sets the error we expose while the
 | |
| 	// other error is discarded.  If the connection is lost while two
 | |
| 	// goroutines are operating on the socket, it probably doesn't matter which
 | |
| 	// error we expose so we don't try to do anything more complex.
 | |
| 	l.connectionLock.Lock()
 | |
| 	if l.err == nil {
 | |
| 		l.err = err
 | |
| 	}
 | |
| 	l.cn.Close()
 | |
| 	l.connectionLock.Unlock()
 | |
| 
 | |
| 	// There might be a query in-flight; make sure nobody's waiting for a
 | |
| 	// response to it, since there's not going to be one.
 | |
| 	close(l.replyChan)
 | |
| 
 | |
| 	// let the listener know we're done
 | |
| 	close(l.notificationChan)
 | |
| 
 | |
| 	// this ListenerConn is done
 | |
| }
 | |
| 
 | |
| // Listen sends a LISTEN query to the server. See ExecSimpleQuery.
 | |
| func (l *ListenerConn) Listen(channel string) (bool, error) {
 | |
| 	return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
 | |
| }
 | |
| 
 | |
| // Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
 | |
| func (l *ListenerConn) Unlisten(channel string) (bool, error) {
 | |
| 	return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
 | |
| }
 | |
| 
 | |
| // UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
 | |
| func (l *ListenerConn) UnlistenAll() (bool, error) {
 | |
| 	return l.ExecSimpleQuery("UNLISTEN *")
 | |
| }
 | |
| 
 | |
| // Ping the remote server to make sure it's alive.  Non-nil error means the
 | |
| // connection has failed and should be abandoned.
 | |
| func (l *ListenerConn) Ping() error {
 | |
| 	sent, err := l.ExecSimpleQuery("")
 | |
| 	if !sent {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		// shouldn't happen
 | |
| 		panic(err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Attempt to send a query on the connection.  Returns an error if sending the
 | |
| // query failed, and the caller should initiate closure of this connection.
 | |
| // The caller must be holding senderLock (see acquireSenderLock and
 | |
| // releaseSenderLock).
 | |
| func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
 | |
| 	defer errRecoverNoErrBadConn(&err)
 | |
| 
 | |
| 	// must set connection state before sending the query
 | |
| 	if !l.setState(connStateExpectResponse) {
 | |
| 		panic("two queries running at the same time")
 | |
| 	}
 | |
| 
 | |
| 	// Can't use l.cn.writeBuf here because it uses the scratch buffer which
 | |
| 	// might get overwritten by listenerConnLoop.
 | |
| 	b := &writeBuf{
 | |
| 		buf: []byte("Q\x00\x00\x00\x00"),
 | |
| 		pos: 1,
 | |
| 	}
 | |
| 	b.string(q)
 | |
| 	l.cn.send(b)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
 | |
| // parameters) on the connection. The possible return values are:
 | |
| //   1) "executed" is true; the query was executed to completion on the
 | |
| //      database server.  If the query failed, err will be set to the error
 | |
| //      returned by the database, otherwise err will be nil.
 | |
| //   2) If "executed" is false, the query could not be executed on the remote
 | |
| //      server.  err will be non-nil.
 | |
| //
 | |
| // After a call to ExecSimpleQuery has returned an executed=false value, the
 | |
| // connection has either been closed or will be closed shortly thereafter, and
 | |
| // all subsequently executed queries will return an error.
 | |
| func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
 | |
| 	if err = l.acquireSenderLock(); err != nil {
 | |
| 		return false, err
 | |
| 	}
 | |
| 	defer l.releaseSenderLock()
 | |
| 
 | |
| 	err = l.sendSimpleQuery(q)
 | |
| 	if err != nil {
 | |
| 		// We can't know what state the protocol is in, so we need to abandon
 | |
| 		// this connection.
 | |
| 		l.connectionLock.Lock()
 | |
| 		// Set the error pointer if it hasn't been set already; see
 | |
| 		// listenerConnMain.
 | |
| 		if l.err == nil {
 | |
| 			l.err = err
 | |
| 		}
 | |
| 		l.connectionLock.Unlock()
 | |
| 		l.cn.c.Close()
 | |
| 		return false, err
 | |
| 	}
 | |
| 
 | |
| 	// now we just wait for a reply..
 | |
| 	for {
 | |
| 		m, ok := <-l.replyChan
 | |
| 		if !ok {
 | |
| 			// We lost the connection to server, don't bother waiting for a
 | |
| 			// a response.  err should have been set already.
 | |
| 			l.connectionLock.Lock()
 | |
| 			err := l.err
 | |
| 			l.connectionLock.Unlock()
 | |
| 			return false, err
 | |
| 		}
 | |
| 		switch m.typ {
 | |
| 		case 'Z':
 | |
| 			// sanity check
 | |
| 			if m.err != nil {
 | |
| 				panic("m.err != nil")
 | |
| 			}
 | |
| 			// done; err might or might not be set
 | |
| 			return true, err
 | |
| 
 | |
| 		case 'E':
 | |
| 			// sanity check
 | |
| 			if m.err == nil {
 | |
| 				panic("m.err == nil")
 | |
| 			}
 | |
| 			// server responded with an error; ReadyForQuery to follow
 | |
| 			err = m.err
 | |
| 
 | |
| 		default:
 | |
| 			return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Close closes the connection.
 | |
| func (l *ListenerConn) Close() error {
 | |
| 	l.connectionLock.Lock()
 | |
| 	if l.err != nil {
 | |
| 		l.connectionLock.Unlock()
 | |
| 		return errListenerConnClosed
 | |
| 	}
 | |
| 	l.err = errListenerConnClosed
 | |
| 	l.connectionLock.Unlock()
 | |
| 	// We can't send anything on the connection without holding senderLock.
 | |
| 	// Simply close the net.Conn to wake up everyone operating on it.
 | |
| 	return l.cn.c.Close()
 | |
| }
 | |
| 
 | |
| // Err returns the reason the connection was closed. It is not safe to call
 | |
| // this function until l.Notify has been closed.
 | |
| func (l *ListenerConn) Err() error {
 | |
| 	return l.err
 | |
| }
 | |
| 
 | |
| var errListenerClosed = errors.New("pq: Listener has been closed")
 | |
| 
 | |
| // ErrChannelAlreadyOpen is returned from Listen when a channel is already
 | |
| // open.
 | |
| var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
 | |
| 
 | |
| // ErrChannelNotOpen is returned from Unlisten when a channel is not open.
 | |
| var ErrChannelNotOpen = errors.New("pq: channel is not open")
 | |
| 
 | |
| // ListenerEventType is an enumeration of listener event types.
 | |
| type ListenerEventType int
 | |
| 
 | |
| const (
 | |
| 	// ListenerEventConnected is emitted only when the database connection
 | |
| 	// has been initially initialized. The err argument of the callback
 | |
| 	// will always be nil.
 | |
| 	ListenerEventConnected ListenerEventType = iota
 | |
| 
 | |
| 	// ListenerEventDisconnected is emitted after a database connection has
 | |
| 	// been lost, either because of an error or because Close has been
 | |
| 	// called. The err argument will be set to the reason the database
 | |
| 	// connection was lost.
 | |
| 	ListenerEventDisconnected
 | |
| 
 | |
| 	// ListenerEventReconnected is emitted after a database connection has
 | |
| 	// been re-established after connection loss. The err argument of the
 | |
| 	// callback will always be nil. After this event has been emitted, a
 | |
| 	// nil pq.Notification is sent on the Listener.Notify channel.
 | |
| 	ListenerEventReconnected
 | |
| 
 | |
| 	// ListenerEventConnectionAttemptFailed is emitted after a connection
 | |
| 	// to the database was attempted, but failed. The err argument will be
 | |
| 	// set to an error describing why the connection attempt did not
 | |
| 	// succeed.
 | |
| 	ListenerEventConnectionAttemptFailed
 | |
| )
 | |
| 
 | |
| // EventCallbackType is the event callback type. See also ListenerEventType
 | |
| // constants' documentation.
 | |
| type EventCallbackType func(event ListenerEventType, err error)
 | |
| 
 | |
| // Listener provides an interface for listening to notifications from a
 | |
| // PostgreSQL database.  For general usage information, see section
 | |
| // "Notifications".
 | |
| //
 | |
| // Listener can safely be used from concurrently running goroutines.
 | |
| type Listener struct {
 | |
| 	// Channel for receiving notifications from the database.  In some cases a
 | |
| 	// nil value will be sent.  See section "Notifications" above.
 | |
| 	Notify chan *Notification
 | |
| 
 | |
| 	name                 string
 | |
| 	minReconnectInterval time.Duration
 | |
| 	maxReconnectInterval time.Duration
 | |
| 	dialer               Dialer
 | |
| 	eventCallback        EventCallbackType
 | |
| 
 | |
| 	lock                 sync.Mutex
 | |
| 	isClosed             bool
 | |
| 	reconnectCond        *sync.Cond
 | |
| 	cn                   *ListenerConn
 | |
| 	connNotificationChan <-chan *Notification
 | |
| 	channels             map[string]struct{}
 | |
| }
 | |
| 
 | |
| // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
 | |
| //
 | |
| // name should be set to a connection string to be used to establish the
 | |
| // database connection (see section "Connection String Parameters" above).
 | |
| //
 | |
| // minReconnectInterval controls the duration to wait before trying to
 | |
| // re-establish the database connection after connection loss.  After each
 | |
| // consecutive failure this interval is doubled, until maxReconnectInterval is
 | |
| // reached.  Successfully completing the connection establishment procedure
 | |
| // resets the interval back to minReconnectInterval.
 | |
| //
 | |
| // The last parameter eventCallback can be set to a function which will be
 | |
| // called by the Listener when the state of the underlying database connection
 | |
| // changes.  This callback will be called by the goroutine which dispatches the
 | |
| // notifications over the Notify channel, so you should try to avoid doing
 | |
| // potentially time-consuming operations from the callback.
 | |
| func NewListener(name string,
 | |
| 	minReconnectInterval time.Duration,
 | |
| 	maxReconnectInterval time.Duration,
 | |
| 	eventCallback EventCallbackType) *Listener {
 | |
| 	return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
 | |
| }
 | |
| 
 | |
| // NewDialListener is like NewListener but it takes a Dialer.
 | |
| func NewDialListener(d Dialer,
 | |
| 	name string,
 | |
| 	minReconnectInterval time.Duration,
 | |
| 	maxReconnectInterval time.Duration,
 | |
| 	eventCallback EventCallbackType) *Listener {
 | |
| 
 | |
| 	l := &Listener{
 | |
| 		name:                 name,
 | |
| 		minReconnectInterval: minReconnectInterval,
 | |
| 		maxReconnectInterval: maxReconnectInterval,
 | |
| 		dialer:               d,
 | |
| 		eventCallback:        eventCallback,
 | |
| 
 | |
| 		channels: make(map[string]struct{}),
 | |
| 
 | |
| 		Notify: make(chan *Notification, 32),
 | |
| 	}
 | |
| 	l.reconnectCond = sync.NewCond(&l.lock)
 | |
| 
 | |
| 	go l.listenerMain()
 | |
| 
 | |
| 	return l
 | |
| }
 | |
| 
 | |
| // NotificationChannel returns the notification channel for this listener.
 | |
| // This is the same channel as Notify, and will not be recreated during the
 | |
| // life time of the Listener.
 | |
| func (l *Listener) NotificationChannel() <-chan *Notification {
 | |
| 	return l.Notify
 | |
| }
 | |
| 
 | |
| // Listen starts listening for notifications on a channel.  Calls to this
 | |
| // function will block until an acknowledgement has been received from the
 | |
| // server.  Note that Listener automatically re-establishes the connection
 | |
| // after connection loss, so this function may block indefinitely if the
 | |
| // connection can not be re-established.
 | |
| //
 | |
| // Listen will only fail in three conditions:
 | |
| //   1) The channel is already open.  The returned error will be
 | |
| //      ErrChannelAlreadyOpen.
 | |
| //   2) The query was executed on the remote server, but PostgreSQL returned an
 | |
| //      error message in response to the query.  The returned error will be a
 | |
| //      pq.Error containing the information the server supplied.
 | |
| //   3) Close is called on the Listener before the request could be completed.
 | |
| //
 | |
| // The channel name is case-sensitive.
 | |
| func (l *Listener) Listen(channel string) error {
 | |
| 	l.lock.Lock()
 | |
| 	defer l.lock.Unlock()
 | |
| 
 | |
| 	if l.isClosed {
 | |
| 		return errListenerClosed
 | |
| 	}
 | |
| 
 | |
| 	// The server allows you to issue a LISTEN on a channel which is already
 | |
| 	// open, but it seems useful to be able to detect this case to spot for
 | |
| 	// mistakes in application logic.  If the application genuinely does't
 | |
| 	// care, it can check the exported error and ignore it.
 | |
| 	_, exists := l.channels[channel]
 | |
| 	if exists {
 | |
| 		return ErrChannelAlreadyOpen
 | |
| 	}
 | |
| 
 | |
| 	if l.cn != nil {
 | |
| 		// If gotResponse is true but error is set, the query was executed on
 | |
| 		// the remote server, but resulted in an error.  This should be
 | |
| 		// relatively rare, so it's fine if we just pass the error to our
 | |
| 		// caller.  However, if gotResponse is false, we could not complete the
 | |
| 		// query on the remote server and our underlying connection is about
 | |
| 		// to go away, so we only add relname to l.channels, and wait for
 | |
| 		// resync() to take care of the rest.
 | |
| 		gotResponse, err := l.cn.Listen(channel)
 | |
| 		if gotResponse && err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	l.channels[channel] = struct{}{}
 | |
| 	for l.cn == nil {
 | |
| 		l.reconnectCond.Wait()
 | |
| 		// we let go of the mutex for a while
 | |
| 		if l.isClosed {
 | |
| 			return errListenerClosed
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Unlisten removes a channel from the Listener's channel list.  Returns
 | |
| // ErrChannelNotOpen if the Listener is not listening on the specified channel.
 | |
| // Returns immediately with no error if there is no connection.  Note that you
 | |
| // might still get notifications for this channel even after Unlisten has
 | |
| // returned.
 | |
| //
 | |
| // The channel name is case-sensitive.
 | |
| func (l *Listener) Unlisten(channel string) error {
 | |
| 	l.lock.Lock()
 | |
| 	defer l.lock.Unlock()
 | |
| 
 | |
| 	if l.isClosed {
 | |
| 		return errListenerClosed
 | |
| 	}
 | |
| 
 | |
| 	// Similarly to LISTEN, this is not an error in Postgres, but it seems
 | |
| 	// useful to distinguish from the normal conditions.
 | |
| 	_, exists := l.channels[channel]
 | |
| 	if !exists {
 | |
| 		return ErrChannelNotOpen
 | |
| 	}
 | |
| 
 | |
| 	if l.cn != nil {
 | |
| 		// Similarly to Listen (see comment in that function), the caller
 | |
| 		// should only be bothered with an error if it came from the backend as
 | |
| 		// a response to our query.
 | |
| 		gotResponse, err := l.cn.Unlisten(channel)
 | |
| 		if gotResponse && err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Don't bother waiting for resync if there's no connection.
 | |
| 	delete(l.channels, channel)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // UnlistenAll removes all channels from the Listener's channel list.  Returns
 | |
| // immediately with no error if there is no connection.  Note that you might
 | |
| // still get notifications for any of the deleted channels even after
 | |
| // UnlistenAll has returned.
 | |
| func (l *Listener) UnlistenAll() error {
 | |
| 	l.lock.Lock()
 | |
| 	defer l.lock.Unlock()
 | |
| 
 | |
| 	if l.isClosed {
 | |
| 		return errListenerClosed
 | |
| 	}
 | |
| 
 | |
| 	if l.cn != nil {
 | |
| 		// Similarly to Listen (see comment in that function), the caller
 | |
| 		// should only be bothered with an error if it came from the backend as
 | |
| 		// a response to our query.
 | |
| 		gotResponse, err := l.cn.UnlistenAll()
 | |
| 		if gotResponse && err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Don't bother waiting for resync if there's no connection.
 | |
| 	l.channels = make(map[string]struct{})
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Ping the remote server to make sure it's alive.  Non-nil return value means
 | |
| // that there is no active connection.
 | |
| func (l *Listener) Ping() error {
 | |
| 	l.lock.Lock()
 | |
| 	defer l.lock.Unlock()
 | |
| 
 | |
| 	if l.isClosed {
 | |
| 		return errListenerClosed
 | |
| 	}
 | |
| 	if l.cn == nil {
 | |
| 		return errors.New("no connection")
 | |
| 	}
 | |
| 
 | |
| 	return l.cn.Ping()
 | |
| }
 | |
| 
 | |
| // Clean up after losing the server connection.  Returns l.cn.Err(), which
 | |
| // should have the reason the connection was lost.
 | |
| func (l *Listener) disconnectCleanup() error {
 | |
| 	l.lock.Lock()
 | |
| 	defer l.lock.Unlock()
 | |
| 
 | |
| 	// sanity check; can't look at Err() until the channel has been closed
 | |
| 	select {
 | |
| 	case _, ok := <-l.connNotificationChan:
 | |
| 		if ok {
 | |
| 			panic("connNotificationChan not closed")
 | |
| 		}
 | |
| 	default:
 | |
| 		panic("connNotificationChan not closed")
 | |
| 	}
 | |
| 
 | |
| 	err := l.cn.Err()
 | |
| 	l.cn.Close()
 | |
| 	l.cn = nil
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Synchronize the list of channels we want to be listening on with the server
 | |
| // after the connection has been established.
 | |
| func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
 | |
| 	doneChan := make(chan error)
 | |
| 	go func(notificationChan <-chan *Notification) {
 | |
| 		for channel := range l.channels {
 | |
| 			// If we got a response, return that error to our caller as it's
 | |
| 			// going to be more descriptive than cn.Err().
 | |
| 			gotResponse, err := cn.Listen(channel)
 | |
| 			if gotResponse && err != nil {
 | |
| 				doneChan <- err
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			// If we couldn't reach the server, wait for notificationChan to
 | |
| 			// close and then return the error message from the connection, as
 | |
| 			// per ListenerConn's interface.
 | |
| 			if err != nil {
 | |
| 				for range notificationChan {
 | |
| 				}
 | |
| 				doneChan <- cn.Err()
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 		doneChan <- nil
 | |
| 	}(notificationChan)
 | |
| 
 | |
| 	// Ignore notifications while synchronization is going on to avoid
 | |
| 	// deadlocks.  We have to send a nil notification over Notify anyway as
 | |
| 	// we can't possibly know which notifications (if any) were lost while
 | |
| 	// the connection was down, so there's no reason to try and process
 | |
| 	// these messages at all.
 | |
| 	for {
 | |
| 		select {
 | |
| 		case _, ok := <-notificationChan:
 | |
| 			if !ok {
 | |
| 				notificationChan = nil
 | |
| 			}
 | |
| 
 | |
| 		case err := <-doneChan:
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // caller should NOT be holding l.lock
 | |
| func (l *Listener) closed() bool {
 | |
| 	l.lock.Lock()
 | |
| 	defer l.lock.Unlock()
 | |
| 
 | |
| 	return l.isClosed
 | |
| }
 | |
| 
 | |
| func (l *Listener) connect() error {
 | |
| 	notificationChan := make(chan *Notification, 32)
 | |
| 	cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	l.lock.Lock()
 | |
| 	defer l.lock.Unlock()
 | |
| 
 | |
| 	err = l.resync(cn, notificationChan)
 | |
| 	if err != nil {
 | |
| 		cn.Close()
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	l.cn = cn
 | |
| 	l.connNotificationChan = notificationChan
 | |
| 	l.reconnectCond.Broadcast()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Close disconnects the Listener from the database and shuts it down.
 | |
| // Subsequent calls to its methods will return an error.  Close returns an
 | |
| // error if the connection has already been closed.
 | |
| func (l *Listener) Close() error {
 | |
| 	l.lock.Lock()
 | |
| 	defer l.lock.Unlock()
 | |
| 
 | |
| 	if l.isClosed {
 | |
| 		return errListenerClosed
 | |
| 	}
 | |
| 
 | |
| 	if l.cn != nil {
 | |
| 		l.cn.Close()
 | |
| 	}
 | |
| 	l.isClosed = true
 | |
| 
 | |
| 	// Unblock calls to Listen()
 | |
| 	l.reconnectCond.Broadcast()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (l *Listener) emitEvent(event ListenerEventType, err error) {
 | |
| 	if l.eventCallback != nil {
 | |
| 		l.eventCallback(event, err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Main logic here: maintain a connection to the server when possible, wait
 | |
| // for notifications and emit events.
 | |
| func (l *Listener) listenerConnLoop() {
 | |
| 	var nextReconnect time.Time
 | |
| 
 | |
| 	reconnectInterval := l.minReconnectInterval
 | |
| 	for {
 | |
| 		for {
 | |
| 			err := l.connect()
 | |
| 			if err == nil {
 | |
| 				break
 | |
| 			}
 | |
| 
 | |
| 			if l.closed() {
 | |
| 				return
 | |
| 			}
 | |
| 			l.emitEvent(ListenerEventConnectionAttemptFailed, err)
 | |
| 
 | |
| 			time.Sleep(reconnectInterval)
 | |
| 			reconnectInterval *= 2
 | |
| 			if reconnectInterval > l.maxReconnectInterval {
 | |
| 				reconnectInterval = l.maxReconnectInterval
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if nextReconnect.IsZero() {
 | |
| 			l.emitEvent(ListenerEventConnected, nil)
 | |
| 		} else {
 | |
| 			l.emitEvent(ListenerEventReconnected, nil)
 | |
| 			l.Notify <- nil
 | |
| 		}
 | |
| 
 | |
| 		reconnectInterval = l.minReconnectInterval
 | |
| 		nextReconnect = time.Now().Add(reconnectInterval)
 | |
| 
 | |
| 		for {
 | |
| 			notification, ok := <-l.connNotificationChan
 | |
| 			if !ok {
 | |
| 				// lost connection, loop again
 | |
| 				break
 | |
| 			}
 | |
| 			l.Notify <- notification
 | |
| 		}
 | |
| 
 | |
| 		err := l.disconnectCleanup()
 | |
| 		if l.closed() {
 | |
| 			return
 | |
| 		}
 | |
| 		l.emitEvent(ListenerEventDisconnected, err)
 | |
| 
 | |
| 		time.Sleep(time.Until(nextReconnect))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *Listener) listenerMain() {
 | |
| 	l.listenerConnLoop()
 | |
| 	close(l.Notify)
 | |
| }
 | 
