Skip to content

Commit

Permalink
fix: merge and cherry picked fixes from main (#290)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Tsitrin <114929630+mtsitrin@users.noreply.github.com>
  • Loading branch information
omritoptix and mtsitrin authored Mar 20, 2023
1 parent 458d509 commit 49e75bb
Show file tree
Hide file tree
Showing 18 changed files with 226 additions and 67 deletions.
9 changes: 7 additions & 2 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,10 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
return err
}

// Update the state with the new app hash from the commit.
// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
newState.LastValidators = m.lastState.Validators.Copy()
newState.LastStoreHeight = block.Header.Height
_, err = m.store.UpdateState(newState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
Expand All @@ -499,8 +502,10 @@ func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bo
if uint64(proxyAppInfo.LastBlockHeight) == block.Header.Height {
isRequired = true
m.logger.Info("Skipping block application and only updating store height and state hash", "height", block.Header.Height)
// update the state with the hash
// update the state with the hash, last store height and last validators.
m.lastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
m.lastState.LastStoreHeight = block.Header.Height
m.lastState.LastValidators = m.lastState.Validators.Copy()
_, err := m.store.UpdateState(m.lastState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ require (
replace (
github.com/cosmos/ibc-go/v3 => github.com/dymensionxyz/ibc-go/v3 v3.0.0-rc2.0.20230105134315-1870174ab6da
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.2-alpha.regen.4
github.com/gorilla/rpc => github.com/dymensionxyz/rpc v1.3.1
google.golang.org/grpc => google.golang.org/grpc v1.33.2

)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ github.com/dymensionxyz/dymension v0.1.0-alpha.0.20230110174626-23b70dccd1e5 h1:
github.com/dymensionxyz/dymension v0.1.0-alpha.0.20230110174626-23b70dccd1e5/go.mod h1:C5nQlAlDqVtNigjQVNLSL4SuC3gUVoRhqAsfYZZcDgs=
github.com/dymensionxyz/ibc-go/v3 v3.0.0-rc2.0.20230105134315-1870174ab6da h1:driSbrBc4deT0iRjDKjA4royFDsUsDcE0IgFkFxhopo=
github.com/dymensionxyz/ibc-go/v3 v3.0.0-rc2.0.20230105134315-1870174ab6da/go.mod h1:VwB/vWu4ysT5DN2aF78d17LYmx3omSAdq6gpKvM7XRA=
github.com/dymensionxyz/rpc v1.3.1 h1:7EXWIobaBes5zldRvTIg7TmNsEKjicrWA/OjCc0NaGs=
github.com/dymensionxyz/rpc v1.3.1/go.mod h1:f+WpX8ysy8wt95iGc6auYlHcnHj2bUkhiRVkkKNys8c=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
Expand Down Expand Up @@ -466,8 +468,6 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk=
github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
Expand Down
2 changes: 2 additions & 0 deletions proto/types/dymint/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ message State {
bytes last_results_hash = 14;

bytes app_hash = 15;

uint64 last_store_height = 16 [(gogoproto.customname) = "LastStoreHeight"];
}
4 changes: 2 additions & 2 deletions rpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ func validatePage(pagePtr *int, perPage, totalCount int) (int, error) {
panic(fmt.Sprintf("zero or negative perPage: %d", perPage))
}

if pagePtr == nil { // no page parameter
if pagePtr == nil || *pagePtr <= 0 { // no page parameter
return 1, nil
}

Expand All @@ -902,7 +902,7 @@ func validatePage(pagePtr *int, perPage, totalCount int) (int, error) {
pages = 1 // one page (even if it's empty)
}
page := *pagePtr
if page <= 0 || page > pages {
if page > pages {
return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page)
}

Expand Down
6 changes: 6 additions & 0 deletions rpc/json/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (h *handler) serveJSONRPCforWS(w http.ResponseWriter, r *http.Request, wsCo
}
if methodSpec.ws {
callArgs = append(callArgs, reflect.ValueOf(wsConn))
rpcID, err := codecReq.ID()
if err != nil {
codecReq.WriteError(w, http.StatusBadRequest, err)
return
}
callArgs = append(callArgs, reflect.ValueOf(rpcID))
}
rets := methodSpec.m.Call(callArgs)

Expand Down
29 changes: 21 additions & 8 deletions rpc/json/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"fmt"
"net/http"
"reflect"
"strconv"
"time"

"github.com/gorilla/rpc/v2/json2"
"github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"

"github.com/dymensionxyz/dymint/log"
"github.com/dymensionxyz/dymint/rpc/client"
Expand All @@ -37,7 +39,7 @@ func newMethod(m interface{}) *method {
m: reflect.ValueOf(m),
argsType: mType.In(1).Elem(),
returnType: mType.Out(0).Elem(),
ws: mType.NumIn() == 3,
ws: mType.NumIn() == 4,
}
}

Expand Down Expand Up @@ -86,7 +88,7 @@ func newService(c *client.Client, l log.Logger) *service {
return &s
}

func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsConn) (*ctypes.ResultSubscribe, error) {
func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsConn, subscriptionID []byte) (*ctypes.ResultSubscribe, error) {
addr := req.RemoteAddr

// TODO(tzdybal): pass config and check subscriptions limits
Expand All @@ -108,18 +110,29 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %w", err)
}

go func() {
go func(subscriptionID []byte) {
for {
select {
case msg := <-sub.Out():
data, err := json.Marshal(msg.Data())
// build the base response
resultEvent := &ctypes.ResultEvent{Query: args.Query, Data: msg.Data(), Events: msg.Events()}
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), resultEvent)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), resultEvent)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("failed to marshal response data", "error", err)
s.logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- data
wsConn.queue <- jsonBytes
}
case <-sub.Cancelled():
if sub.Err() != pubsub.ErrUnsubscribed {
Expand All @@ -134,7 +147,7 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo
return
}
}
}()
}(subscriptionID)

