From 608dba8c30d3539b46997a7bb6dae993021d95cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Matczuk?= Date: Tue, 9 Oct 2018 21:51:27 +0200 Subject: [PATCH] hostConnPool: introduced ConnPicker interface to abstract pool storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The hostConnPool logic around conns slice is moved to defaultConnPicker. Co-authored-by: Henrik Johansson Co-authored-by: MichaƂ Matczuk --- conn.go | 4 ++ connectionpool.go | 124 ++++++++++++++++------------------------------ connpicker.go | 116 +++++++++++++++++++++++++++++++++++++++++++ query_executor.go | 2 +- session.go | 2 +- 5 files changed, 165 insertions(+), 83 deletions(-) create mode 100644 connpicker.go diff --git a/conn.go b/conn.go index 7258c92ff..124b0cbb1 100644 --- a/conn.go +++ b/conn.go @@ -427,6 +427,10 @@ func (s *startupCoordinator) authenticateHandshake(ctx context.Context, authFram } func (c *Conn) closeWithError(err error) { + if c == nil { + return + } + if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) { return } diff --git a/connectionpool.go b/connectionpool.go index 7bfb08754..a6bffd7ed 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -13,7 +13,6 @@ import ( "math/rand" "net" "sync" - "sync/atomic" "time" ) @@ -256,35 +255,34 @@ type hostConnPool struct { addr string size int keyspace string - // protection for conns, closed, filling - mu sync.RWMutex - conns []*Conn - closed bool - filling bool - - pos uint32 + // protection for connPicker, closed, filling + mu sync.RWMutex + connPicker ConnPicker + closed bool + filling bool } func (h *hostConnPool) String() string { h.mu.RLock() defer h.mu.RUnlock() + size, _ := h.connPicker.Size() return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]", - h.filling, h.closed, len(h.conns), h.size, h.host) + h.filling, h.closed, size, h.size, h.host) } func newHostConnPool(session *Session, host *HostInfo, port, size int, keyspace string) *hostConnPool { pool := &hostConnPool{ - session: session, - host: host, - port: port, - addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(), - size: size, - keyspace: keyspace, - conns: make([]*Conn, 0, size), - filling: false, - closed: false, + session: session, + host: host, + port: port, + addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(), + size: size, + keyspace: keyspace, + connPicker: nopConnPicker{}, + filling: false, + closed: false, } // the pool is not filled or connected @@ -292,7 +290,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int, } // Pick a connection from this connection pool for the given query. -func (pool *hostConnPool) Pick() *Conn { +func (pool *hostConnPool) Pick(token token) *Conn { pool.mu.RLock() defer pool.mu.RUnlock() @@ -300,8 +298,8 @@ func (pool *hostConnPool) Pick() *Conn { return nil } - size := len(pool.conns) - if size < pool.size { + size, missing := pool.connPicker.Size() + if missing > 0 { // try to fill the pool go pool.fill() @@ -310,23 +308,7 @@ func (pool *hostConnPool) Pick() *Conn { } } - pos := int(atomic.AddUint32(&pool.pos, 1) - 1) - - var ( - leastBusyConn *Conn - streamsAvailable int - ) - - // find the conn which has the most available streams, this is racy - for i := 0; i < size; i++ { - conn := pool.conns[(pos+i)%size] - if streams := conn.AvailableStreams(); streams > streamsAvailable { - leastBusyConn = conn - streamsAvailable = streams - } - } - - return leastBusyConn + return pool.connPicker.Pick(token) } //Size returns the number of connections currently active in the pool @@ -334,38 +316,19 @@ func (pool *hostConnPool) Size() int { pool.mu.RLock() defer pool.mu.RUnlock() - return len(pool.conns) + size, _ := pool.connPicker.Size() + return size } //Close the connection pool func (pool *hostConnPool) Close() { pool.mu.Lock() + defer pool.mu.Unlock() - if pool.closed { - pool.mu.Unlock() - return + if !pool.closed { + pool.connPicker.Close() } pool.closed = true - - // ensure we dont try to reacquire the lock in handleError - // TODO: improve this as the following can happen - // 1) we have locked pool.mu write lock - // 2) conn.Close calls conn.closeWithError(nil) - // 3) conn.closeWithError calls conn.Close() which returns an error - // 4) conn.closeWithError calls pool.HandleError with the error from conn.Close - // 5) pool.HandleError tries to lock pool.mu - // deadlock - - // empty the pool - conns := pool.conns - pool.conns = nil - - pool.mu.Unlock() - - // close the connections - for _, conn := range conns { - conn.Close() - } } // Fill the connection pool @@ -378,8 +341,7 @@ func (pool *hostConnPool) fill() { } // determine the filling work to be done - startCount := len(pool.conns) - fillCount := pool.size - startCount + startCount, fillCount := pool.connPicker.Size() // avoid filling a full (or overfull) pool if fillCount <= 0 { @@ -391,9 +353,7 @@ func (pool *hostConnPool) fill() { pool.mu.RUnlock() pool.mu.Lock() - // double check everything since the lock was released - startCount = len(pool.conns) - fillCount = pool.size - startCount + startCount, fillCount = pool.connPicker.Size() if pool.closed || pool.filling || fillCount <= 0 { // looks like another goroutine already beat this // goroutine to the filling @@ -427,8 +387,10 @@ func (pool *hostConnPool) fill() { return } - // filled one - fillCount-- + // filled one, let's reload it to see if it has changed + pool.mu.RLock() + _, fillCount = pool.connPicker.Size() + pool.mu.RUnlock() } // fill the rest of the pool asynchronously @@ -543,11 +505,21 @@ func (pool *hostConnPool) connect() (err error) { return nil } - pool.conns = append(pool.conns, conn) + // lazily initialize the connPicker when we know the required type + pool.initConnPicker(conn) + pool.connPicker.Put(conn) return nil } +func (pool *hostConnPool) initConnPicker(conn *Conn) { + if _, ok := pool.connPicker.(nopConnPicker); !ok { + return + } + + pool.connPicker = newDefaultConnPicker(pool.size) +} + // handle any error from a Conn func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) { if !closed { @@ -565,15 +537,5 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) { return } - // find the connection index - for i, candidate := range pool.conns { - if candidate == conn { - // remove the connection, not preserving order - pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1] - - // lost a connection, so fill the pool - go pool.fill() - break - } - } + pool.connPicker.Remove(conn) } diff --git a/connpicker.go b/connpicker.go new file mode 100644 index 000000000..ac00dc05f --- /dev/null +++ b/connpicker.go @@ -0,0 +1,116 @@ +package gocql + +import ( + "fmt" + "sync" + "sync/atomic" +) + +type ConnPicker interface { + Pick(token) *Conn + Put(*Conn) + Remove(conn *Conn) + Size() (int, int) + Close() +} + +type defaultConnPicker struct { + conns []*Conn + pos uint32 + size int + mu sync.RWMutex +} + +func newDefaultConnPicker(size int) *defaultConnPicker { + if size <= 0 { + panic(fmt.Sprintf("invalid pool size %d", size)) + } + return &defaultConnPicker{ + size: size, + } +} + +func (p *defaultConnPicker) Remove(conn *Conn) { + p.mu.Lock() + defer p.mu.Unlock() + + for i, candidate := range p.conns { + if candidate == conn { + p.conns[i] = nil + return + } + } +} + +func (p *defaultConnPicker) Close() { + p.mu.Lock() + defer p.mu.Unlock() + + conns := p.conns + p.conns = nil + for _, conn := range conns { + if conn != nil { + conn.Close() + } + } +} + +func (p *defaultConnPicker) Size() (int, int) { + size := len(p.conns) + return size, p.size - size +} + +func (p *defaultConnPicker) Pick(token) *Conn { + pos := int(atomic.AddUint32(&p.pos, 1) - 1) + size := len(p.conns) + + var ( + leastBusyConn *Conn + streamsAvailable int + ) + + // find the conn which has the most available streams, this is racy + for i := 0; i < size; i++ { + conn := p.conns[(pos+i)%size] + if conn == nil { + continue + } + if streams := conn.AvailableStreams(); streams > streamsAvailable { + leastBusyConn = conn + streamsAvailable = streams + } + } + + return leastBusyConn +} + +func (p *defaultConnPicker) Put(conn *Conn) { + p.mu.Lock() + p.conns = append(p.conns, conn) + p.mu.Unlock() +} + +// nopConnPicker is a no-operation implementation of ConnPicker, it's used when +// hostConnPool is created to allow deferring creation of the actual ConnPicker +// to the point where we have first connection. +type nopConnPicker struct{} + +func (nopConnPicker) Pick(token) *Conn { + return nil +} + +func (nopConnPicker) Put(*Conn) { +} + +func (nopConnPicker) Remove(conn *Conn) { +} + +func (nopConnPicker) Size() (int, int) { + // Return 1 to make hostConnPool to try to establish a connection. + // When first connection is established hostConnPool replaces nopConnPicker + // with a different ConnPicker implementation. + return 0, 1 +} + +func (nopConnPicker) Close() { +} diff --git a/query_executor.go b/query_executor.go index dfe7d6533..6e5a492af 100644 --- a/query_executor.go +++ b/query_executor.go @@ -117,7 +117,7 @@ func (q *queryExecutor) run(qry ExecutableQuery, specWG *sync.WaitGroup, results continue } - conn := pool.Pick() + conn := pool.Pick(selectedHost.Token()) if conn == nil { selectedHost = hostIter() continue diff --git a/session.go b/session.go index e69d2bd61..c6ca82355 100644 --- a/session.go +++ b/session.go @@ -430,7 +430,7 @@ func (s *Session) getConn() *Conn { pool, ok := s.pool.getPool(host) if !ok { continue - } else if conn := pool.Pick(); conn != nil { + } else if conn := pool.Pick(nil); conn != nil { return conn } }