Skip to content

Commit

Permalink
Introduce connectionLivenessCheckTimeout configuration (#1162)
Browse files Browse the repository at this point in the history
This configuration defines the number of milliseconds the connection might be idle before need to perform a liveness check on acquiring from the pool.

```typescript
const driver = neo4j.driver(URL, AUTH, {
  // other configurations, then
  connectionLivenessCheckTimeout: 30000 // 30 seconds
})
```

Check the API docs for more information.
  • Loading branch information
bigmontz committed Dec 4, 2023
1 parent bedb211 commit 33d88ca
Show file tree
Hide file tree
Showing 23 changed files with 691 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Pool, { PoolConfig } from '../pool'
import { error, ConnectionProvider, ServerInfo, newError } from 'neo4j-driver-core'
import AuthenticationProvider from './authentication-provider'
import { object } from '../lang'
import LivenessCheckProvider from './liveness-check-provider'

const { SERVICE_UNAVAILABLE } = error
const AUTHENTICATION_ERRORS = [
Expand All @@ -40,6 +41,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
this._config = config
this._log = log
this._authenticationProvider = new AuthenticationProvider({ authTokenManager, userAgent, boltAgent })
this._livenessCheckProvider = new LivenessCheckProvider({ connectionLivenessCheckTimeout: config.connectionLivenessCheckTimeout })
this._userAgent = userAgent
this._boltAgent = boltAgent
this._createChannelConnection =
Expand Down Expand Up @@ -81,6 +83,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
_createConnection ({ auth }, address, release) {
return this._createChannelConnection(address).then(connection => {
connection.release = () => {
connection.idleTimestamp = Date.now()
return release(address, connection)
}
this._openConnections[connection.id] = connection
Expand All @@ -100,6 +103,15 @@ export default class PooledConnectionProvider extends ConnectionProvider {
return false
}

try {
await this._livenessCheckProvider.check(conn)
} catch (error) {
this._log.debug(
`The connection ${conn.id} is not alive because of an error ${error.code} '${error.message}'`
)
return false
}

try {
await this._authenticationProvider.authenticate({ connection: conn, auth, skipReAuth })
return true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export default class LivenessCheckProvider {
constructor ({ connectionLivenessCheckTimeout }) {
this._connectionLivenessCheckTimeout = connectionLivenessCheckTimeout
}

/**
* Checks connection liveness with configured params.
*
* @param {Connection} connection
* @returns {Promise<true>} If liveness checks succeed, throws otherwise
*/
async check (connection) {
if (this._isCheckDisabled || this._isNewlyCreatedConnection(connection)) {
return true
}

const idleFor = Date.now() - connection.idleTimestamp

if (this._connectionLivenessCheckTimeout === 0 ||
idleFor > this._connectionLivenessCheckTimeout) {
return await connection.resetAndFlush()
.then(() => true)
}

return true
}

get _isCheckDisabled () {
return this._connectionLivenessCheckTimeout == null || this._connectionLivenessCheckTimeout < 0
}

_isNewlyCreatedConnection (connection) {
return connection.authToken == null
}
}
14 changes: 13 additions & 1 deletion packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export default class ChannelConnection extends Connection {
this._id = idGenerator++
this._address = address
this._server = { address: address.asHostPort() }
this.creationTimestamp = Date.now()
this._creationTimestamp = Date.now()
this._disableLosslessIntegers = disableLosslessIntegers
this._ch = channel
this._chunker = chunker
Expand Down Expand Up @@ -220,6 +220,18 @@ export default class ChannelConnection extends Connection {
this._dbConnectionId = value
}

set idleTimestamp (value) {
this._idleTimestamp = value
}

get idleTimestamp () {
return this._idleTimestamp
}

get creationTimestamp () {
return this._creationTimestamp
}

/**
* Send initialization message.
* @param {string} userAgent the user agent for this driver.
Expand Down
12 changes: 12 additions & 0 deletions packages/bolt-connection/src/connection/connection-delegate.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ export default class DelegateConnection extends Connection {
this._delegate.version = value
}

get creationTimestamp () {
return this._delegate.creationTimestamp
}

set idleTimestamp (value) {
this._delegate.idleTimestamp = value
}

get idleTimestamp () {
return this._delegate.idleTimestamp
}

isOpen () {
return this._delegate.isOpen()
}
Expand Down
12 changes: 12 additions & 0 deletions packages/bolt-connection/src/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ export default class Connection extends CoreConnection {
throw new Error('not implemented')
}

get creationTimestamp () {
throw new Error('not implemented')
}

set idleTimestamp (value) {
throw new Error('not implemented')
}

get idleTimestamp () {
throw new Error('not implemented')
}

/**
* @returns {BoltProtocol} the underlying bolt protocol assigned to this connection
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Connection, DelegateConnection } from '../../src/connection'
import { authTokenManagers, internal, newError, ServerInfo, staticAuthTokenManager } from 'neo4j-driver-core'
import AuthenticationProvider from '../../src/connection-provider/authentication-provider'
import { functional } from '../../src/lang'
import LivenessCheckProvider from '../../src/connection-provider/liveness-check-provider'

const {
serverAddress: { ServerAddress },
Expand Down Expand Up @@ -281,16 +282,23 @@ describe('constructor', () => {
})

it('should register the release function into the connection', async () => {
const { create } = setup()
const releaseResult = { property: 'some property' }
const release = jest.fn(() => releaseResult)
jest.useFakeTimers()
try {
const { create } = setup()
const releaseResult = { property: 'some property' }
const release = jest.fn(() => releaseResult)

const connection = await create({}, server0, release)
const connection = await create({}, server0, release)
connection.idleTimestamp = -1234

const released = connection.release()
const released = connection.release()

expect(released).toBe(releaseResult)
expect(release).toHaveBeenCalledWith(server0, connection)
expect(released).toBe(releaseResult)
expect(release).toHaveBeenCalledWith(server0, connection)
expect(connection.idleTimestamp).toBeCloseTo(Date.now())
} finally {
jest.useRealTimers()
}
})

it.each([
Expand Down Expand Up @@ -361,26 +369,27 @@ describe('constructor', () => {
const connection = new FakeConnection(server0)
connection.creationTimestamp = Date.now()

const { validateOnAcquire, authenticationProviderHook } = setup()
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup()

await expect(validateOnAcquire({ auth }, connection)).resolves.toBe(true)

expect(authenticationProviderHook.authenticate).toHaveBeenCalledWith({
connection, auth
})
expect(livenessCheckProviderHook.check).toHaveBeenCalledWith(connection)
})

it.each([
null,
undefined,
{ scheme: 'bearer', credentials: 'token01' }
])('should return true when connection is open and within the lifetime and authentication fails (auth=%o)', async (auth) => {
])('should return false when connection is open and within the lifetime and authentication fails (auth=%o)', async (auth) => {
const connection = new FakeConnection(server0)
const error = newError('failed')
const authenticationProvider = jest.fn(() => Promise.reject(error))
connection.creationTimestamp = Date.now()

const { validateOnAcquire, authenticationProviderHook, log } = setup({ authenticationProvider })
const { validateOnAcquire, authenticationProviderHook, log, livenessCheckProviderHook } = setup({ authenticationProvider })

await expect(validateOnAcquire({ auth }, connection)).resolves.toBe(false)

Expand All @@ -391,6 +400,7 @@ describe('constructor', () => {
expect(log.debug).toHaveBeenCalledWith(
`The connection ${connection.id} is not valid because of an error ${error.code} '${error.message}'`
)
expect(livenessCheckProviderHook.check).toHaveBeenCalledWith(connection)
})

it.each([
Expand All @@ -401,45 +411,67 @@ describe('constructor', () => {
const auth = {}
connection.creationTimestamp = Date.now()

const { validateOnAcquire, authenticationProviderHook } = setup()
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup()

await expect(validateOnAcquire({ auth, skipReAuth }, connection)).resolves.toBe(true)

expect(authenticationProviderHook.authenticate).toHaveBeenCalledWith({
connection, auth, skipReAuth
})
expect(livenessCheckProviderHook.check).toHaveBeenCalledWith(connection)
})

it('should return false when liveness checks fails', async () => {
const connection = new FakeConnection(server0)
connection.creationTimestamp = Date.now()
const error = newError('#themessage', '#thecode')

const { validateOnAcquire, authenticationProviderHook, log, livenessCheckProviderHook } = setup({
livenessCheckProvider: () => Promise.reject(error)
})

await expect(validateOnAcquire({}, connection)).resolves.toBe(false)

expect(livenessCheckProviderHook.check).toBeCalledWith(connection)
expect(log.debug).toBeCalledWith(
`The connection ${connection.id} is not alive because of an error ${error.code} '${error.message}'`
)
expect(authenticationProviderHook.authenticate).not.toHaveBeenCalled()
})

it('should return false when connection is closed and within the lifetime', async () => {
const connection = new FakeConnection(server0)
connection.creationTimestamp = Date.now()
await connection.close()

const { validateOnAcquire, authenticationProviderHook } = setup()
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup()

await expect(validateOnAcquire({}, connection)).resolves.toBe(false)
expect(authenticationProviderHook.authenticate).not.toHaveBeenCalled()
expect(livenessCheckProviderHook.check).not.toHaveBeenCalled()
})

it('should return false when connection is open and out of the lifetime', async () => {
const connection = new FakeConnection(server0)
connection.creationTimestamp = Date.now() - 4000

const { validateOnAcquire, authenticationProviderHook } = setup({ maxConnectionLifetime: 3000 })
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup({ maxConnectionLifetime: 3000 })

await expect(validateOnAcquire({}, connection)).resolves.toBe(false)
expect(authenticationProviderHook.authenticate).not.toHaveBeenCalled()
expect(livenessCheckProviderHook.check).not.toHaveBeenCalled()
})

it('should return false when connection is closed and out of the lifetime', async () => {
const connection = new FakeConnection(server0)
await connection.close()
connection.creationTimestamp = Date.now() - 4000

const { validateOnAcquire, authenticationProviderHook } = setup({ maxConnectionLifetime: 3000 })
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup({ maxConnectionLifetime: 3000 })

await expect(validateOnAcquire({}, connection)).resolves.toBe(false)
expect(authenticationProviderHook.authenticate).not.toHaveBeenCalled()
expect(livenessCheckProviderHook.check).not.toHaveBeenCalled()
})
})

Expand Down Expand Up @@ -492,14 +524,17 @@ describe('constructor', () => {
})
})

function setup ({ createConnection, authenticationProvider, maxConnectionLifetime } = {}) {
function setup ({ createConnection, authenticationProvider, maxConnectionLifetime, livenessCheckProvider } = {}) {
const newPool = jest.fn((...args) => new Pool(...args))
const log = new Logger('debug', () => undefined)
jest.spyOn(log, 'debug')
const createChannelConnectionHook = createConnection || jest.fn(async (address) => new FakeConnection(address))
const authenticationProviderHook = new AuthenticationProvider({ })
const livenessCheckProviderHook = new LivenessCheckProvider({})
jest.spyOn(authenticationProviderHook, 'authenticate')
.mockImplementation(authenticationProvider || jest.fn(({ connection }) => Promise.resolve(connection)))
jest.spyOn(livenessCheckProviderHook, 'check')
.mockImplementation(livenessCheckProvider || jest.fn(() => Promise.resolve(true)))
const provider = new DirectConnectionProvider({
newPool,
config: {
Expand All @@ -510,11 +545,13 @@ describe('constructor', () => {
})
provider._createChannelConnection = createChannelConnectionHook
provider._authenticationProvider = authenticationProviderHook
provider._livenessCheckProvider = livenessCheckProviderHook
return {
provider,
...newPool.mock.calls[0][0],
createChannelConnectionHook,
authenticationProviderHook,
livenessCheckProviderHook,
log
}
}
Expand Down Expand Up @@ -812,6 +849,22 @@ class FakeConnection extends Connection {
return this._supportsReAuth
}

set creationTimestamp (value) {
this._creationTimestamp = value
}

get creationTimestamp () {
return this._creationTimestamp
}

set idleTimestamp (value) {
this._idleTimestamp = value
}

get idleTimestamp () {
return this._idleTimestamp
}

async close () {
this._closed = true
}
Expand Down
Loading

0 comments on commit 33d88ca

Please sign in to comment.