return &ctypes.ResultSubscribe{}, nil
}
Expand Down
15 changes: 14 additions & 1 deletion rpc/json/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package json

import (
"bytes"
"context"
"io"
"net/http"

"github.com/gorilla/websocket"

"github.com/dymensionxyz/dymint/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)

type wsConn struct {
Expand Down Expand Up @@ -38,6 +40,9 @@ func (h *handler) wsHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

wsc, err := upgrader.Upgrade(w, r, nil)
Expand All @@ -63,7 +68,15 @@ func (h *handler) wsHandler(w http.ResponseWriter, r *http.Request) {
for {
mt, r, err := wsc.NextReader()
if err != nil {
h.logger.Error("failed to read next WebSocket message", "error", err)
if _, ok := err.(*websocket.CloseError); ok {
h.logger.Debug("WebSocket connection closed")
err := h.srv.client.EventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
h.logger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
} else {
h.logger.Error("failed to read next WebSocket message", "error", err)
}
break
}

Expand Down
80 changes: 72 additions & 8 deletions rpc/json/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,17 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)

type NestedRPCResponse struct {
Query string `json:"query"`
Data struct {
Type string `json:"type"`
Value string `json:"value"`
} `json:"data"`
}

func TestWebSockets(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
Expand Down Expand Up @@ -49,7 +57,7 @@ func TestWebSockets(t *testing.T) {
`))
assert.NoError(err)

err = conn.SetReadDeadline(time.Now().Add(100 * time.Second))
err = conn.SetReadDeadline(time.Now().Add(3 * time.Second))
assert.NoError(err)
typ, msg, err := conn.ReadMessage()
assert.NoError(err)
Expand All @@ -63,13 +71,25 @@ func TestWebSockets(t *testing.T) {
assert.NoError(err)
assert.Equal(websocket.TextMessage, typ)
assert.NotEmpty(msg)
var payload tmtypes.EventDataNewBlock
err = json.Unmarshal(msg, &payload)
var responsePayload rpctypes.RPCResponse
err = json.Unmarshal(msg, &responsePayload)
assert.NoError(err)
assert.NotNil(payload.ResultBeginBlock)
assert.NotNil(payload.Block)
assert.GreaterOrEqual(payload.Block.Height, int64(1))
assert.NotNil(payload.ResultEndBlock)
assert.Equal(rpctypes.JSONRPCIntID(7), responsePayload.ID)
var m map[string]interface{}
err = json.Unmarshal([]byte(responsePayload.Result), &m)
require.NoError(err)

// TODO(omritoptix): json unmarshalling of the dataPayload fails as dataPayload was encoded with amino and not json (and as such encodes 64bit numbers as strings).
// we need to unmarshal using the tendermint json library for it to populate the dataPayload correctly. Currently skipping this part of the test.
// valueField := m["data"].(map[string]interface{})["value"]
// valueJSON, err := json.Marshal(valueField)
// var dataPayload tmtypes.EventDataNewBlock
// err = tmjson.Unmarshal(valueJSON, &dataPayload)
// require.NoError(err)
// assert.NotNil(dataPayload.ResultBeginBlock)
// assert.NotNil(dataPayload.Block)
// assert.GreaterOrEqual(dataPayload.Block.Height, int64(1))
// assert.NotNil(dataPayload.ResultEndBlock)

unsubscribeAllReq, err := json2.EncodeClientRequest("unsubscribe_all", &unsubscribeAllArgs{})
require.NoError(err)
Expand All @@ -83,3 +103,47 @@ func TestWebSockets(t *testing.T) {
assert.NoError(json.Unmarshal(rsp.Body.Bytes(), &jsonResp))
assert.Nil(jsonResp.Error)
}

// Test that when a websocket connection is closed, the corresponding
// subscription is also removed.
func TestWebsocketCloseUnsubscribe(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

_, local := getRPC(t)
handler, err := GetHTTPHandler(local, log.TestingLogger())
require.NoError(err)

srv := httptest.NewServer(handler)

conn, resp, err := websocket.DefaultDialer.Dial(strings.Replace(srv.URL, "http://", "ws://", 1)+"/websocket", nil)
require.NoError(err)
require.NotNil(resp)
require.NotNil(conn)
assert.Equal(http.StatusSwitchingProtocols, resp.StatusCode)
subscribed_clients := local.EventBus.NumClients()

err = conn.WriteMessage(websocket.TextMessage, []byte(`
{
"jsonrpc": "2.0",
"method": "subscribe",
"id": 7,
"params": {
"query": "tm.event='NewBlock'"
}
}
`))
assert.NoError(err)
// Vaildate we have a new client
assert.Eventually(func() bool {
return subscribed_clients+1 == local.EventBus.NumClients()
}, 3*time.Second, 100*time.Millisecond)
// disconnect websocket
err = conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second))
assert.NoError(err)
// Validate we have one less client
assert.Eventually(func() bool {
return subscribed_clients == local.EventBus.NumClients()
}, 3*time.Second, 100*time.Millisecond)

}
2 changes: 1 addition & 1 deletion settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (d *HubClient) eventHandler() {
panic("Settlement WS disconnected")
case event := <-eventsChannel:
// Assert value is in map and publish it to the event bus
d.logger.Debug("Received event from settlement layer", "event", event)
d.logger.Debug("Received event from settlement layer")
_, ok := d.eventMap[event.Query]
if !ok {
d.logger.Debug("Ignoring event. Type not supported", "event", event)
Expand Down
8 changes: 5 additions & 3 deletions state/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,13 @@ func (e *BlockExecutor) updateState(state types.State, block *types.Block, abciR
},
NextValidators: nValSet,
Validators: state.NextValidators.Copy(),
LastValidators: state.Validators.Copy(),
LastHeightValidatorsChanged: lastHeightValSetChanged,
ConsensusParams: state.ConsensusParams,
LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged,
AppHash: state.AppHash,
// We're gonna update those fields only after we commit the blocks
AppHash: state.AppHash,
LastValidators: state.LastValidators.Copy(),
LastStoreHeight: state.LastStoreHeight,
}
copy(s.LastResultsHash[:], tmtypes.NewResults(abciResponses.DeliverTxs).Hash())

Expand Down Expand Up @@ -262,7 +264,7 @@ func (e *BlockExecutor) validateBlock(state types.State, block *types.Block) err
if state.LastBlockHeight <= 0 && block.Header.Height != uint64(state.InitialHeight) {
return errors.New("initial block height mismatch")
}
if state.LastBlockHeight > 0 && block.Header.Height != uint64(state.LastBlockHeight)+1 {
if state.LastBlockHeight > 0 && block.Header.Height != uint64(state.LastStoreHeight)+1 {
return errors.New("block height mismatch")
}
if !bytes.Equal(block.Header.AppHash[:], state.AppHash[:]) {
Expand Down
1 change: 1 addition & 0 deletions state/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func TestApplyBlock(t *testing.T) {
err = executor.Commit(context.Background(), &newState, block, resp)
require.NoError(err)
assert.Equal(mockAppHash, newState.AppHash)
newState.LastStoreHeight = uint64(newState.LastBlockHeight)

// Create another block with multiple Tx from mempool
require.NoError(mpool.CheckTx([]byte{0, 1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{}))
Expand Down
2 changes: 1 addition & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *DefaultStore) LoadState() (types.State, error) {

var state types.State
err = state.FromProto(&pbState)
atomic.StoreUint64(&s.height, uint64(state.LastBlockHeight))
atomic.StoreUint64(&s.height, state.LastStoreHeight)
return state, err
}

Expand Down
1 change: 1 addition & 0 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func TestRestart(t *testing.T) {
expectedHeight := uint64(10)
_, err := s1.UpdateState(types.State{
LastBlockHeight: int64(expectedHeight),
LastStoreHeight: uint64(expectedHeight),
NextValidators: validatorSet,
Validators: validatorSet,
LastValidators: validatorSet,
Expand Down
Loading

0 comments on commit 49e75bb

Please sign in to comment.