Skip to content

Commit

Permalink
refactor: don't rely on sdk/codec in streaming (#14155)
Browse files Browse the repository at this point in the history
  • Loading branch information
tac0turtle authored Dec 9, 2022
1 parent 18b0fa0 commit f08ba9e
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 52 deletions.
7 changes: 2 additions & 5 deletions store/listenkv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"io"
"testing"

"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/prefix"
Expand All @@ -30,9 +28,8 @@ var kvPairs = []types.KVPair{
}

var (
testStoreKey = types.NewKVStoreKey("listen_test")
interfaceRegistry = codecTypes.NewInterfaceRegistry()
testMarshaller = codec.NewProtoCodec(interfaceRegistry)
testStoreKey = types.NewKVStoreKey("listen_test")
testMarshaller = types.NewTestCodec()
)

func newListenKVStore(w io.Writer) *listenkv.Store {
Expand Down
13 changes: 5 additions & 8 deletions store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/tendermint/tendermint/libs/log"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/iavl"
sdkmaps "github.com/cosmos/cosmos-sdk/store/internal/maps"
Expand Down Expand Up @@ -646,12 +644,11 @@ func TestAddListenersAndListeningEnabled(t *testing.T) {
}

var (
interfaceRegistry = codecTypes.NewInterfaceRegistry()
testMarshaller = codec.NewProtoCodec(interfaceRegistry)
testKey1 = []byte{1, 2, 3, 4, 5}
testValue1 = []byte{5, 4, 3, 2, 1}
testKey2 = []byte{2, 3, 4, 5, 6}
testValue2 = []byte{6, 5, 4, 3, 2}
testMarshaller = types.NewTestCodec()
testKey1 = []byte{1, 2, 3, 4, 5}
testValue1 = []byte{5, 4, 3, 2, 1}
testKey2 = []byte{2, 3, 4, 5, 6}
testValue2 = []byte{6, 5, 4, 3, 2}
)

func TestGetListenWrappedKVStore(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions store/streaming/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/codec"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
"github.com/cosmos/cosmos-sdk/store/types"
Expand All @@ -20,7 +19,7 @@ import (
)

// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, codec.BinaryCodec, log.Logger) (baseapp.StreamingService, error)
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (baseapp.StreamingService, error)

// ServiceType enum for specifying the type of StreamingService
type ServiceType int
Expand Down Expand Up @@ -90,7 +89,7 @@ func NewServiceConstructor(name string) (ServiceConstructor, error) {
func NewFileStreamingService(
opts serverTypes.AppOptions,
keys []types.StoreKey,
marshaller codec.BinaryCodec,
marshaller types.Codec,
logger log.Logger,
) (baseapp.StreamingService, error) {
homePath := cast.ToString(opts.Get(flags.FlagHome))
Expand Down Expand Up @@ -122,7 +121,7 @@ func NewFileStreamingService(
func LoadStreamingServices(
bApp *baseapp.BaseApp,
appOpts serverTypes.AppOptions,
appCodec codec.BinaryCodec,
appCodec types.Codec,
logger log.Logger,
keys map[string]*types.KVStoreKey,
) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
Expand Down
14 changes: 5 additions & 9 deletions store/streaming/constructor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
"github.com/cosmos/cosmos-sdk/store/types"
simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims"
"github.com/cosmos/cosmos-sdk/types/module/testutil"
)

type fakeOptions struct{}
Expand All @@ -29,10 +26,9 @@ func (f *fakeOptions) Get(key string) interface{} {
}

var (
mockOptions = new(fakeOptions)
mockKeys = []types.StoreKey{types.NewKVStoreKey("mockKey1"), types.NewKVStoreKey("mockKey2")}
interfaceRegistry = codecTypes.NewInterfaceRegistry()
testMarshaller = codec.NewProtoCodec(interfaceRegistry)
mockOptions = new(fakeOptions)
mockKeys = []types.StoreKey{types.NewKVStoreKey("mockKey1"), types.NewKVStoreKey("mockKey2")}
testMarshaller = types.NewTestCodec()
)

func TestStreamingServiceConstructor(t *testing.T) {
Expand All @@ -56,7 +52,7 @@ func TestStreamingServiceConstructor(t *testing.T) {

func TestLoadStreamingServices(t *testing.T) {
db := dbm.NewMemDB()
encCdc := testutil.MakeTestEncodingConfig()
encCdc := types.NewTestCodec()
keys := types.NewKVStoreKeys("mockKey1", "mockKey2")
bApp := baseapp.NewBaseApp("appName", log.NewNopLogger(), db, nil)

Expand All @@ -82,7 +78,7 @@ func TestLoadStreamingServices(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc.Codec, log.NewNopLogger(), keys)
activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc, log.NewNopLogger(), keys)
require.NoError(t, err)
require.Equal(t, tc.activeStreamersLen, len(activeStreamers))
})
Expand Down
17 changes: 8 additions & 9 deletions store/streaming/file/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"

"cosmossdk.io/errors"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

var _ baseapp.StreamingService = &StreamingService{}
Expand All @@ -27,7 +26,7 @@ type StreamingService struct {
storeListeners []*types.MemoryListener // a series of KVStore listeners for each KVStore
filePrefix string // optional prefix for each of the generated files
writeDir string // directory to write files into
codec codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
codec types.Codec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
logger log.Logger

currentBlockNumber int64
Expand All @@ -42,7 +41,7 @@ type StreamingService struct {
}

// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec, logger log.Logger, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) {
func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c types.Codec, logger log.Logger, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) {
// sort storeKeys for deterministic output
sort.SliceStable(storeKeys, func(i, j int) bool {
return storeKeys[i].Name() < storeKeys[j].Name()
Expand Down Expand Up @@ -191,26 +190,26 @@ func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) {
var f *os.File
f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
if err != nil {
return sdkerrors.Wrapf(err, "open file failed: %s", path)
return errors.Wrapf(err, "open file failed: %s", path)
}
defer func() {
// avoid overriding the real error with file close error
if err1 := f.Close(); err1 != nil && err == nil {
err = sdkerrors.Wrapf(err, "close file failed: %s", path)
err = errors.Wrapf(err, "close file failed: %s", path)
}
}()
_, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data))))
if err != nil {
return sdkerrors.Wrapf(err, "write length prefix failed: %s", path)
return errors.Wrapf(err, "write length prefix failed: %s", path)
}
_, err = f.Write(data)
if err != nil {
return sdkerrors.Wrapf(err, "write block data failed: %s", path)
return errors.Wrapf(err, "write block data failed: %s", path)
}
if fsync {
err = f.Sync()
if err != nil {
return sdkerrors.Wrapf(err, "fsync failed: %s", path)
return errors.Wrapf(err, "fsync failed: %s", path)
}
}
return
Expand Down
5 changes: 1 addition & 4 deletions store/streaming/file/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

var (
interfaceRegistry = codecTypes.NewInterfaceRegistry()
testMarshaller = codec.NewProtoCodec(interfaceRegistry)
testMarshaller = types.NewTestCodec()
testStreamingService *StreamingService
testListener1, testListener2 types.WriteListener
emptyContext = context.TODO()
Expand Down
8 changes: 3 additions & 5 deletions store/types/listening.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package types

import (
"io"

"github.com/cosmos/cosmos-sdk/codec"
)

// WriteListener interface for streaming data out from a listenkv.Store
Expand All @@ -18,11 +16,11 @@ type WriteListener interface {
// protobuf encoded StoreKVPairs to an underlying io.Writer
type StoreKVPairWriteListener struct {
writer io.Writer
marshaller codec.BinaryCodec
marshaller Codec
}

// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryCodec
func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairWriteListener {
// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and Marshaler interface
func NewStoreKVPairWriteListener(w io.Writer, m Codec) *StoreKVPairWriteListener {
return &StoreKVPairWriteListener{
writer: w,
marshaller: m,
Expand Down
9 changes: 2 additions & 7 deletions store/types/listening_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/codec/types"
)

func TestNewStoreKVPairWriteListener(t *testing.T) {
testWriter := new(bytes.Buffer)
interfaceRegistry := types.NewInterfaceRegistry()
testMarshaller := codec.NewProtoCodec(interfaceRegistry)
testMarshaller := NewTestCodec()

wl := NewStoreKVPairWriteListener(testWriter, testMarshaller)

Expand All @@ -24,8 +20,7 @@ func TestNewStoreKVPairWriteListener(t *testing.T) {

func TestOnWrite(t *testing.T) {
testWriter := new(bytes.Buffer)
interfaceRegistry := types.NewInterfaceRegistry()
testMarshaller := codec.NewProtoCodec(interfaceRegistry)
testMarshaller := NewTestCodec()

wl := NewStoreKVPairWriteListener(testWriter, testMarshaller)

Expand Down
2 changes: 1 addition & 1 deletion store/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/internal/kv"
pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types"
snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types"
"github.com/cosmos/cosmos-sdk/types/kv"
)

type Store interface {
Expand Down
84 changes: 84 additions & 0 deletions store/types/utils.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package types

import (
"encoding/binary"
"fmt"
"sort"
"strings"

proto "github.com/cosmos/gogoproto/proto"
)

// KVStorePrefixIterator iterates over all the keys with a certain prefix in ascending order
Expand Down Expand Up @@ -62,3 +65,84 @@ func assertNoCommonPrefix(keys []string) {
}
}
}

// Codec defines a interface needed for the store package to marshal data
type Codec interface {
// Marshal returns binary encoding of v.
Marshal(proto.Message) ([]byte, error)

// MarshalLengthPrefixed returns binary encoding of v with bytes length prefix.
MarshalLengthPrefixed(proto.Message) ([]byte, error)

// Unmarshal parses the data encoded with Marshal method and stores the result
// in the value pointed to by v.
Unmarshal(bz []byte, ptr proto.Message) error

// Unmarshal parses the data encoded with UnmarshalLengthPrefixed method and stores
// the result in the value pointed to by v.
UnmarshalLengthPrefixed(bz []byte, ptr proto.Message) error
}

// ============= TestCodec =============
// TestCodec defines a codec that utilizes Protobuf for both binary and JSON
// encoding.
type TestCodec struct{}

var _ Codec = &TestCodec{}

func NewTestCodec() Codec {
return &TestCodec{}
}

// Marshal implements BinaryMarshaler.Marshal method.
// NOTE: this function must be used with a concrete type which
// implements proto.Message. For interface please use the codec.MarshalInterface
func (pc *TestCodec) Marshal(o proto.Message) ([]byte, error) {
// Size() check can catch the typed nil value.
if o == nil || proto.Size(o) == 0 {
// return empty bytes instead of nil, because nil has special meaning in places like store.Set
return []byte{}, nil
}
return proto.Marshal(o)
}

// MarshalLengthPrefixed implements BinaryMarshaler.MarshalLengthPrefixed method.
func (pc *TestCodec) MarshalLengthPrefixed(o proto.Message) ([]byte, error) {
bz, err := pc.Marshal(o)
if err != nil {
return nil, err
}

var sizeBuf [binary.MaxVarintLen64]byte
n := binary.PutUvarint(sizeBuf[:], uint64(len(bz)))
return append(sizeBuf[:n], bz...), nil
}

// Unmarshal implements BinaryMarshaler.Unmarshal method.
// NOTE: this function must be used with a concrete type which
// implements proto.Message. For interface please use the codec.UnmarshalInterface
func (pc *TestCodec) Unmarshal(bz []byte, ptr proto.Message) error {
err := proto.Unmarshal(bz, ptr)
if err != nil {
return err
}

return nil
}

// UnmarshalLengthPrefixed implements BinaryMarshaler.UnmarshalLengthPrefixed method.
func (pc *TestCodec) UnmarshalLengthPrefixed(bz []byte, ptr proto.Message) error {
size, n := binary.Uvarint(bz)
if n < 0 {
return fmt.Errorf("invalid number of bytes read from length-prefixed encoding: %d", n)
}

if size > uint64(len(bz)-n) {
return fmt.Errorf("not enough bytes to read; want: %v, got: %v", size, len(bz)-n)
} else if size < uint64(len(bz)-n) {
return fmt.Errorf("too many bytes to read; want: %v, got: %v", size, len(bz)-n)
}

bz = bz[n:]
return proto.Unmarshal(bz, ptr)
}

0 comments on commit f08ba9e

Please sign in to comment.