Skip to content

Commit

Permalink
Improve channel rpc usage (#275)
Browse files Browse the repository at this point in the history
* Improve channel rpc usage

* Fix lint error

* Remove unreachable code
  • Loading branch information
mpowaga committed Apr 1, 2019
1 parent 82c4daa commit a8608b8
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 79 deletions.
31 changes: 3 additions & 28 deletions es/channel/handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import {
send,
emit
} from './internal'
<<<<<<< HEAD
import { unpackTx } from '../tx/builder'
import * as R from 'ramda'
=======
>>>>>>> Improve channel rpc usage (#275)

export function awaitingConnection (channel, message, state) {
if (message.method === 'channels.info') {
Expand Down Expand Up @@ -126,9 +129,6 @@ export async function channelOpen (channel, message, state) {
case 'channels.leave':
// TODO: emit event
return { handler: channelOpen }
case 'channels.message':
emit(channel, 'message', message.params.data.message)
return { handler: channelOpen }
case 'channels.update':
changeState(channel, message.params.data.state)
return { handler: channelOpen }
Expand Down Expand Up @@ -197,31 +197,6 @@ export function awaitingUpdateConflict (channel, message, state) {
}
}

export function awaitingProofOfInclusion (channel, message, state) {
if (message.id === state.messageId) {
state.resolve(message.result.poi)
return { handler: channelOpen }
}
if (message.method === 'channels.error') {
state.reject(new Error(message.data.message))
return { handler: channelOpen }
}
}

export function awaitingBalances (channel, message, state) {
if (message.id === state.messageId) {
state.resolve(R.reduce((acc, item) => ({
...acc,
[item.account]: item.balance
}), {}, message.result))
return { handler: channelOpen }
}
if (message.method === 'channels.error') {
state.reject(new Error(message.data.message))
return { handler: channelOpen }
}
}

export async function awaitingShutdownTx (channel, message, state) {
if (message.method === 'channels.sign.shutdown_sign') {
const signedTx = await Promise.resolve(state.sign(message.params.data.tx))
Expand Down
50 changes: 9 additions & 41 deletions es/channel/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import {
initialize,
enqueueAction,
send,
messageId
call
} from './internal'
import * as R from 'ramda'

/**
* Register event listener function
Expand Down Expand Up @@ -120,26 +121,8 @@ function update (from, to, amount, sign) {
* contracts: ['ct_2dCUAWYZdrWfACz3a2faJeKVTVrfDYxCQHCqAt5zM15f3u2UfA']
* }).then(poi => console.log(poi))
*/
function poi ({ accounts, contracts }) {
return new Promise((resolve, reject) => {
enqueueAction(
this,
(channel, state) => state.handler === handlers.channelOpen,
(channel, state) => {
const id = messageId(channel)
send(channel, {
jsonrpc: '2.0',
id,
method: 'channels.get.poi',
params: { accounts, contracts }
})
return {
handler: handlers.awaitingProofOfInclusion,
state: { resolve, reject, messageId: id }
}
}
)
})
async function poi ({ accounts, contracts }) {
return (await call(this, 'channels.get.poi', { accounts, contracts })).poi
}

/**
Expand All @@ -155,26 +138,11 @@ function poi ({ accounts, contracts }) {
* console.log(balances['ak_Y1NRjHuoc3CGMYMvCmdHSBpJsMDR6Ra2t5zjhRcbtMeXXLpLH'])
* )
*/
function balances (accounts) {
return new Promise((resolve, reject) => {
enqueueAction(
this,
(channel, state) => state.handler === handlers.channelOpen,
(channel, state) => {
const id = messageId(channel)
send(channel, {
jsonrpc: '2.0',
id,
method: 'channels.get.balances',
params: { accounts }
})
return {
handler: handlers.awaitingBalances,
state: { resolve, reject, messageId: id }
}
}
)
})
async function balances (accounts) {
return R.reduce((acc, item) => ({
...acc,
[item.account]: item.balance
}), {}, await call(this, 'channels.get.balances', { accounts }))
}

/**
Expand Down
42 changes: 32 additions & 10 deletions es/channel/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const messageQueueLocked = new WeakMap()
const actionQueue = new WeakMap()
const actionQueueLocked = new WeakMap()
const sequence = new WeakMap()
const rpcCallbacks = new WeakMap()

function channelURL (url, { endpoint = 'channel', ...params }) {
const paramString = R.join('&', R.values(R.mapObjIndexed((value, key) =>
Expand Down Expand Up @@ -103,12 +104,6 @@ async function handleMessage (channel, message) {
enterState(channel, await Promise.resolve(handler(channel, message, state)))
}

async function enqueueMessage (channel, message) {
const queue = messageQueue.get(channel) || []
messageQueue.set(channel, [...queue, JSON.parse(message)])
dequeueMessage(channel)
}

async function dequeueMessage (channel) {
const queue = messageQueue.get(channel)
if (messageQueueLocked.get(channel) || !queue.length) {
Expand All @@ -122,8 +117,34 @@ async function dequeueMessage (channel) {
dequeueMessage(channel)
}

function messageId (channel) {
return sequence.set(channel, sequence.get(channel) + 1).get(channel)
function onMessage (channel, data) {
const message = JSON.parse(data)
if (message.id) {
const callback = rpcCallbacks.get(channel).get(message.id)
try {
callback(message)
} catch (error) {
emit(channel, 'error', error)
} finally {
rpcCallbacks.get(channel).delete(message.id)
}
} else if (message.method === 'channels.message') {
emit(channel, 'message', message.params.data.message)
} else {
messageQueue.set(channel, [...(messageQueue.get(channel) || []), message])
dequeueMessage(channel)
}
}

function call (channel, method, params) {
return new Promise((resolve, reject) => {
const id = sequence.set(channel, sequence.get(channel) + 1).get(channel)
rpcCallbacks.get(channel).set(id, (message) => {
if (message.result) return resolve(message.result)
if (message.error) return reject(new Error(message.error.message))
})
send(channel, { jsonrpc: '2.0', method, id, params })
})
}

function WebSocket (url, callbacks) {
Expand Down Expand Up @@ -167,10 +188,11 @@ async function initialize (channel, channelOptions) {
fsm.set(channel, { handler: awaitingConnection })
eventEmitters.set(channel, new EventEmitter())
sequence.set(channel, 0)
rpcCallbacks.set(channel, new Map())
websockets.set(channel, await WebSocket(channelURL(channelOptions.url, { ...params, protocol: 'json-rpc' }), {
onopen: () => changeStatus(channel, 'connected'),
onclose: () => changeStatus(channel, 'disconnected'),
onmessage: ({ data }) => enqueueMessage(channel, data)
onmessage: ({ data }) => onMessage(channel, data)
}))
}

Expand All @@ -185,5 +207,5 @@ export {
changeState,
send,
enqueueAction,
messageId
call
}

0 comments on commit a8608b8

Please sign in to comment.