From f08ba9eafaacbc710d3211434a82c1828c57687b Mon Sep 17 00:00:00 2001 From: Marko Date: Fri, 9 Dec 2022 16:35:13 +0000 Subject: [PATCH] refactor: don't rely on sdk/codec in streaming (#14155) --- store/listenkv/store_test.go | 7 +-- store/rootmulti/store_test.go | 13 ++--- store/streaming/constructor.go | 7 +-- store/streaming/constructor_test.go | 14 ++--- store/streaming/file/service.go | 17 +++--- store/streaming/file/service_test.go | 5 +- store/types/listening.go | 8 +-- store/types/listening_test.go | 9 +-- store/types/store.go | 2 +- store/types/utils.go | 84 ++++++++++++++++++++++++++++ 10 files changed, 114 insertions(+), 52 deletions(-) diff --git a/store/listenkv/store_test.go b/store/listenkv/store_test.go index 44be4120427b..6afc75becc3a 100644 --- a/store/listenkv/store_test.go +++ b/store/listenkv/store_test.go @@ -6,8 +6,6 @@ import ( "io" "testing" - "github.com/cosmos/cosmos-sdk/codec" - codecTypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/store/dbadapter" "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/prefix" @@ -30,9 +28,8 @@ var kvPairs = []types.KVPair{ } var ( - testStoreKey = types.NewKVStoreKey("listen_test") - interfaceRegistry = codecTypes.NewInterfaceRegistry() - testMarshaller = codec.NewProtoCodec(interfaceRegistry) + testStoreKey = types.NewKVStoreKey("listen_test") + testMarshaller = types.NewTestCodec() ) func newListenKVStore(w io.Writer) *listenkv.Store { diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index f5e09c39128e..df75776433f8 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -11,8 +11,6 @@ import ( "github.com/tendermint/tendermint/libs/log" dbm "github.com/tendermint/tm-db" - "github.com/cosmos/cosmos-sdk/codec" - codecTypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/iavl" sdkmaps "github.com/cosmos/cosmos-sdk/store/internal/maps" @@ -646,12 +644,11 @@ func TestAddListenersAndListeningEnabled(t *testing.T) { } var ( - interfaceRegistry = codecTypes.NewInterfaceRegistry() - testMarshaller = codec.NewProtoCodec(interfaceRegistry) - testKey1 = []byte{1, 2, 3, 4, 5} - testValue1 = []byte{5, 4, 3, 2, 1} - testKey2 = []byte{2, 3, 4, 5, 6} - testValue2 = []byte{6, 5, 4, 3, 2} + testMarshaller = types.NewTestCodec() + testKey1 = []byte{1, 2, 3, 4, 5} + testValue1 = []byte{5, 4, 3, 2, 1} + testKey2 = []byte{2, 3, 4, 5, 6} + testValue2 = []byte{6, 5, 4, 3, 2} ) func TestGetListenWrappedKVStore(t *testing.T) { diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go index e68be1c893fb..c5c6b1bfb751 100644 --- a/store/streaming/constructor.go +++ b/store/streaming/constructor.go @@ -9,7 +9,6 @@ import ( "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client/flags" - "github.com/cosmos/cosmos-sdk/codec" serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store/streaming/file" "github.com/cosmos/cosmos-sdk/store/types" @@ -20,7 +19,7 @@ import ( ) // ServiceConstructor is used to construct a streaming service -type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, codec.BinaryCodec, log.Logger) (baseapp.StreamingService, error) +type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (baseapp.StreamingService, error) // ServiceType enum for specifying the type of StreamingService type ServiceType int @@ -90,7 +89,7 @@ func NewServiceConstructor(name string) (ServiceConstructor, error) { func NewFileStreamingService( opts serverTypes.AppOptions, keys []types.StoreKey, - marshaller codec.BinaryCodec, + marshaller types.Codec, logger log.Logger, ) (baseapp.StreamingService, error) { homePath := cast.ToString(opts.Get(flags.FlagHome)) @@ -122,7 +121,7 @@ func NewFileStreamingService( func LoadStreamingServices( bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, - appCodec codec.BinaryCodec, + appCodec types.Codec, logger log.Logger, keys map[string]*types.KVStoreKey, ) ([]baseapp.StreamingService, *sync.WaitGroup, error) { diff --git a/store/streaming/constructor_test.go b/store/streaming/constructor_test.go index 29d097611cbb..d2ded40bbdc7 100644 --- a/store/streaming/constructor_test.go +++ b/store/streaming/constructor_test.go @@ -8,14 +8,11 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/baseapp" - "github.com/cosmos/cosmos-sdk/codec" - codecTypes "github.com/cosmos/cosmos-sdk/codec/types" serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store/streaming" "github.com/cosmos/cosmos-sdk/store/streaming/file" "github.com/cosmos/cosmos-sdk/store/types" simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims" - "github.com/cosmos/cosmos-sdk/types/module/testutil" ) type fakeOptions struct{} @@ -29,10 +26,9 @@ func (f *fakeOptions) Get(key string) interface{} { } var ( - mockOptions = new(fakeOptions) - mockKeys = []types.StoreKey{types.NewKVStoreKey("mockKey1"), types.NewKVStoreKey("mockKey2")} - interfaceRegistry = codecTypes.NewInterfaceRegistry() - testMarshaller = codec.NewProtoCodec(interfaceRegistry) + mockOptions = new(fakeOptions) + mockKeys = []types.StoreKey{types.NewKVStoreKey("mockKey1"), types.NewKVStoreKey("mockKey2")} + testMarshaller = types.NewTestCodec() ) func TestStreamingServiceConstructor(t *testing.T) { @@ -56,7 +52,7 @@ func TestStreamingServiceConstructor(t *testing.T) { func TestLoadStreamingServices(t *testing.T) { db := dbm.NewMemDB() - encCdc := testutil.MakeTestEncodingConfig() + encCdc := types.NewTestCodec() keys := types.NewKVStoreKeys("mockKey1", "mockKey2") bApp := baseapp.NewBaseApp("appName", log.NewNopLogger(), db, nil) @@ -82,7 +78,7 @@ func TestLoadStreamingServices(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc.Codec, log.NewNopLogger(), keys) + activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc, log.NewNopLogger(), keys) require.NoError(t, err) require.Equal(t, tc.activeStreamersLen, len(activeStreamers)) }) diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go index e98e678352cb..1933c7aae62b 100644 --- a/store/streaming/file/service.go +++ b/store/streaming/file/service.go @@ -13,11 +13,10 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" + "cosmossdk.io/errors" "github.com/cosmos/cosmos-sdk/baseapp" - "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" - sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) var _ baseapp.StreamingService = &StreamingService{} @@ -27,7 +26,7 @@ type StreamingService struct { storeListeners []*types.MemoryListener // a series of KVStore listeners for each KVStore filePrefix string // optional prefix for each of the generated files writeDir string // directory to write files into - codec codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files + codec types.Codec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files logger log.Logger currentBlockNumber int64 @@ -42,7 +41,7 @@ type StreamingService struct { } // NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys -func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec, logger log.Logger, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) { +func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c types.Codec, logger log.Logger, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) { // sort storeKeys for deterministic output sort.SliceStable(storeKeys, func(i, j int) bool { return storeKeys[i].Name() < storeKeys[j].Name() @@ -191,26 +190,26 @@ func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) { var f *os.File f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600) if err != nil { - return sdkerrors.Wrapf(err, "open file failed: %s", path) + return errors.Wrapf(err, "open file failed: %s", path) } defer func() { // avoid overriding the real error with file close error if err1 := f.Close(); err1 != nil && err == nil { - err = sdkerrors.Wrapf(err, "close file failed: %s", path) + err = errors.Wrapf(err, "close file failed: %s", path) } }() _, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data)))) if err != nil { - return sdkerrors.Wrapf(err, "write length prefix failed: %s", path) + return errors.Wrapf(err, "write length prefix failed: %s", path) } _, err = f.Write(data) if err != nil { - return sdkerrors.Wrapf(err, "write block data failed: %s", path) + return errors.Wrapf(err, "write block data failed: %s", path) } if fsync { err = f.Sync() if err != nil { - return sdkerrors.Wrapf(err, "fsync failed: %s", path) + return errors.Wrapf(err, "fsync failed: %s", path) } } return diff --git a/store/streaming/file/service_test.go b/store/streaming/file/service_test.go index 21ebf508f6a6..21c48b9a7196 100644 --- a/store/streaming/file/service_test.go +++ b/store/streaming/file/service_test.go @@ -15,15 +15,12 @@ import ( "github.com/tendermint/tendermint/libs/log" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" - "github.com/cosmos/cosmos-sdk/codec" - codecTypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" ) var ( - interfaceRegistry = codecTypes.NewInterfaceRegistry() - testMarshaller = codec.NewProtoCodec(interfaceRegistry) + testMarshaller = types.NewTestCodec() testStreamingService *StreamingService testListener1, testListener2 types.WriteListener emptyContext = context.TODO() diff --git a/store/types/listening.go b/store/types/listening.go index 5f21689449fd..06d851503a57 100644 --- a/store/types/listening.go +++ b/store/types/listening.go @@ -2,8 +2,6 @@ package types import ( "io" - - "github.com/cosmos/cosmos-sdk/codec" ) // WriteListener interface for streaming data out from a listenkv.Store @@ -18,11 +16,11 @@ type WriteListener interface { // protobuf encoded StoreKVPairs to an underlying io.Writer type StoreKVPairWriteListener struct { writer io.Writer - marshaller codec.BinaryCodec + marshaller Codec } -// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryCodec -func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairWriteListener { +// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and Marshaler interface +func NewStoreKVPairWriteListener(w io.Writer, m Codec) *StoreKVPairWriteListener { return &StoreKVPairWriteListener{ writer: w, marshaller: m, diff --git a/store/types/listening_test.go b/store/types/listening_test.go index af1362e4f938..dd1701571888 100644 --- a/store/types/listening_test.go +++ b/store/types/listening_test.go @@ -5,15 +5,11 @@ import ( "testing" "github.com/stretchr/testify/require" - - "github.com/cosmos/cosmos-sdk/codec" - "github.com/cosmos/cosmos-sdk/codec/types" ) func TestNewStoreKVPairWriteListener(t *testing.T) { testWriter := new(bytes.Buffer) - interfaceRegistry := types.NewInterfaceRegistry() - testMarshaller := codec.NewProtoCodec(interfaceRegistry) + testMarshaller := NewTestCodec() wl := NewStoreKVPairWriteListener(testWriter, testMarshaller) @@ -24,8 +20,7 @@ func TestNewStoreKVPairWriteListener(t *testing.T) { func TestOnWrite(t *testing.T) { testWriter := new(bytes.Buffer) - interfaceRegistry := types.NewInterfaceRegistry() - testMarshaller := codec.NewProtoCodec(interfaceRegistry) + testMarshaller := NewTestCodec() wl := NewStoreKVPairWriteListener(testWriter, testMarshaller) diff --git a/store/types/store.go b/store/types/store.go index 4cb7af09e949..0058d95537b9 100644 --- a/store/types/store.go +++ b/store/types/store.go @@ -7,9 +7,9 @@ import ( abci "github.com/tendermint/tendermint/abci/types" dbm "github.com/tendermint/tm-db" + "github.com/cosmos/cosmos-sdk/store/internal/kv" pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types" snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types" - "github.com/cosmos/cosmos-sdk/types/kv" ) type Store interface { diff --git a/store/types/utils.go b/store/types/utils.go index 510299df8469..a82a7058a267 100644 --- a/store/types/utils.go +++ b/store/types/utils.go @@ -1,9 +1,12 @@ package types import ( + "encoding/binary" "fmt" "sort" "strings" + + proto "github.com/cosmos/gogoproto/proto" ) // KVStorePrefixIterator iterates over all the keys with a certain prefix in ascending order @@ -62,3 +65,84 @@ func assertNoCommonPrefix(keys []string) { } } } + +// Codec defines a interface needed for the store package to marshal data +type Codec interface { + // Marshal returns binary encoding of v. + Marshal(proto.Message) ([]byte, error) + + // MarshalLengthPrefixed returns binary encoding of v with bytes length prefix. + MarshalLengthPrefixed(proto.Message) ([]byte, error) + + // Unmarshal parses the data encoded with Marshal method and stores the result + // in the value pointed to by v. + Unmarshal(bz []byte, ptr proto.Message) error + + // Unmarshal parses the data encoded with UnmarshalLengthPrefixed method and stores + // the result in the value pointed to by v. + UnmarshalLengthPrefixed(bz []byte, ptr proto.Message) error +} + +// ============= TestCodec ============= +// TestCodec defines a codec that utilizes Protobuf for both binary and JSON +// encoding. +type TestCodec struct{} + +var _ Codec = &TestCodec{} + +func NewTestCodec() Codec { + return &TestCodec{} +} + +// Marshal implements BinaryMarshaler.Marshal method. +// NOTE: this function must be used with a concrete type which +// implements proto.Message. For interface please use the codec.MarshalInterface +func (pc *TestCodec) Marshal(o proto.Message) ([]byte, error) { + // Size() check can catch the typed nil value. + if o == nil || proto.Size(o) == 0 { + // return empty bytes instead of nil, because nil has special meaning in places like store.Set + return []byte{}, nil + } + return proto.Marshal(o) +} + +// MarshalLengthPrefixed implements BinaryMarshaler.MarshalLengthPrefixed method. +func (pc *TestCodec) MarshalLengthPrefixed(o proto.Message) ([]byte, error) { + bz, err := pc.Marshal(o) + if err != nil { + return nil, err + } + + var sizeBuf [binary.MaxVarintLen64]byte + n := binary.PutUvarint(sizeBuf[:], uint64(len(bz))) + return append(sizeBuf[:n], bz...), nil +} + +// Unmarshal implements BinaryMarshaler.Unmarshal method. +// NOTE: this function must be used with a concrete type which +// implements proto.Message. For interface please use the codec.UnmarshalInterface +func (pc *TestCodec) Unmarshal(bz []byte, ptr proto.Message) error { + err := proto.Unmarshal(bz, ptr) + if err != nil { + return err + } + + return nil +} + +// UnmarshalLengthPrefixed implements BinaryMarshaler.UnmarshalLengthPrefixed method. +func (pc *TestCodec) UnmarshalLengthPrefixed(bz []byte, ptr proto.Message) error { + size, n := binary.Uvarint(bz) + if n < 0 { + return fmt.Errorf("invalid number of bytes read from length-prefixed encoding: %d", n) + } + + if size > uint64(len(bz)-n) { + return fmt.Errorf("not enough bytes to read; want: %v, got: %v", size, len(bz)-n) + } else if size < uint64(len(bz)-n) { + return fmt.Errorf("too many bytes to read; want: %v, got: %v", size, len(bz)-n) + } + + bz = bz[n:] + return proto.Unmarshal(bz, ptr) +}