Skip to content

Commit

Permalink
Refactor the transport layer.
Browse files Browse the repository at this point in the history
1. Abstract the transport layer to facilitate users implementing their own transport mechanisms.
2. Provide an RPC-based transport implementation that adopts the MessagePack data protocol and supports mTLS.
  • Loading branch information
danl5 committed May 26, 2024
1 parent fae89f1 commit 6444858
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 20 deletions.
21 changes: 17 additions & 4 deletions cmd/tool/visualize/visualize.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,36 @@ package main
import (
"flag"
"fmt"
"log/slog"
"os"
"time"

"github.com/danl5/goelect/pkg/config"
"github.com/danl5/goelect/pkg/consensus"
"github.com/danl5/goelect/pkg/model"
"github.com/danl5/goelect/pkg/transport/rpc"
)

var (
outputPath = flag.String("o", "./fsm_visual", "output path")
)

func main() {
c, _ := consensus.NewConsensus(&config.Config{
ConnectTimeout: 10 * time.Second,
Peers: []config.NodeConfig{},
}, nil, model.ElectNode{})
rpcTransport, _ := rpc.NewRPC(slog.Default())
c, _ := consensus.NewConsensus(
model.ElectNode{
Node: model.Node{
ID: "test",
Address: "test",
},
},
rpcTransport,
&rpc.Config{},
&config.Config{
ConnectTimeout: 10 * time.Second,
Peers: []config.NodeConfig{},
},
nil)
visualStr := c.Visualize()

f, err := os.OpenFile(*outputPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
Expand Down
24 changes: 10 additions & 14 deletions pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,11 @@ func TestConsensus_HeartBeat(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &RpcHandler{
Consensus: &Consensus{
termCache: tt.fields.termCache,
logger: tt.fields.logger,
eventChan: tt.fields.eventChan,
fsm: &fsm.FSM{},
},
c := &Consensus{
termCache: tt.fields.termCache,
logger: tt.fields.logger,
eventChan: tt.fields.eventChan,
fsm: &fsm.FSM{},
}
if err := c.HeartBeat(tt.args.args, tt.args.reply); (err != nil) != tt.wantErr {
t.Errorf("HeartBeat() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -258,13 +256,11 @@ func TestConsensus_RequestVote(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &RpcHandler{
Consensus: &Consensus{
termCache: tt.fields.termCache,
logger: tt.fields.logger,
fsm: tt.fields.fsm,
eventChan: tt.fields.eventChan,
},
c := &Consensus{
termCache: tt.fields.termCache,
logger: tt.fields.logger,
fsm: tt.fields.fsm,
eventChan: tt.fields.eventChan,
}
if err := c.RequestVote(tt.args.args, tt.args.reply); (err != nil) != tt.wantErr {
t.Errorf("RequestVote() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
4 changes: 2 additions & 2 deletions pkg/transport/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ func (s *Server) Start(listenAddress string, handler model.CommandHandler, serve
return err
}

rpcHandler := &RPCHandler{
s.rpcHandler = &RPCHandler{
CmdHandler: handler,
}

err = s.startServer(listenAddress, rpcHandler, cfg)
err = s.startServer(listenAddress, s.rpcHandler, cfg)
if err != nil {
s.logger.Error("failed to start rpc server", "error", err.Error())
return err
Expand Down

0 comments on commit 6444858

Please sign in to comment.