Skip to content

Commit

Permalink
mqtt working end-to-end
Browse files Browse the repository at this point in the history
  • Loading branch information
tobowers committed Jun 8, 2020
1 parent d61ac89 commit 73abe64
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 25 deletions.
25 changes: 10 additions & 15 deletions aggregator/api/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (

logging "github.com/ipfs/go-log"

"github.com/ipfs/go-cid"
cbornode "github.com/ipfs/go-ipld-cbor"
format "github.com/ipfs/go-ipld-format"
"github.com/quorumcontrol/chaintree/safewrap"
"github.com/quorumcontrol/messages/v2/build/go/services"
"github.com/quorumcontrol/tupelo-lite/aggregator"
)

Expand All @@ -21,10 +21,8 @@ func init() {

// AddBlockMessage is sent to the message queue for every update
type AddBlockMessage struct {
Did string
NewTip cid.Cid
NewBlocks [][]byte
State [][]byte
AddBlockRequest *services.AddBlockRequest
NewBlocks [][]byte
}

func blocksToBytes(blocks []format.Node) [][]byte {
Expand All @@ -51,19 +49,16 @@ func StartPublishing(ctx context.Context, publishFunc MessageQueueFunc) (aggrega
sw := &safewrap.SafeWrap{}
wrapper := <-updateCh

newTip, err := cid.Cast(wrapper.NewTip)
if err != nil {
logger.Errorf("error getting CID: %v", err)
continue
}
addBlockMessage := &AddBlockMessage{
Did: string(wrapper.ObjectId),
NewTip: newTip,
NewBlocks: blocksToBytes(wrapper.NewNodes),
State: wrapper.State,
AddBlockRequest: wrapper.AddBlockRequest,
NewBlocks: blocksToBytes(wrapper.NewNodes),
}
wrapped := sw.WrapObject(addBlockMessage)
err = publishFunc(ctx, fmt.Sprintf("public/trees/%s", addBlockMessage.Did), wrapped.RawData())
if sw.Err != nil {
logger.Errorf("error wrapping: %v", sw.Err)
continue
}
err := publishFunc(ctx, fmt.Sprintf("public/trees/%s", string(wrapper.ObjectId)), wrapped.RawData())
if err != nil {
logger.Errorf("error publishing: %v", err)
continue
Expand Down
11 changes: 9 additions & 2 deletions aggregator/api/server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/quorumcontrol/tupelo-lite/aggregator/api/publisher"
"github.com/quorumcontrol/tupelo/sdk/gossip/testhelpers"
"github.com/stretchr/testify/require"
)
Expand All @@ -26,7 +28,7 @@ func TestPublishesToMqtt(t *testing.T) {
require.True(t, didConnect)

// TODO: type this chan
resp := make(chan interface{})
resp := make(chan mqtt.Message)
subTok := cli.Subscribe("public/trees/#", byte(0), func(cli mqtt.Client, msg mqtt.Message) {
resp <- msg
})
Expand All @@ -36,10 +38,15 @@ func TestPublishesToMqtt(t *testing.T) {
// now send an ABR to the aggregator
// and get the subscription!
abr := testhelpers.NewValidTransaction(t)

_, err := agg.Add(ctx, &abr)
require.Nil(t, err)

// and we should get a message
<-resp
updateMsg := <-resp

update := &publisher.AddBlockMessage{}
err = cbornode.DecodeInto(updateMsg.Payload(), update)
require.Nil(t, err)
require.Equal(t, update.AddBlockRequest.ObjectId, abr.ObjectId)
}
1 change: 0 additions & 1 deletion src/community/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './community'
export * from './blockservice'
17 changes: 10 additions & 7 deletions src/pubsub/localmqtt.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,34 @@ import 'mocha';
import {expect} from 'chai';
import { configurePubSubForLocal } from './mqtt';
import { PubSub } from 'aws-amplify';
import debug from 'debug';

const log = debug("localmqtttest")

describe("local mqtt", ()=> {
before(()=> {
configurePubSubForLocal({endpoint: "ws://127.0.0.1:8081/mqtt"})
})

it("publishes and subscribes", async ()=> {
it("publishes and subscribes", ()=> {
return new Promise(async (resolve,reject) => {

PubSub.subscribe('public/userToUser/test').subscribe({
next: data => { console.log('Message received', data); resolve() },
error: error => console.error("sub error: ", error),
complete: () => console.log('Done'),
next: data => { log('Message received', data); resolve() },
error: error => log("sub error: ", error),
complete: () => log('Done'),
});
console.log("subscribed")
log("subscribed")
setTimeout(async ()=> {
try {
await PubSub.publish('public/userToUser/test', { msg: 'Hello to all subscribers!' });
} catch(e) {
console.error("error: ", e)
reject(e)
}
console.log("published")
log("published")

},1000)
},100)
})
})
})

0 comments on commit 73abe64

Please sign in to comment.