Skip to content

Commit

Permalink
deps: update it-length-prefixed and uint8arraylists deps (#91)
Browse files Browse the repository at this point in the history
Update deps to support no-copy operations
  • Loading branch information
achingbrain committed Aug 1, 2022
1 parent 3251829 commit f295fce
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 102 deletions.
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-connection": "^2.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-pubsub": "^1.0.3",
"@libp2p/interface-pubsub": "^2.0.0",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^2.0.0",
Expand All @@ -191,11 +191,12 @@
"abortable-iterator": "^4.0.2",
"err-code": "^3.0.1",
"iso-random-stream": "^2.0.0",
"it-length-prefixed": "^7.0.1",
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.3",
"it-pushable": "^3.0.0",
"multiformats": "^9.6.3",
"p-queue": "^7.2.0",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
Expand All @@ -204,9 +205,9 @@
"delay": "^5.0.0",
"it-pair": "^2.0.2",
"p-defer": "^4.0.0",
"p-wait-for": "^4.1.0",
"protons": "^3.0.4",
"protons-runtime": "^1.0.4",
"p-wait-for": "^5.0.0",
"protons": "^4.0.1",
"protons-runtime": "^2.0.2",
"sinon": "^14.0.0",
"util": "^0.12.4"
}
Expand Down
46 changes: 35 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Queue from 'p-queue'
import { createTopology } from '@libp2p/topology'
import { codes } from './errors.js'
import { PeerStreams as PeerStreamsImpl } from './peer-streams.js'
import { toMessage, ensureArray, randomSeqno, noSignMsgId, msgId, toRpcMessage } from './utils.js'
import { toMessage, ensureArray, noSignMsgId, msgId, toRpcMessage, randomSeqno } from './utils.js'
import {
signMessage,
verifySignature
Expand All @@ -17,6 +17,7 @@ import type { Connection } from '@libp2p/interface-connection'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interface-pubsub'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { Components, Initializable } from '@libp2p/components'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:pubsub')

