Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Chaining Keys #30

Closed
wants to merge 8 commits into from
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ async function connect() {
remoteNodePublicKey: '02df5ffe895c778e10f7742a6c5b8a0cefbe9465df58b92fadeb883752c8107c8f',
// Optional WebSocket proxy endpoint to connect through (see WebSocket Proxy section)
wsProxy: 'wss://<WEBSOCKET_PROXY>',
// Optional TCP Socket to connect through (either use wsProxy OR tcpSocket)
tcpSocket: new net.Socket(),
// The IP address of the node
ip: '35.232.170.67',
// The port of the node, defaults to 9735
Expand Down
85 changes: 58 additions & 27 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
ConnectionStatus
} from './types.js'

const MAX_BUFFER_SIZE = 1024 * 1024 * 0.8 // 80% of 1 MB limit imposed by cln commando
const DEFAULT_RECONNECT_ATTEMPTS = 5

class LnMessage {
Expand Down Expand Up @@ -89,6 +90,8 @@ class LnMessage {
private _messageBuffer: BufferReader
private _processingBuffer: boolean
private _l: number | null
private _bytesRead: number | null
private _bytesWrittenMap: Map<any, number>

constructor(options: LnWebSocketOptions) {
validateInit(options)
Expand Down Expand Up @@ -122,6 +125,8 @@ class LnMessage {
this.Buffer = Buffer
this.tcpSocket = tcpSocket

this._bytesRead = 0
this._bytesWrittenMap = new Map()
this._handshakeState = HANDSHAKE_STATE.INITIATOR_INITIATING
this._decryptedMsgs$ = new Subject()
this.decryptedMsgs$ = this._decryptedMsgs$.asObservable()
Expand Down Expand Up @@ -177,8 +182,12 @@ class LnMessage {
}

this.socket.onopen = async () => {
this._log('info', 'WebSocket is connected')
this._log('info', 'WebSocket is connected at ' + new Date().toISOString())
this._log('info', 'Creating Act1 message')

// Resetting bytes read and written counter
this._bytesRead = 0
this._bytesWrittenMap.set(this.socket, 0)

const msg = this.noise.initiatorAct1(Buffer.from(this.remoteNodePublicKey, 'hex'))

Expand All @@ -190,7 +199,25 @@ class LnMessage {
}

this.socket.onclose = async () => {
this._log('error', 'WebSocket is closed')
this.reconnectWebSocket.bind(this)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to bind this function since it is already declared on the instance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed this binding because I wanted to use the same logic used on websocket's close event in line 490 & 569 without closing the socket.

}

this.socket.onerror = (err: { message: string }) => {
this._log('error', `WebSocket error: ${JSON.stringify(err)}`)
}

this.socket.onmessage = this.queueMessage.bind(this)

return firstValueFrom(
this.connectionStatus$.pipe(
filter((status) => status === 'connected' || status === 'disconnected'),
map((status) => status === 'connected')
)
)
}

async reconnectWebSocket () {
this._log('error', 'WebSocket is closed at ' + new Date().toISOString())

this.connectionStatus$.next('disconnected')
this.connected$.next(false)
Expand All @@ -207,20 +234,6 @@ class LnMessage {
this.connect()
this._attemptedReconnects += 1
}
}

this.socket.onerror = (err: { message: string }) => {
this._log('error', `WebSocket error: ${JSON.stringify(err)}`)
}

this.socket.onmessage = this.queueMessage.bind(this)

return firstValueFrom(
this.connectionStatus$.pipe(
filter((status) => status === 'connected' || status === 'disconnected'),
map((status) => status === 'connected')
)
)
}

private queueMessage(event: { data: ArrayBuffer }) {
Expand Down Expand Up @@ -287,9 +300,10 @@ class LnMessage {
}
} while (readMore)
} catch (err) {
// Terminate on failures as we won't be able to recovery
// Terminate on failures as we won't be able to recover
// since the noise state has rotated nonce and we won't
// be able to any more data without additional errors.
this._log('error', `Noise state has rotated nonce: ${JSON.stringify(err)}`)
this.disconnect()
}

Expand Down Expand Up @@ -466,6 +480,15 @@ class LnMessage {

case MessageType.CommandoResponse: {
this._commandoMsgs$.next(payload as CommandoMessage)

// Counting bytes written and reconnecting if it is about to reach the max limit
const currentBytesWritten = this._bytesWrittenMap.get(this.socket) || 0
const updatedBytesWritten = currentBytesWritten + Buffer.byteLength(decrypted)
this._bytesWrittenMap.set(this.socket, updatedBytesWritten)
if (this.connectionStatus$.value === 'connected' && updatedBytesWritten > MAX_BUFFER_SIZE) {
this._log('error', 'Bytes received exceeded the limit. Resetting the connection...' + new Date().toISOString())
this.reconnectWebSocket()
}
}

// ignore all other messages
Expand All @@ -479,10 +502,9 @@ class LnMessage {
method,
params = [],
rune,
reqId
reqId,
reqIdPrefix = 'lnmessage'
}: CommandoRequest): Promise<JsonRpcSuccessResponse['result']> {
this._log('info', `Commando request method: ${method} params: ${JSON.stringify(params)}`)

// not connected, so initiate a connection
if (this.connectionStatus$.value === 'disconnected') {
this._log('info', 'No socket connection, so creating one now')
Expand All @@ -502,6 +524,8 @@ class LnMessage {
await firstValueFrom(this.connectionStatus$.pipe(filter((status) => status === 'connected')))
}

this._log('info', `Commando request method: ${method} params: ${JSON.stringify(params)}`)

const writer = new BufferWriter()

if (!reqId) {
Expand All @@ -516,10 +540,14 @@ class LnMessage {
// write the id
writer.writeBytes(Buffer.from(reqId, 'hex'))

// Unique request id with prefix, method and id
const detailedReqId = `${reqIdPrefix}:${method}#${reqId}`

// write the request
writer.writeBytes(
Buffer.from(
JSON.stringify({
id: detailedReqId, // Adding id for easier debugging with commando
rune,
method,
params
Expand All @@ -534,19 +562,22 @@ class LnMessage {
this._log('info', 'Sending commando message')
this.socket.send(message)

this._log('info', 'Message sent and awaiting response')
// Counting bytes read and reconnecting if it is about to reach the max limit
this._bytesRead = this._bytesRead ? this._bytesRead + Buffer.byteLength(message) : Buffer.byteLength(message);
if (this.connectionStatus$.value === 'connected' && this._bytesRead && this._bytesRead > MAX_BUFFER_SIZE) {
this._log('error', 'Bytes sent exceeded the limit. Resetting the connection...' + new Date().toISOString())
this.reconnectWebSocket()
}
this._log('info', `Message sent with id ${detailedReqId} and awaiting response`)

const { response } = await firstValueFrom(
this._commandoMsgs$.pipe(filter((commandoMsg) => commandoMsg.id === reqId))
)

const { result } = response as JsonRpcSuccessResponse
const { error } = response as JsonRpcErrorResponse

this._log(
'info',
result ? 'Successful response received' : `Error response received: ${error.message}`
)

this._log('info', result ? `Successful response received for ID: ${response.id}` : `Error response received: ${error.message}`)

if (error) throw error

Expand All @@ -558,7 +589,7 @@ class LnMessage {

_log(level: keyof Logger, msg: string) {
if (this._logger && this._logger[level]) {
this._logger[level](`[${level.toUpperCase()} - ${new Date().toLocaleTimeString()}]: ${msg}`)
this._logger[level](`[${level.toUpperCase()} - ${new Date().toISOString()}]: ${msg}`)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ export type CommandoRequest = JsonRpcRequest & {
* request id ahead of time
*/
reqId?: string
reqIdPrefix?: string
}

export type CommandoResponse = JsonRpcSuccessResponse | JsonRpcErrorResponse
Expand Down