From 6444858da2f0383a95d250d26df369de5ea411bb Mon Sep 17 00:00:00 2001 From: danl5 Date: Sun, 26 May 2024 18:33:42 +0800 Subject: [PATCH] Refactor the transport layer. 1. Abstract the transport layer to facilitate users implementing their own transport mechanisms. 2. Provide an RPC-based transport implementation that adopts the MessagePack data protocol and supports mTLS. --- cmd/tool/visualize/visualize.go | 21 +++++++++++++++++---- pkg/consensus/consensus_test.go | 24 ++++++++++-------------- pkg/transport/rpc/rpc.go | 4 ++-- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/cmd/tool/visualize/visualize.go b/cmd/tool/visualize/visualize.go index 2bf106d..cee3b28 100644 --- a/cmd/tool/visualize/visualize.go +++ b/cmd/tool/visualize/visualize.go @@ -3,12 +3,14 @@ package main import ( "flag" "fmt" + "log/slog" "os" "time" "github.com/danl5/goelect/pkg/config" "github.com/danl5/goelect/pkg/consensus" "github.com/danl5/goelect/pkg/model" + "github.com/danl5/goelect/pkg/transport/rpc" ) var ( @@ -16,10 +18,21 @@ var ( ) func main() { - c, _ := consensus.NewConsensus(&config.Config{ - ConnectTimeout: 10 * time.Second, - Peers: []config.NodeConfig{}, - }, nil, model.ElectNode{}) + rpcTransport, _ := rpc.NewRPC(slog.Default()) + c, _ := consensus.NewConsensus( + model.ElectNode{ + Node: model.Node{ + ID: "test", + Address: "test", + }, + }, + rpcTransport, + &rpc.Config{}, + &config.Config{ + ConnectTimeout: 10 * time.Second, + Peers: []config.NodeConfig{}, + }, + nil) visualStr := c.Visualize() f, err := os.OpenFile(*outputPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index bc10632..1afdcc5 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -74,13 +74,11 @@ func TestConsensus_HeartBeat(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := &RpcHandler{ - Consensus: &Consensus{ - termCache: tt.fields.termCache, - logger: tt.fields.logger, - eventChan: tt.fields.eventChan, - fsm: &fsm.FSM{}, - }, + c := &Consensus{ + termCache: tt.fields.termCache, + logger: tt.fields.logger, + eventChan: tt.fields.eventChan, + fsm: &fsm.FSM{}, } if err := c.HeartBeat(tt.args.args, tt.args.reply); (err != nil) != tt.wantErr { t.Errorf("HeartBeat() error = %v, wantErr %v", err, tt.wantErr) @@ -258,13 +256,11 @@ func TestConsensus_RequestVote(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := &RpcHandler{ - Consensus: &Consensus{ - termCache: tt.fields.termCache, - logger: tt.fields.logger, - fsm: tt.fields.fsm, - eventChan: tt.fields.eventChan, - }, + c := &Consensus{ + termCache: tt.fields.termCache, + logger: tt.fields.logger, + fsm: tt.fields.fsm, + eventChan: tt.fields.eventChan, } if err := c.RequestVote(tt.args.args, tt.args.reply); (err != nil) != tt.wantErr { t.Errorf("RequestVote() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/transport/rpc/rpc.go b/pkg/transport/rpc/rpc.go index d4d3b21..dc95f9a 100644 --- a/pkg/transport/rpc/rpc.go +++ b/pkg/transport/rpc/rpc.go @@ -117,11 +117,11 @@ func (s *Server) Start(listenAddress string, handler model.CommandHandler, serve return err } - rpcHandler := &RPCHandler{ + s.rpcHandler = &RPCHandler{ CmdHandler: handler, } - err = s.startServer(listenAddress, rpcHandler, cfg) + err = s.startServer(listenAddress, s.rpcHandler, cfg) if err != nil { s.logger.Error("failed to start rpc server", "error", err.Error()) return err