Expand Down Expand Up @@ -284,7 +285,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
/**
* Responsible for processing each RPC message received by other peers.
*/
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8Array>, peerStreams: PeerStreams) {
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>, peerStreams: PeerStreams) {
try {
await pipe(
stream,
Expand Down Expand Up @@ -446,6 +447,10 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictSign':
if (msg.type !== 'signed') {
throw errcode(new Error('Message type should be "signed" when signature policy is StrictSign but it was not'), codes.ERR_MISSING_SIGNATURE)
}

if (msg.sequenceNumber == null) {
throw errcode(new Error('Need seqno when signature policy is StrictSign but it was missing'), codes.ERR_MISSING_SEQNO)
}
Expand Down Expand Up @@ -474,19 +479,19 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
* Decode Uint8Array into an RPC object.
* This can be override to use a custom router protobuf.
*/
abstract decodeRpc (bytes: Uint8Array): PubSubRPC
abstract decodeRpc (bytes: Uint8Array | Uint8ArrayList): PubSubRPC

/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
*/
abstract encodeRpc (rpc: PubSubRPC): Uint8Array
abstract encodeRpc (rpc: PubSubRPC): Uint8ArrayList

/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
*/
abstract encodeMessage (rpc: PubSubRPCMessage): Uint8Array
abstract encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList

/**
* Send an rpc object to a peer
Expand Down Expand Up @@ -523,26 +528,42 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictNoSign':
if (message.type !== 'unsigned') {
throw errcode(new Error('Message type should be "unsigned" when signature policy is StrictNoSign but it was not'), codes.ERR_MISSING_SIGNATURE)
}

// @ts-expect-error should not be present
if (message.signature != null) {
throw errcode(new Error('StrictNoSigning: signature should not be present'), codes.ERR_UNEXPECTED_SIGNATURE)
}

// @ts-expect-error should not be present
if (message.key != null) {
throw errcode(new Error('StrictNoSigning: key should not be present'), codes.ERR_UNEXPECTED_KEY)
}

// @ts-expect-error should not be present
if (message.sequenceNumber != null) {
throw errcode(new Error('StrictNoSigning: seqno should not be present'), codes.ERR_UNEXPECTED_SEQNO)
}
break
case 'StrictSign':
if (message.type !== 'signed') {
throw errcode(new Error('Message type should be "signed" when signature policy is StrictSign but it was not'), codes.ERR_MISSING_SIGNATURE)
}

if (message.signature == null) {
throw errcode(new Error('StrictSigning: Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
}

if (message.sequenceNumber == null) {
throw errcode(new Error('StrictSigning: Signing required and no seqno was present'), codes.ERR_MISSING_SEQNO)
throw errcode(new Error('StrictSigning: Signing required and no sequenceNumber was present'), codes.ERR_MISSING_SEQNO)
}

if (!(await verifySignature(message, this.encodeMessage.bind(this)))) {
throw errcode(new Error('StrictSigning: Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}

break
default:
throw errcode(new Error('Cannot validate message: unhandled signature policy'), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
Expand All @@ -559,14 +580,16 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
* Normalizes the message and signs it, if signing is enabled.
* Should be used by the routers to create the message to send.
*/
async buildMessage (message: Message) {
async buildMessage (message: { from: PeerId, topic: string, data: Uint8Array, sequenceNumber: bigint }): Promise<Message> {
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictSign':
message.sequenceNumber = randomSeqno()
return await signMessage(this.components.getPeerId(), message, this.encodeMessage.bind(this))
case 'StrictNoSign':
return await Promise.resolve(message)
return await Promise.resolve({
type: 'unsigned',
...message
})
default:
throw errcode(new Error('Cannot build message: unhandled signature policy'), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
Expand Down Expand Up @@ -603,10 +626,11 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
throw new Error('Pubsub has not started')
}

const message: Message = {
const message = {
from: this.components.getPeerId(),
topic,
data: data ?? new Uint8Array(0)
data: data ?? new Uint8Array(0),
sequenceNumber: randomSeqno()
}

log('publish topic: %s from: %p data: %m', topic, message.from, message.data)
Expand Down
12 changes: 7 additions & 5 deletions src/peer-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { PeerId } from '@libp2p/interface-peer-id'
import type { Stream } from '@libp2p/interface-connection'
import type { Pushable } from 'it-pushable'
import type { PeerStreamEvents } from '@libp2p/interface-pubsub'
import { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p-pubsub:peer-streams')

Expand All @@ -25,11 +26,11 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
/**
* Write stream - it's preferable to use the write method
*/
public outboundStream?: Pushable<Uint8Array>
public outboundStream?: Pushable<Uint8ArrayList>
/**
* Read stream
*/
public inboundStream?: AsyncIterable<Uint8Array>
public inboundStream?: AsyncIterable<Uint8ArrayList>
/**
* The raw outbound stream, as retrieved from conn.newStream
*/
Expand Down Expand Up @@ -72,13 +73,13 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
* Send a message to this peer.
* Throws if there is no `stream` to write to available.
*/
write (data: Uint8Array) {
write (data: Uint8Array | Uint8ArrayList) {
if (this.outboundStream == null) {
const id = this.id.toString()
throw new Error('No writable connection to ' + id)
}

this.outboundStream.push(data)
this.outboundStream.push(data instanceof Uint8Array ? new Uint8ArrayList(data) : data)
}

/**
Expand Down Expand Up @@ -115,7 +116,8 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
}

this._rawOutboundStream = stream
this.outboundStream = pushable({
this.outboundStream = pushable<Uint8ArrayList>({
objectMode: true,
onEnd: (shouldEmit) => {
// close writable side of the stream
if (this._rawOutboundStream != null && this._rawOutboundStream.reset != null) { // eslint-disable-line @typescript-eslint/prefer-optional-chain
Expand Down
49 changes: 31 additions & 18 deletions src/sign.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toRpcMessage } from './utils.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import { keys } from '@libp2p/crypto'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import type { PubSubRPCMessage, SignedMessage } from '@libp2p/interface-pubsub'
import { peerIdFromKeys } from '@libp2p/peer-id'
import type { Uint8ArrayList } from 'uint8arraylist'

export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:')

/**
* Signs the provided message with the given `peerId`
*/
export async function signMessage (peerId: PeerId, message: Message, encode: (rpc: PubSubRPCMessage) => Uint8Array) {
// Get the message in bytes, and prepend with the pubsub prefix
const bytes = uint8ArrayConcat([
SignPrefix,
encode(toRpcMessage(message))
])

export async function signMessage (peerId: PeerId, message: { from: PeerId, topic: string, data: Uint8Array, sequenceNumber: bigint }, encode: (rpc: PubSubRPCMessage) => Uint8ArrayList): Promise<SignedMessage> {
if (peerId.privateKey == null) {
throw new Error('Cannot sign message, no private key present')
}
Expand All @@ -26,22 +21,36 @@ export async function signMessage (peerId: PeerId, message: Message, encode: (rp
throw new Error('Cannot sign message, no public key present')
}

const privateKey = await keys.unmarshalPrivateKey(peerId.privateKey)
const signature = await privateKey.sign(bytes)

const outputMessage: Message = {
...message,
signature: signature,
key: peerId.publicKey
// @ts-expect-error signature field is missing, added below
const outputMessage: SignedMessage = {
type: 'signed',
topic: message.topic,
data: message.data,
sequenceNumber: message.sequenceNumber,
from: peerId
}

// Get the message in bytes, and prepend with the pubsub prefix
const bytes = uint8ArrayConcat([
SignPrefix,
encode(toRpcMessage(outputMessage)).subarray()
])

const privateKey = await keys.unmarshalPrivateKey(peerId.privateKey)
outputMessage.signature = await privateKey.sign(bytes)
outputMessage.key = peerId.publicKey

return outputMessage
}

/**
* Verifies the signature of the given message
*/
export async function verifySignature (message: Message, encode: (rpc: PubSubRPCMessage) => Uint8Array) {
export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8ArrayList) {
if (message.type !== 'signed') {
throw new Error('Message type must be "signed" to be verified')
}

if (message.signature == null) {
throw new Error('Message must contain a signature to be verified')
}
Expand All @@ -57,7 +66,7 @@ export async function verifySignature (message: Message, encode: (rpc: PubSubRPC
...toRpcMessage(message),
signature: undefined,
key: undefined
})
}).subarray()
])

// Get the public key
Expand All @@ -72,7 +81,11 @@ export async function verifySignature (message: Message, encode: (rpc: PubSubRPC
* Returns the PublicKey associated with the given message.
* If no valid PublicKey can be retrieved an error will be returned.
*/
export async function messagePublicKey (message: Message) {
export async function messagePublicKey (message: SignedMessage) {
if (message.type !== 'signed') {
throw new Error('Message type must be "signed" to have a public key')
}

// should be available in the from property of the message (peer id)
if (message.from == null) {
throw new Error('Could not get the public key from the originator id')
Expand Down
32 changes: 24 additions & 8 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,40 @@ export const toMessage = (message: PubSubRPCMessage): Message => {
throw errcode(new Error('RPC message was missing from'), codes.ERR_MISSING_FROM)
}

if (message.sequenceNumber == null || message.from == null || message.signature == null || message.key == null) {
return {
type: 'unsigned',
topic: message.topic ?? '',
data: message.data ?? new Uint8Array(0)
}
}

return {
type: 'signed',
from: peerIdFromBytes(message.from),
topic: message.topic ?? '',
sequenceNumber: message.sequenceNumber == null ? undefined : bigIntFromBytes(message.sequenceNumber),
sequenceNumber: bigIntFromBytes(message.sequenceNumber),
data: message.data ?? new Uint8Array(0),
signature: message.signature ?? undefined,
key: message.key ?? undefined
signature: message.signature,
key: message.key
}
}

export const toRpcMessage = (message: Message): PubSubRPCMessage => {
if (message.type === 'signed') {
return {
from: message.from.multihash.bytes,
data: message.data,
sequenceNumber: bigIntToBytes(message.sequenceNumber),
topic: message.topic,
signature: message.signature,
key: message.key
}
}

return {
from: message.from.multihash.bytes,
data: message.data,
sequenceNumber: message.sequenceNumber == null ? undefined : bigIntToBytes(message.sequenceNumber),
topic: message.topic,
signature: message.signature,
key: message.key
topic: message.topic
}
}

Expand Down
7 changes: 4 additions & 3 deletions test/instance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import { expect } from 'aegir/chai'
import { PubSubBaseProtocol } from '../src/index.js'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import type { Uint8ArrayList } from 'uint8arraylist'

class PubsubProtocol extends PubSubBaseProtocol {
decodeRpc (bytes: Uint8Array): PubSubRPC {
throw new Error('Method not implemented.')
}

encodeRpc (rpc: PubSubRPC): Uint8Array {
encodeRpc (rpc: PubSubRPC): Uint8ArrayList {
throw new Error('Method not implemented.')
}

decodeMessage (bytes: Uint8Array): PubSubRPCMessage {
decodeMessage (bytes: Uint8Array | Uint8ArrayList): PubSubRPCMessage {
throw new Error('Method not implemented.')
}

encodeMessage (rpc: PubSubRPCMessage): Uint8Array {
encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList {
throw new Error('Method not implemented.')
}

Expand Down
Loading

0 comments on commit f295fce

Please sign in to comment.