Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

Event SPI #424

Merged
merged 25 commits into from
Mar 6, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
bug fixes; fix flaky build
Signed-off-by: David Chung <david.chung@docker.com>
  • Loading branch information
David Chung committed Mar 2, 2017
commit 49a773c4b93632560a07eb885d588af3396fce40
142 changes: 142 additions & 0 deletions pkg/broker/client/sse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package client

import (
"fmt"
"net/http"
"testing"
"time"

"github.com/docker/infrakit/pkg/broker/server"
"github.com/stretchr/testify/require"
)

func TestBrokerMultiSubscribersEarlyDisconnects(t *testing.T) {

broker := server.NewBroker()
go http.ListenAndServe("localhost:3002", broker)

// Start sending events right away, continously
go func() {
tick := 0
for {
<-time.After(100 * time.Millisecond)
require.NoError(t, broker.Publish("local/time/tick", tick))
tick++

if tick > 30 {
broker.Stop()
return
}
}
}()

time.Sleep(200 * time.Millisecond)

received1 := make(chan interface{})
received2 := make(chan interface{})

topic1, _, err := Subscribe("http://localhost:3002/", "local", Options{})
require.NoError(t, err)
go func() {
// This subscriber will leave after receiving 5 messages
for {
var val int
require.NoError(t, (<-topic1).Decode(&val))
received1 <- val

if val > 10 {
close(received1)
return
}
}
}()

topic2, _, err := Subscribe("http://localhost:3002/?topic=/local/time", "", Options{})
require.NoError(t, err)
go func() {
for {
var val int
require.NoError(t, (<-topic2).Decode(&val))
received2 <- val

if val > 20 {
close(received2)
return
}
}
}()

values1 := []interface{}{}
values2 := []interface{}{}

for v := range received1 {
if v == nil {
break
}
values1 = append(values1, v)
}
for v := range received2 {
if v == nil {
break
}
values2 = append(values2, v)
}

require.Equal(t, 10, len(values1))
require.Equal(t, 20, len(values2))
}

func TestBrokerMultiSubscriberCustomObject(t *testing.T) {

type event struct {
Time int64
Message string
}

broker := server.NewBroker()
go http.ListenAndServe("localhost:3003", broker)

received1 := make(chan event)
received2 := make(chan event)

topic1, _, err := Subscribe("http://localhost:3003/", "local", Options{})
require.NoError(t, err)
go func() {
for {
var val event
require.NoError(t, (<-topic1).Decode(&val))
received1 <- val
}
}()

topic2, _, err := Subscribe("http://localhost:3003/", "local/instance1", Options{})
require.NoError(t, err)
go func() {
for {
var val event
require.NoError(t, (<-topic2).Decode(&val))
received2 <- val
}
}()

go func() {
for {
<-time.After(10 * time.Millisecond)

now := time.Now()
evt := event{Time: now.UnixNano(), Message: fmt.Sprintf("Now is %v", now)}
require.NoError(t, broker.Publish("remote/instance1", evt))
require.NoError(t, broker.Publish("local/instance1", evt))
}
}()

// Test a few rounds to make sure all subscribers get the same messages each round.
for i := 0; i < 5; i++ {
a := <-received1
b := <-received2
require.Equal(t, a, b)
}

broker.Stop()

}
Loading