Skip to content

Commit

Permalink
Merge remote-tracking branch 'floodsub/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Jun 27, 2023
2 parents 839b45a + 34be608 commit c2ea85d
Show file tree
Hide file tree
Showing 29 changed files with 3,783 additions and 0 deletions.
5 changes: 5 additions & 0 deletions packages/interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
"types": "./dist/src/peers.d.ts",
"import": "./dist/src/peers.js"
},
"./pubsub": {
"types": "./dist/src/pubsub/index.d.ts",
"import": "./dist/src/pubsub/index.js"
},
"./stream-muxer": {
"types": "./dist/src/stream-muxer/index.d.ts",
"import": "./dist/src/stream-muxer/index.js"
Expand Down Expand Up @@ -119,6 +123,7 @@
"it-stream-types": "^2.0.1",
"merge-options": "^3.0.4",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"p-limit": "^4.0.0",
"p-wait-for": "^5.0.2",
"sinon": "^15.1.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { connectionPair } from './connection.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { Connection } from '@libp2p/interface/connection'
import type { EventEmitter } from '@libp2p/interface/events'
import type { PubSub } from '@libp2p/interface/pubsub'
import type { Startable } from '@libp2p/interface/startable'
import type { ConnectionManager, PendingDial } from '@libp2p/interface-internal/connection-manager'
import type { Registrar } from '@libp2p/interface-internal/registrar'
Expand All @@ -16,6 +17,7 @@ export interface MockNetworkComponents {
registrar: Registrar
connectionManager: ConnectionManager
events: EventEmitter<Libp2pEvents>
pubsub?: PubSub
}

class MockNetwork {
Expand Down
114 changes: 114 additions & 0 deletions packages/interface-compliance-tests/src/pubsub/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { isStartable, start, stop } from '@libp2p/interface/startable'
import { expect } from 'aegir/chai'
import delay from 'delay'
import pDefer from 'p-defer'
import pWaitFor from 'p-wait-for'
import sinon from 'sinon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { mockNetwork } from '../mocks/index.js'
import { createComponents } from './utils.js'
import type { PubSubArgs, PubSubComponents } from './index.js'
import type { TestSetup } from '../index.js'
import type { PubSub } from '@libp2p/interface/pubsub'

const topic = 'foo'
const data = uint8ArrayFromString('bar')

export default (common: TestSetup<PubSub, PubSubArgs>): void => {
describe('pubsub api', () => {
let pubsub: PubSub
let components: PubSubComponents

// Create pubsub router
beforeEach(async () => {
mockNetwork.reset()
components = await createComponents()

pubsub = components.pubsub = await common.setup({
components,
init: {
emitSelf: true
}
})
})

afterEach(async () => {
sinon.restore()
await stop(...Object.values(components))
await common.teardown()
mockNetwork.reset()
})

it('can start correctly', async () => {
if (!isStartable(pubsub)) {
return
}

sinon.spy(components.registrar, 'register')

await start(...Object.values(components))

expect(pubsub.isStarted()).to.equal(true)
expect(components.registrar.register).to.have.property('callCount', 1)
})

it('can stop correctly', async () => {
if (!isStartable(pubsub)) {
return
}

sinon.spy(components.registrar, 'unregister')

await start(...Object.values(components))
await stop(...Object.values(components))

expect(pubsub.isStarted()).to.equal(false)
expect(components.registrar.unregister).to.have.property('callCount', 1)
})

it('can subscribe and unsubscribe correctly', async () => {
const handler = (): void => {
throw new Error('a message should not be received')
}

await start(...Object.values(components))
pubsub.subscribe(topic)
pubsub.addEventListener('message', handler)

await pWaitFor(() => {
const topics = pubsub.getTopics()
return topics.length === 1 && topics[0] === topic
})

pubsub.removeEventListener('message', handler)
pubsub.unsubscribe(topic)

await pWaitFor(() => pubsub.getTopics().length === 0)

// Publish to guarantee the handler is not called
await pubsub.publish(topic, data)

// handlers are called async
await delay(100)

await stop(...Object.values(components))
})

it('can subscribe and publish correctly', async () => {
const defer = pDefer()

await start(...Object.values(components))

pubsub.subscribe(topic)
pubsub.addEventListener('message', (evt) => {
expect(evt).to.have.nested.property('detail.topic', topic)
expect(evt).to.have.deep.nested.property('detail.data', data)
defer.resolve()
})
await pubsub.publish(topic, data)
await defer.promise

await stop(...Object.values(components))
})
})
}
Loading

0 comments on commit c2ea85d

Please sign in to comment.