diff --git a/gossip3/actors/currentstateexchange.go b/gossip3/actors/currentstateexchange.go index bba8d760..c1b8c3ab 100644 --- a/gossip3/actors/currentstateexchange.go +++ b/gossip3/actors/currentstateexchange.go @@ -42,14 +42,20 @@ func NewCurrentStateExchangeProps(config *CurrentStateExchangeConfig) *actor.Pro func (e *CurrentStateExchange) Receive(context actor.Context) { switch msg := context.Message().(type) { + case *messages.DoCurrentStateExchange: + context.Request(msg.Destination, &messages.RequestCurrentStateSnapshot{}) case *messages.RequestCurrentStateSnapshot: e.handleRequestCurrentStateSnapshot(context, msg) case *messages.ReceiveCurrentStateSnapshot: - e.handleReceiveCurrentStateSnapshot(context, msg) + e.gzipImport(context, msg.Payload) } } func (e *CurrentStateExchange) handleRequestCurrentStateSnapshot(context actor.Context, msg *messages.RequestCurrentStateSnapshot) { + if context.Sender() == nil { + panic("RequestCurrentStateSnapshot requires a Sender") + } + gzippedBytes := e.gzipExport() if len(gzippedBytes) == 0 { @@ -60,18 +66,7 @@ func (e *CurrentStateExchange) handleRequestCurrentStateSnapshot(context actor.C Payload: gzippedBytes, } - _, err := context.RequestFuture(extmsgs.FromActorPid(msg.Destination), payload, CURRENT_STATE_EXCHANGE_TIMEOUT).Result() - if err != nil { - panic(fmt.Sprintf("exchange with %v failed: %v", msg.Destination, err)) - } -} - -func (e *CurrentStateExchange) handleReceiveCurrentStateSnapshot(context actor.Context, msg *messages.ReceiveCurrentStateSnapshot) { - var responseBytes []byte - e.gzipImport(context, msg.Payload) - context.Respond(&messages.ReceiveCurrentStateSnapshot{ - Payload: responseBytes, - }) + context.Request(context.Sender(), payload) } func (e *CurrentStateExchange) gzipExport() []byte { @@ -114,7 +109,7 @@ func (e *CurrentStateExchange) gzipImport(context actor.Context, payload []byte) } defer reader.Close() - e.Log.Debugw("gzipImport from %v started", context.Sender()) + e.Log.Debugf("gzipImport from %v started", context.Sender()) bytesLeft := true @@ -146,5 +141,5 @@ func (e *CurrentStateExchange) gzipImport(context actor.Context, payload []byte) bytesLeft = buf.Len() > 0 } - e.Log.Debugw("gzipImport from %v processed %d keys", context.Sender(), wroteCount) + e.Log.Debugf("gzipImport from %v processed %d keys", context.Sender(), wroteCount) } diff --git a/gossip3/actors/tupelo.go b/gossip3/actors/tupelo.go index 01223ce5..c53373e7 100644 --- a/gossip3/actors/tupelo.go +++ b/gossip3/actors/tupelo.go @@ -70,8 +70,6 @@ func (tn *TupeloNode) Receive(context actor.Context) { tn.handleNewTransaction(context) case *messages.RequestCurrentStateSnapshot: context.Forward(tn.currentStateExchangeActor) - case *messages.ReceiveCurrentStateSnapshot: - context.Forward(tn.currentStateExchangeActor) } } @@ -249,8 +247,6 @@ func (tn *TupeloNode) handleStarted(context actor.Context) { } } - context.Send(otherSigner, &messages.RequestCurrentStateSnapshot{ - Destination: extmsgs.ToActorPid(tn.currentStateExchangeActor), - }) + context.Send(tn.currentStateExchangeActor, &messages.DoCurrentStateExchange{Destination: otherSigner}) } } diff --git a/gossip3/messages/internal.go b/gossip3/messages/internal.go index b67c7ef0..b77fe34b 100644 --- a/gossip3/messages/internal.go +++ b/gossip3/messages/internal.go @@ -12,7 +12,6 @@ func init() { } type RequestCurrentStateSnapshot struct { - Destination *extmsgs.ActorPID } func (RequestCurrentStateSnapshot) TypeCode() int8 { diff --git a/gossip3/messages/internal_gen.go b/gossip3/messages/internal_gen.go index 8b4b13ac..49dfeb0e 100644 --- a/gossip3/messages/internal_gen.go +++ b/gossip3/messages/internal_gen.go @@ -3,7 +3,6 @@ package messages // Code generated by github.com/tinylib/msgp DO NOT EDIT. import ( - extmsgs "github.com/quorumcontrol/tupelo-go-sdk/gossip3/messages" "github.com/tinylib/msgp/msgp" ) @@ -128,24 +127,6 @@ func (z *RequestCurrentStateSnapshot) DecodeMsg(dc *msgp.Reader) (err error) { return } switch msgp.UnsafeString(field) { - case "Destination": - if dc.IsNil() { - err = dc.ReadNil() - if err != nil { - err = msgp.WrapError(err, "Destination") - return - } - z.Destination = nil - } else { - if z.Destination == nil { - z.Destination = new(extmsgs.ActorPID) - } - err = z.Destination.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "Destination") - return - } - } default: err = dc.Skip() if err != nil { @@ -158,43 +139,20 @@ func (z *RequestCurrentStateSnapshot) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z *RequestCurrentStateSnapshot) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 1 - // write "Destination" - err = en.Append(0x81, 0xab, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e) +func (z RequestCurrentStateSnapshot) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 0 + err = en.Append(0x80) if err != nil { return } - if z.Destination == nil { - err = en.WriteNil() - if err != nil { - return - } - } else { - err = z.Destination.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "Destination") - return - } - } return } // MarshalMsg implements msgp.Marshaler -func (z *RequestCurrentStateSnapshot) MarshalMsg(b []byte) (o []byte, err error) { +func (z RequestCurrentStateSnapshot) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 1 - // string "Destination" - o = append(o, 0x81, 0xab, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e) - if z.Destination == nil { - o = msgp.AppendNil(o) - } else { - o, err = z.Destination.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "Destination") - return - } - } + // map header, size 0 + o = append(o, 0x80) return } @@ -216,23 +174,6 @@ func (z *RequestCurrentStateSnapshot) UnmarshalMsg(bts []byte) (o []byte, err er return } switch msgp.UnsafeString(field) { - case "Destination": - if msgp.IsNil(bts) { - bts, err = msgp.ReadNilBytes(bts) - if err != nil { - return - } - z.Destination = nil - } else { - if z.Destination == nil { - z.Destination = new(extmsgs.ActorPID) - } - bts, err = z.Destination.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "Destination") - return - } - } default: bts, err = msgp.Skip(bts) if err != nil { @@ -246,12 +187,7 @@ func (z *RequestCurrentStateSnapshot) UnmarshalMsg(bts []byte) (o []byte, err er } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *RequestCurrentStateSnapshot) Msgsize() (s int) { - s = 1 + 12 - if z.Destination == nil { - s += msgp.NilSize - } else { - s += z.Destination.Msgsize() - } +func (z RequestCurrentStateSnapshot) Msgsize() (s int) { + s = 1 return } diff --git a/gossip3/messages/messages.go b/gossip3/messages/messages.go index 9495b71e..94ce02d7 100644 --- a/gossip3/messages/messages.go +++ b/gossip3/messages/messages.go @@ -3,6 +3,7 @@ package messages import ( "fmt" + "github.com/AsynkronIT/protoactor-go/actor" extmsgs "github.com/quorumcontrol/tupelo-go-sdk/gossip3/messages" "github.com/quorumcontrol/tupelo-go-sdk/gossip3/types" "github.com/quorumcontrol/tupelo-go-sdk/tracing" @@ -84,3 +85,7 @@ type GzipImport struct { type ImportCurrentState struct { CurrentState *extmsgs.CurrentState } + +type DoCurrentStateExchange struct { + Destination *actor.PID +}