Skip to content

Commit

Permalink
fix!: e2e tests are flaky due to slow app state processing (#745)
Browse files Browse the repository at this point in the history
* test(e2e): fix flaky TestApp_TxTooBig

* fix(kvstore): json encoding consumes too much memory

* fix(e2e): dashcore fails

* chore: initial app state changed to string

* fix(kvstore): escape keys/values properly in json

* fix: genesis test

* test(statesync): limit backfill test time to 15s

* chore(kvstore): snapshot chunks streaming reader

* chore: self review minor changes

* test(statesync): revert timeout change in backfill
  • Loading branch information
lklimek authored Mar 5, 2024
1 parent d5a1043 commit 420f7a3
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 186 deletions.
25 changes: 16 additions & 9 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package kvstore
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"path"
"strconv"
"time"
Expand Down Expand Up @@ -228,7 +228,12 @@ func newApplication(stateStore StoreFactory, opts ...OptFunc) (*Application, err
defer in.Close()

if err := app.LastCommittedState.Load(in); err != nil {
return nil, fmt.Errorf("load state: %w", err)
// EOF means we most likely don't have any state yet
if !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("load state: %w", err)
} else {
app.logger.Debug("no state found, using initial state")
}
}

app.snapshots, err = NewSnapshotStore(path.Join(app.cfg.Dir, "snapshots"))
Expand Down Expand Up @@ -264,9 +269,9 @@ func (app *Application) InitChain(_ context.Context, req *abci.RequestInitChain)
}

