Skip to content

Commit

Permalink
Merge branch 'master' into prunning-fix-test
Browse files Browse the repository at this point in the history
  • Loading branch information
magicxyyz committed Dec 22, 2023
2 parents cb4d3c5 + ef9379b commit e24cc5d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 10 deletions.
2 changes: 2 additions & 0 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,8 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Warn("failed sequencing delayed messages after catching lock", "err", err)
}
}
// This should be redundant now that even non-primary sequencers broadcast over the feed,
// but the backlog efficiently deduplicates messages, so better safe than sorry.
err = c.streamer.PopulateFeedBacklog()
if err != nil {
log.Warn("failed to populate the feed backlog on lockout acquisition", "err", err)
Expand Down
12 changes: 6 additions & 6 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,12 +862,6 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex
return err
}

if s.broadcastServer != nil {
if err := s.broadcastServer.BroadcastSingle(msgWithMeta, pos); err != nil {
log.Error("failed broadcasting message", "pos", pos, "err", err)
}
}

return nil
}

Expand Down Expand Up @@ -927,6 +921,12 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [
default:
}

if s.broadcastServer != nil {
if err := s.broadcastServer.BroadcastMessages(messages, pos); err != nil {
log.Error("failed broadcasting message", "pos", pos, "err", err)
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion broadcaster/backlog/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (s *backlogSegment) Contains(i uint64) bool {
s.messagesLock.RLock()
defer s.messagesLock.RUnlock()
start := s.start()
if i < start || i > s.end() {
if i < start || i > s.end() || len(s.messages) == 0 {
return false
}

Expand Down
25 changes: 24 additions & 1 deletion broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package broadcaster

import (
"context"
"errors"
"net"

"github.com/gobwas/ws"
Expand Down Expand Up @@ -56,10 +57,11 @@ func (b *Broadcaster) NewBroadcastFeedMessage(message arbostypes.MessageWithMeta
}, nil
}

func (b *Broadcaster) BroadcastSingle(msg arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) error {
func (b *Broadcaster) BroadcastSingle(msg arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) {
defer func() {
if r := recover(); r != nil {
log.Error("recovered error in BroadcastSingle", "recover", r)
err = errors.New("panic in BroadcastSingle")
}
}()
bfm, err := b.NewBroadcastFeedMessage(msg, seq)
Expand All @@ -79,6 +81,27 @@ func (b *Broadcaster) BroadcastSingleFeedMessage(bfm *m.BroadcastFeedMessage) {
b.BroadcastFeedMessages(broadcastFeedMessages)
}

func (b *Broadcaster) BroadcastMessages(messages []arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) {
defer func() {
if r := recover(); r != nil {
log.Error("recovered error in BroadcastMessages", "recover", r)
err = errors.New("panic in BroadcastMessages")
}
}()
var feedMessages []*m.BroadcastFeedMessage
for _, msg := range messages {
bfm, err := b.NewBroadcastFeedMessage(msg, seq)
if err != nil {
return err
}
feedMessages = append(feedMessages, bfm)
}

b.BroadcastFeedMessages(feedMessages)

return nil
}

func (b *Broadcaster) BroadcastFeedMessages(messages []*m.BroadcastFeedMessage) {

bm := &m.BroadcastMessage{
Expand Down
9 changes: 7 additions & 2 deletions wsbroadcastserver/clientconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (cc *ClientConnection) Remove() {

func (cc *ClientConnection) writeBacklog(ctx context.Context, segment backlog.BacklogSegment) error {
var prevSegment backlog.BacklogSegment
isFirstSegment := true
for !backlog.IsBacklogSegmentNil(segment) {
// must get the next segment before the messages to be sent are
// retrieved ensures another segment is not added in between calls.
Expand All @@ -132,10 +133,14 @@ func (cc *ClientConnection) writeBacklog(ctx context.Context, segment backlog.Ba
}

msgs := prevSegment.Messages()
if prevSegment.Contains(uint64(cc.requestedSeqNum)) {
if isFirstSegment && prevSegment.Contains(uint64(cc.requestedSeqNum)) {
requestedIdx := int(cc.requestedSeqNum) - int(prevSegment.Start())
msgs = msgs[requestedIdx:]
// This might be false if messages were added after we fetched the segment's messages
if len(msgs) >= requestedIdx {
msgs = msgs[requestedIdx:]
}
}
isFirstSegment = false
bm := &m.BroadcastMessage{
Version: m.V1,
Messages: msgs,
Expand Down

0 comments on commit e24cc5d

Please sign in to comment.