Skip to content

Commit

Permalink
network: Split out recv window.
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair committed Jul 9, 2018
1 parent 4e06481 commit 47b841a
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 45 deletions.
49 changes: 4 additions & 45 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,7 @@ func (n *Network) Accept(conn net.Conn) {
var client *PeerClient
var clientInit sync.Once

recvWindow := NewRingBuffer(RECV_WINDOW_SIZE) // value type = *protobuf.Message
recvMessageNonce := uint64(1)
recvMutex := &sync.Mutex{}
recvWindow := NewRecvWindow(RECV_WINDOW_SIZE)

var err error

Expand All @@ -380,38 +378,6 @@ func (n *Network) Accept(conn net.Conn) {
}
}()

checkRecvWindow := func() error {
ready := make([]*protobuf.Message, 0)

recvMutex.Lock()
i := 0
for ; i < RECV_WINDOW_SIZE; i++ {
cursor := recvWindow.Index(i)
if *cursor == nil {
break
}
ready = append(ready, (*cursor).(*protobuf.Message))
*cursor = nil
}
if i > 0 && i < RECV_WINDOW_SIZE {
recvWindow.MoveForward(i)
}
recvMessageNonce += uint64(i)
recvMutex.Unlock()

//glog.Infof("Sending %d messages", len(ready))

for _, msg := range ready {
select {
case n.RecvQueue <- msg:
default:
return errors.New("recv queue is full")
//glog.Errorf("recv queue full, dropping messages")
}
}
return nil
}

// Wrap a session around the incoming connection.
incoming, err = smux.Server(conn, muxConfig())
if err != nil {
Expand Down Expand Up @@ -469,18 +435,11 @@ func (n *Network) Accept(conn net.Conn) {
return
}

recvMutex.Lock()
offset := int(msg.MessageNonce - recvMessageNonce)
if offset < 0 || offset >= RECV_WINDOW_SIZE {
glog.Errorf("Local message nonce is %d while received %d", recvMessageNonce, msg.MessageNonce)
recvMutex.Unlock()
incoming.Close()
return
err = recvWindow.Input(msg)
if err == nil {
err = recvWindow.Update(n)
}
*recvWindow.Index(offset) = msg
recvMutex.Unlock()

err = checkRecvWindow()
if err != nil {
glog.Error(err)
incoming.Close()
Expand Down
71 changes: 71 additions & 0 deletions network/recv_window.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package network

import (
"sync"

"github.com/perlin-network/noise/protobuf"
"github.com/pkg/errors"
)

type RecvWindow struct {
sync.Mutex

size int
buffer *RingBuffer
messageNonce uint64
}

func NewRecvWindow(size int) *RecvWindow {
return &RecvWindow {
size: size,
buffer: NewRingBuffer(size),
messageNonce: 1,
}
}

func (w *RecvWindow) Update(n *Network) error {
ready := make([]*protobuf.Message, 0)

w.Lock()
i := 0
for ; i < w.size; i++ {
cursor := w.buffer.Index(i)
if *cursor == nil {
break
}
ready = append(ready, (*cursor).(*protobuf.Message))
*cursor = nil
}
if i > 0 && i < w.size {
w.buffer.MoveForward(i)
}
w.messageNonce += uint64(i)
w.Unlock()

//glog.Infof("Sending %d messages", len(ready))

for _, msg := range ready {
select {
case n.RecvQueue <- msg:
default:
return errors.New("recv queue is full")
//glog.Errorf("recv queue full, dropping messages")
}
}

return nil
}

func (w *RecvWindow) Input(msg *protobuf.Message) error {
w.Lock()
defer w.Unlock()

offset := int(msg.MessageNonce - w.messageNonce)

if offset < 0 || offset >= w.size {
return errors.Errorf("Local message nonce is %d while received %d", w.messageNonce, msg.MessageNonce)
}

*w.buffer.Index(offset) = msg
return nil
}

0 comments on commit 47b841a

Please sign in to comment.