// Overwrite state based on AppStateBytes
// Note this is not optimal from memory perspective; use chunked state sync instead
if len(req.AppStateBytes) > 0 {
err := json.Unmarshal(req.AppStateBytes, &app.LastCommittedState)
if err != nil {
if err := app.LastCommittedState.Load(bytes.NewBuffer(req.AppStateBytes)); err != nil {
return &abci.ResponseInitChain{}, err
}
}
Expand Down Expand Up @@ -398,7 +403,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *abci.RequestFinali
appHash := tmbytes.HexBytes(req.Block.Header.AppHash)
roundState, ok := app.roundStates[roundKey(appHash, req.Height, req.Round)]
if !ok {
return &abci.ResponseFinalizeBlock{}, fmt.Errorf("state with apphash %s not found", appHash)
return &abci.ResponseFinalizeBlock{}, fmt.Errorf("state with apphash %s at height %d round %d not found",
appHash, req.Height, req.Round)
}
if roundState.GetHeight() != req.Height {
return &abci.ResponseFinalizeBlock{},
Expand Down Expand Up @@ -530,14 +536,15 @@ func (app *Application) ApplySnapshotChunk(_ context.Context, req *abci.RequestA
}

if app.offerSnapshot.isFull() {
chunks := app.offerSnapshot.bytes()
err := json.Unmarshal(chunks, &app.LastCommittedState)
if err != nil {
chunks := app.offerSnapshot.reader()
defer chunks.Close()

if err := app.LastCommittedState.Load(chunks); err != nil {
return &abci.ResponseApplySnapshotChunk{}, fmt.Errorf("cannot unmarshal state: %w", err)
}

app.logger.Info("restored state snapshot",
"height", app.LastCommittedState.GetHeight(),
"json", string(chunks),
"apphash", app.LastCommittedState.GetAppHash(),
"snapshot_height", app.offerSnapshot.snapshot.Height,
"snapshot_apphash", app.offerSnapshot.appHash,
Expand Down
3 changes: 2 additions & 1 deletion abci/example/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func TestPersistentKVStoreKV(t *testing.T) {

data, err := os.ReadFile(path.Join(dir, "state.json"))
require.NoError(t, err)
assert.Contains(t, string(data), fmt.Sprintf(`"%s":"%s"`, key, value))

assert.Contains(t, string(data), fmt.Sprintf(`"key":"%s","value":"%s"`, key, value))
}

func TestPersistentKVStoreInfo(t *testing.T) {
Expand Down
66 changes: 58 additions & 8 deletions abci/example/kvstore/snapshots.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//nolint:gosec
package kvstore

import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"

Expand Down Expand Up @@ -97,20 +98,31 @@ func (s *SnapshotStore) Create(state State) (abci.Snapshot, error) {
s.Lock()
defer s.Unlock()

bz, err := json.Marshal(state)
height := state.GetHeight()

filename := filepath.Join(s.dir, fmt.Sprintf("%v.json", height))
f, err := os.Create(filename)
if err != nil {
return abci.Snapshot{}, err
}
height := state.GetHeight()
defer f.Close()

hasher := sha256.New()
writer := io.MultiWriter(f, hasher)

if err := state.Save(writer); err != nil {
f.Close()
// Cleanup incomplete file; ignore errors during cleanup
_ = os.Remove(filename)
return abci.Snapshot{}, err
}

snapshot := abci.Snapshot{
Height: uint64(height),
Version: 1,
Hash: crypto.Checksum(bz),
}
err = os.WriteFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", height)), bz, 0644)
if err != nil {
return abci.Snapshot{}, err
Hash: hasher.Sum(nil),
}

s.metadata = append(s.metadata, snapshot)
err = s.saveMetadata()
if err != nil {
Expand Down Expand Up @@ -216,6 +228,44 @@ func (s *offerSnapshot) bytes() []byte {
return buf.Bytes()
}

// reader returns a reader for the snapshot data.
func (s *offerSnapshot) reader() io.ReadCloser {
chunks := s.chunks.Values()
reader := &chunkedReader{chunks: chunks}

return reader
}

type chunkedReader struct {
chunks [][]byte
index int
offset int
}

func (r *chunkedReader) Read(p []byte) (n int, err error) {
if r.chunks == nil {
return 0, io.EOF
}
for n < len(p) && r.index < len(r.chunks) {
copyCount := copy(p[n:], r.chunks[r.index][r.offset:])
n += copyCount
r.offset += copyCount
if r.offset >= len(r.chunks[r.index]) {
r.index++
r.offset = 0
}
}
if r.index >= len(r.chunks) {
err = io.EOF
}
return
}

func (r *chunkedReader) Close() error {
r.chunks = nil
return nil
}

// makeChunkItem returns the chunk at a given index from the full byte slice.
func makeChunkItem(chunks *ds.OrderedMap[string, []byte], chunkID []byte) chunkItem {
chunkIDStr := hex.EncodeToString(chunkID)
Expand Down
150 changes: 65 additions & 85 deletions abci/example/kvstore/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"net/url"

dbm "github.com/tendermint/tm-db"

Expand All @@ -19,8 +20,6 @@ import (
// Caller of State methods should do proper concurrency locking (eg. mutexes)
type State interface {
dbm.DB
json.Marshaler
json.Unmarshaler

// Save writes full content of this state to some output
Save(output io.Writer) error
Expand Down Expand Up @@ -50,7 +49,7 @@ type State interface {
}

type kvState struct {
dbm.DB
dbm.DB `json:"-"`
// Height of the state. Special value of 0 means zero state.
Height int64 `json:"height"`
InitialHeight int64 `json:"initial_height,omitempty"`
Expand Down Expand Up @@ -182,98 +181,99 @@ func (state *kvState) UpdateAppHash(lastCommittedState State, _txs types1.Txs, t
return nil
}

// Load state from the reader.
// It expects json-encoded kvState, followed by all items from the state.
//
// As a special case, io.EOF when reading the header means that the state is empty.
func (state *kvState) Load(from io.Reader) error {
if state == nil || state.DB == nil {
return errors.New("cannot load into nil state")
}

stateBytes, err := io.ReadAll(from)
if err != nil {
return fmt.Errorf("kvState read: %w", err)
// We reuse DB as we can use atomic batches to load items.
newState := NewKvState(state.DB, state.InitialHeight).(*kvState)

decoder := json.NewDecoder(from)
if err := decoder.Decode(&newState); err != nil {
return fmt.Errorf("error reading state header: %w", err)
}
if len(stateBytes) == 0 {
return nil // NOOP

// Load items to state DB
batch := newState.DB.NewBatch()
defer batch.Close()

if err := resetDB(newState.DB, batch); err != nil {
return err
}

err = json.Unmarshal(stateBytes, &state)
if err != nil {
return fmt.Errorf("kvState unmarshal: %w", err)
item := exportItem{}
var err error
for err = decoder.Decode(&item); err == nil; err = decoder.Decode(&item) {
key, err := url.QueryUnescape(item.Key)
if err != nil {
return fmt.Errorf("error restoring state item key %+v: %w", item, err)
}
value, err := url.QueryUnescape(item.Value)
if err != nil {
return fmt.Errorf("error restoring state item value %+v: %w", item, err)
}

if err := batch.Set([]byte(key), []byte(value)); err != nil {
return fmt.Errorf("error restoring state item %+v: %w", item, err)
}
}

return nil
}
if !errors.Is(err, io.EOF) {
return err
}

func (state kvState) Save(to io.Writer) error {
stateBytes, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("kvState marshal: %w", err)
// commit changes
if err := batch.Write(); err != nil {
return fmt.Errorf("error finalizing restore batch: %w", err)
}

_, err = to.Write(stateBytes)
if err != nil {
return fmt.Errorf("kvState write: %w", err)
// copy loaded values to the state
state.InitialHeight = newState.InitialHeight
state.Height = newState.Height
state.Round = newState.Round
state.AppHash = newState.AppHash
// apphash cannot be nil,zero-length
if len(state.AppHash) == 0 {
state.AppHash = make(tmbytes.HexBytes, crypto.DefaultAppHashSize)
}

return nil
}

type StateExport struct {
Height *int64 `json:"height,omitempty"`
InitialHeight *int64 `json:"initial_height,omitempty"`
AppHash tmbytes.HexBytes `json:"app_hash,omitempty"`
Items map[string]string `json:"items,omitempty"` // we store items as string-encoded values
}
// Save saves state to the writer.
// First it puts json-encoded kvState, followed by all items from the state.
func (state kvState) Save(to io.Writer) error {
encoder := json.NewEncoder(to)
if err := encoder.Encode(state); err != nil {
return fmt.Errorf("kvState marshal: %w", err)
}

// MarshalJSON implements json.Marshaler
func (state kvState) MarshalJSON() ([]byte, error) {
iter, err := state.DB.Iterator(nil, nil)
if err != nil {
return nil, err
return fmt.Errorf("error creating state iterator: %w", err)
}
defer iter.Close()

height := state.Height
initialHeight := state.InitialHeight
apphash := state.GetAppHash()

export := StateExport{
Height: &height,
InitialHeight: &initialHeight,
AppHash: apphash,
Items: nil,
}

for ; iter.Valid(); iter.Next() {
if export.Items == nil {
export.Items = map[string]string{}
key := url.QueryEscape(string(iter.Key()))
value := url.QueryEscape(string(iter.Value()))
item := exportItem{Key: key, Value: value}
if err := encoder.Encode(item); err != nil {
return fmt.Errorf("error encoding state item %+v: %w", item, err)
}
export.Items[string(iter.Key())] = string(iter.Value())
}

return json.Marshal(&export)
return nil
}

// UnmarshalJSON implements json.Unmarshaler.
// Note that it unmarshals only existing (non-nil) values.
// If unmarshaled data contains a nil value (eg. is not present in json), these will stay intact.
func (state *kvState) UnmarshalJSON(data []byte) error {

export := StateExport{}
if err := json.Unmarshal(data, &export); err != nil {
return err
}

if export.Height != nil {
state.Height = *export.Height
}
if export.InitialHeight != nil {
state.InitialHeight = *export.InitialHeight
}
if export.AppHash != nil {
state.AppHash = export.AppHash
}

return state.persistItems(export.Items)
type exportItem struct {
Key string `json:"key"`
Value string `json:"value"`
}

func (state *kvState) Close() error {
Expand All @@ -282,23 +282,3 @@ func (state *kvState) Close() error {
}
return nil
}

func (state *kvState) persistItems(items map[string]string) error {
if items == nil {
return nil
}
batch := state.DB.NewBatch()
defer batch.Close()

if len(items) > 0 {
if err := resetDB(state.DB, batch); err != nil {
return err
}
for key, value := range items {
if err := batch.Set([]byte(key), []byte(value)); err != nil {
return err
}
}
}
return batch.Write()
}
Loading

0 comments on commit 420f7a3

Please sign in to comment.