Skip to content

Commit

Permalink
Add support for request messages by topics (#1805)
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Babik authored Jan 21, 2020
1 parent 3b81bd2 commit bc2d018
Show file tree
Hide file tree
Showing 21 changed files with 268 additions and 107 deletions.
41 changes: 23 additions & 18 deletions eth-node/bridge/geth/envelope.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gethbridge

import (
"io"

"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku"
"github.com/status-im/status-go/whisper/v6"
Expand All @@ -15,15 +18,8 @@ func NewWhisperEnvelope(e *whisper.Envelope) types.Envelope {
return &whisperEnvelope{env: e}
}

func UnwrapWhisperEnvelope(e types.Envelope) (*whisper.Envelope, bool) {
if env, ok := e.(*whisperEnvelope); ok {
return env.env, true
}
return nil, false
}

func MustUnwrapWhisperEnvelope(e types.Envelope) *whisper.Envelope {
return e.(*whisperEnvelope).env
func (w *whisperEnvelope) Unwrap() interface{} {
return w.env
}

func (w *whisperEnvelope) Hash() types.Hash {
Expand Down Expand Up @@ -54,6 +50,14 @@ func (w *whisperEnvelope) Size() int {
return len(w.env.Data)
}

func (w *whisperEnvelope) DecodeRLP(s *rlp.Stream) error {
return w.env.DecodeRLP(s)
}

func (w *whisperEnvelope) EncodeRLP(writer io.Writer) error {
return rlp.Encode(writer, w.env)
}

type wakuEnvelope struct {
env *waku.Envelope
}
Expand All @@ -63,15 +67,8 @@ func NewWakuEnvelope(e *waku.Envelope) types.Envelope {
return &wakuEnvelope{env: e}
}

func UnwrapWakuEnvelope(e types.Envelope) (*waku.Envelope, bool) {
if env, ok := e.(*wakuEnvelope); ok {
return env.env, true
}
return nil, false
}

func MustUnwrapWakuEnvelope(e types.Envelope) *waku.Envelope {
return e.(*wakuEnvelope).env
func (w *wakuEnvelope) Unwrap() interface{} {
return w.env
}

func (w *wakuEnvelope) Hash() types.Hash {
Expand Down Expand Up @@ -101,3 +98,11 @@ func (w *wakuEnvelope) Topic() types.TopicType {
func (w *wakuEnvelope) Size() int {
return len(w.env.Data)
}

func (w *wakuEnvelope) DecodeRLP(s *rlp.Stream) error {
return w.env.DecodeRLP(s)
}

func (w *wakuEnvelope) EncodeRLP(writer io.Writer) error {
return rlp.Encode(writer, w.env)
}
2 changes: 1 addition & 1 deletion eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesReq
// which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope.
func (w *gethWakuWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error {
return w.waku.RequestHistoricMessagesWithTimeout(peerID, MustUnwrapWakuEnvelope(envelope), timeout)
return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*waku.Envelope), timeout)
}

type wakuFilterWrapper struct {
Expand Down
2 changes: 1 addition & 1 deletion eth-node/bridge/geth/whisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (w *gethWhisperWrapper) SendMessagesRequest(peerID []byte, r types.Messages
// which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope.
func (w *gethWhisperWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error {
return w.whisper.RequestHistoricMessagesWithTimeout(peerID, MustUnwrapWhisperEnvelope(envelope), timeout)
return w.whisper.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*whisper.Envelope), timeout)
}

// SyncMessages can be sent between two Mail Servers and syncs envelopes between them.
Expand Down
4 changes: 3 additions & 1 deletion eth-node/types/envelopes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package types
// Envelope represents a clear-text data packet to transmit through the Whisper
// network. Its contents may or may not be encrypted and signed.
type Envelope interface {
Hash() Hash // Cached hash of the envelope to avoid rehashing every time.
Wrapped

Hash() Hash // cached hash of the envelope to avoid rehashing every time
Bloom() []byte
PoW() float64
Expiry() uint32
Expand Down
7 changes: 7 additions & 0 deletions eth-node/types/wrapped.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package types

// Wrapped tells that a given object has an underlying representation
// and this representation can be accessed using `Unwrap` method.
type Wrapped interface {
Unwrap() interface{}
}
2 changes: 1 addition & 1 deletion mailserver/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func countMessages(t *testing.T, db DB) int {
}

i, _ := db.BuildIterator(query)
defer i.Release()
defer func() { _ = i.Release() }()

for i.Next() {
var env whisper.Envelope
Expand Down
34 changes: 20 additions & 14 deletions mailserver/mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func (s *WakuMailServer) Deliver(peerID []byte, req waku.MessagesRequest) {
Lower: req.From,
Upper: req.To,
Bloom: req.Bloom,
Topics: req.Topics,
Limit: req.Limit,
Cursor: req.Cursor,
Batch: true,
Expand Down Expand Up @@ -512,7 +513,7 @@ func (whisperAdapter) CreateRequestCompletedPayload(reqID, lastEnvelopeHash type
func (whisperAdapter) CreateSyncResponse(envelopes []types.Envelope, cursor []byte, final bool, err string) interface{} {
whisperEnvelopes := make([]*whisper.Envelope, len(envelopes))
for i, env := range envelopes {
whisperEnvelopes[i] = gethbridge.MustUnwrapWhisperEnvelope(env)
whisperEnvelopes[i] = env.Unwrap().(*whisper.Envelope)
}
return whisper.SyncResponse{
Envelopes: whisperEnvelopes,
Expand Down Expand Up @@ -698,6 +699,19 @@ func (s *mailServer) DeliverMail(peerID, reqID types.Hash, req MessagesRequestPa

req.SetDefaults()

log.Info(
"[mailserver:DeliverMail] processing request",
"peerID", peerID.String(),
"requestID", reqID.String(),
"lower", req.Lower,
"upper", req.Upper,
"bloom", req.Bloom,
"topics", req.Topics,
"limit", req.Limit,
"cursor", req.Cursor,
"batch", req.Batch,
)

if err := req.Validate(); err != nil {
syncFailuresCounter.WithLabelValues("req_invalid").Inc()
log.Error(
Expand All @@ -721,17 +735,6 @@ func (s *mailServer) DeliverMail(peerID, reqID types.Hash, req MessagesRequestPa
return
}

log.Info(
"[mailserver:DeliverMail] processing request",
"peerID", peerID.String(),
"requestID", reqID.String(),
"lower", req.Lower,
"upper", req.Upper,
"bloom", req.Bloom,
"limit", req.Limit,
"cursor", req.Cursor,
"batch", req.Batch,
)
if req.Batch {
requestsBatchedCounter.Inc()
}
Expand All @@ -745,7 +748,7 @@ func (s *mailServer) DeliverMail(peerID, reqID types.Hash, req MessagesRequestPa
)
return
}
defer iter.Release()
defer func() { _ = iter.Release() }()

bundles := make(chan []rlp.RawValue, 5)
errCh := make(chan error)
Expand Down Expand Up @@ -773,6 +776,7 @@ func (s *mailServer) DeliverMail(peerID, reqID types.Hash, req MessagesRequestPa
nextPageCursor, lastEnvelopeHash := s.processRequestInBundles(
iter,
req.Bloom,
req.Topics,
int(req.Limit),
processRequestTimeout,
reqID.String(),
Expand Down Expand Up @@ -843,7 +847,7 @@ func (s *mailServer) SyncMail(peerID types.Hash, req MessagesRequestPayload) err
syncFailuresCounter.WithLabelValues("iterator").Inc()
return err
}
defer iter.Release()
defer func() { _ = iter.Release() }()

bundles := make(chan []rlp.RawValue, 5)
errCh := make(chan error)
Expand All @@ -864,6 +868,7 @@ func (s *mailServer) SyncMail(peerID types.Hash, req MessagesRequestPayload) err
nextCursor, _ := s.processRequestInBundles(
iter,
req.Bloom,
req.Topics,
int(req.Limit),
processRequestTimeout,
requestID,
Expand Down Expand Up @@ -960,6 +965,7 @@ func (s *mailServer) createIterator(req MessagesRequestPayload) (Iterator, error
func (s *mailServer) processRequestInBundles(
iter Iterator,
bloom []byte,
topics [][]byte,
limit int,
timeout time.Duration,
requestID string,
Expand Down
3 changes: 2 additions & 1 deletion mailserver/mailserver_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type DB interface {
type Iterator interface {
Next() bool
DBKey() (*DBKey, error)
Release()
Release() error
Error() error
GetEnvelope(bloom []byte) ([]byte, error)
}
Expand All @@ -34,4 +34,5 @@ type CursorQuery struct {
cursor []byte
limit uint32
bloom []byte
topics [][]byte
}
20 changes: 6 additions & 14 deletions mailserver/mailserver_db_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"

gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/whisper/v6"
)
Expand Down Expand Up @@ -55,7 +54,11 @@ func (i *LevelDBIterator) GetEnvelope(bloom []byte) ([]byte, error) {
return nil, nil
}
return rawValue, nil
}

func (i *LevelDBIterator) Release() error {
i.Iterator.Release()
return nil
}

func NewLevelDB(dataDir string) (*LevelDB, error) {
Expand Down Expand Up @@ -103,7 +106,7 @@ func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) {
if err != nil {
return 0, err
}
defer i.Release()
defer func() { _ = i.Release() }()

batch := leveldb.Batch{}
removed := 0
Expand Down Expand Up @@ -142,18 +145,7 @@ func (db *LevelDB) SaveEnvelope(env types.Envelope) error {
defer recoverLevelDBPanics("SaveEnvelope")

key := NewDBKey(env.Expiry()-env.TTL(), env.Topic(), env.Hash())

var (
rawEnvelope []byte
err error
)
if whisperEnv, ok := gethbridge.UnwrapWhisperEnvelope(env); ok {
rawEnvelope, err = rlp.EncodeToBytes(whisperEnv)
} else if wakuEnv, ok := gethbridge.UnwrapWakuEnvelope(env); ok {
rawEnvelope, err = rlp.EncodeToBytes(wakuEnv)
} else {
return errors.New("unsupported underlying types.Envelope type")
}
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
if err != nil {
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
archivedErrorsCounter.Inc()
Expand Down
Loading

0 comments on commit bc2d018

Please sign in to comment.