Skip to content

Commit

Permalink
Merge pull request #285 from quorumcontrol/current-exchange-fix
Browse files Browse the repository at this point in the history
Fix current exchange for remote signers
  • Loading branch information
Brandon Westcott authored May 9, 2019
2 parents 917a54a + ff462ca commit b09588e
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 93 deletions.
25 changes: 10 additions & 15 deletions gossip3/actors/currentstateexchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,20 @@ func NewCurrentStateExchangeProps(config *CurrentStateExchangeConfig) *actor.Pro

func (e *CurrentStateExchange) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *messages.DoCurrentStateExchange:
context.Request(msg.Destination, &messages.RequestCurrentStateSnapshot{})
case *messages.RequestCurrentStateSnapshot:
e.handleRequestCurrentStateSnapshot(context, msg)
case *messages.ReceiveCurrentStateSnapshot:
e.handleReceiveCurrentStateSnapshot(context, msg)
e.gzipImport(context, msg.Payload)
}
}

func (e *CurrentStateExchange) handleRequestCurrentStateSnapshot(context actor.Context, msg *messages.RequestCurrentStateSnapshot) {
if context.Sender() == nil {
panic("RequestCurrentStateSnapshot requires a Sender")
}

gzippedBytes := e.gzipExport()

if len(gzippedBytes) == 0 {
Expand All @@ -60,18 +66,7 @@ func (e *CurrentStateExchange) handleRequestCurrentStateSnapshot(context actor.C
Payload: gzippedBytes,
}

_, err := context.RequestFuture(extmsgs.FromActorPid(msg.Destination), payload, CURRENT_STATE_EXCHANGE_TIMEOUT).Result()
if err != nil {
panic(fmt.Sprintf("exchange with %v failed: %v", msg.Destination, err))
}
}

func (e *CurrentStateExchange) handleReceiveCurrentStateSnapshot(context actor.Context, msg *messages.ReceiveCurrentStateSnapshot) {
var responseBytes []byte
e.gzipImport(context, msg.Payload)
context.Respond(&messages.ReceiveCurrentStateSnapshot{
Payload: responseBytes,
})
context.Request(context.Sender(), payload)
}

func (e *CurrentStateExchange) gzipExport() []byte {
Expand Down Expand Up @@ -114,7 +109,7 @@ func (e *CurrentStateExchange) gzipImport(context actor.Context, payload []byte)
}
defer reader.Close()

e.Log.Debugw("gzipImport from %v started", context.Sender())
e.Log.Debugf("gzipImport from %v started", context.Sender())

bytesLeft := true

Expand Down Expand Up @@ -146,5 +141,5 @@ func (e *CurrentStateExchange) gzipImport(context actor.Context, payload []byte)
bytesLeft = buf.Len() > 0
}

e.Log.Debugw("gzipImport from %v processed %d keys", context.Sender(), wroteCount)
e.Log.Debugf("gzipImport from %v processed %d keys", context.Sender(), wroteCount)
}
6 changes: 1 addition & 5 deletions gossip3/actors/tupelo.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ func (tn *TupeloNode) Receive(context actor.Context) {
tn.handleNewTransaction(context)
case *messages.RequestCurrentStateSnapshot:
context.Forward(tn.currentStateExchangeActor)
case *messages.ReceiveCurrentStateSnapshot:
context.Forward(tn.currentStateExchangeActor)
}
}

Expand Down Expand Up @@ -249,8 +247,6 @@ func (tn *TupeloNode) handleStarted(context actor.Context) {
}
}

context.Send(otherSigner, &messages.RequestCurrentStateSnapshot{
Destination: extmsgs.ToActorPid(tn.currentStateExchangeActor),
})
context.Send(tn.currentStateExchangeActor, &messages.DoCurrentStateExchange{Destination: otherSigner})
}
}
1 change: 0 additions & 1 deletion gossip3/messages/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ func init() {
}

type RequestCurrentStateSnapshot struct {
Destination *extmsgs.ActorPID
}

func (RequestCurrentStateSnapshot) TypeCode() int8 {
Expand Down
80 changes: 8 additions & 72 deletions gossip3/messages/internal_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions gossip3/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package messages
import (
"fmt"

"github.com/AsynkronIT/protoactor-go/actor"
extmsgs "github.com/quorumcontrol/tupelo-go-sdk/gossip3/messages"
"github.com/quorumcontrol/tupelo-go-sdk/gossip3/types"
"github.com/quorumcontrol/tupelo-go-sdk/tracing"
Expand Down Expand Up @@ -84,3 +85,7 @@ type GzipImport struct {
type ImportCurrentState struct {
CurrentState *extmsgs.CurrentState
}

type DoCurrentStateExchange struct {
Destination *actor.PID
}

0 comments on commit b09588e

Please sign in to comment.