Skip to content

Commit

Permalink
Merge pull request #23 from danl5/refactor_rpc
Browse files Browse the repository at this point in the history
Refactor the transport layer.
  • Loading branch information
danl5 authored May 29, 2024
2 parents 9ee37cb + 6444858 commit 0fbb951
Show file tree
Hide file tree
Showing 17 changed files with 1,007 additions and 531 deletions.
21 changes: 17 additions & 4 deletions cmd/tool/visualize/visualize.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,36 @@ package main
import (
"flag"
"fmt"
"log/slog"
"os"
"time"

"github.com/danl5/goelect/pkg/config"
"github.com/danl5/goelect/pkg/consensus"
"github.com/danl5/goelect/pkg/model"
"github.com/danl5/goelect/pkg/transport/rpc"
)

var (
outputPath = flag.String("o", "./fsm_visual", "output path")
)

func main() {
c, _ := consensus.NewConsensus(&config.Config{
ConnectTimeout: 10 * time.Second,
Peers: []config.NodeConfig{},
}, nil, model.ElectNode{})
rpcTransport, _ := rpc.NewRPC(slog.Default())
c, _ := consensus.NewConsensus(
model.ElectNode{
Node: model.Node{
ID: "test",
Address: "test",
},
},
rpcTransport,
&rpc.Config{},
&config.Config{
ConnectTimeout: 10 * time.Second,
Peers: []config.NodeConfig{},
},
nil)
visualStr := c.Visualize()

f, err := os.OpenFile(*outputPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
Expand Down
65 changes: 24 additions & 41 deletions elect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/danl5/goelect/pkg/consensus"
"github.com/danl5/goelect/pkg/log"
"github.com/danl5/goelect/pkg/model"
"github.com/danl5/goelect/pkg/rpc"
)

const (
Expand All @@ -23,7 +22,11 @@ const (
)

// NewElect creates a new Elect instance
func NewElect(cfg *ElectConfig, logger log.Logger) (*Elect, error) {
func NewElect(
trans model.Transport,
transConfig model.TransportConfig,
cfg *ElectConfig,
logger log.Logger) (*Elect, error) {
var peers []config.NodeConfig
for _, n := range cfg.Peers {
peers = append(peers, config.NodeConfig{
Expand All @@ -48,27 +51,31 @@ func NewElect(cfg *ElectConfig, logger log.Logger) (*Elect, error) {
}

// new consensus instance
crh, err := consensus.NewConsensusRpcHandler(&config.Config{
ElectTimeout: time.Duration(electTimeout) * time.Millisecond,
HeartBeatInterval: time.Duration(heartbeatInterval) * time.Millisecond,
ConnectTimeout: time.Duration(connectTimeout) * time.Second,
Peers: peers,
}, logger, model.ElectNode{
Node: model.Node{
Address: cfg.Node.Address,
ID: cfg.Node.ID,
Tags: cfg.Node.Tags,
c, err := consensus.NewConsensus(
model.ElectNode{
Node: model.Node{
Address: cfg.Node.Address,
ID: cfg.Node.ID,
Tags: cfg.Node.Tags,
},
NoVote: cfg.Node.NoVote,
},
NoVote: cfg.Node.NoVote,
})
trans,
transConfig,
&config.Config{
ElectTimeout: time.Duration(electTimeout) * time.Millisecond,
HeartBeatInterval: time.Duration(heartbeatInterval) * time.Millisecond,
ConnectTimeout: time.Duration(connectTimeout) * time.Second,
Peers: peers,
}, logger)
if err != nil {
return nil, err
}
return &Elect{
cfg: cfg,
logger: logger,
callBackTimeout: cfg.CallBackTimeout,
consensus: crh,
consensus: c,
callBacks: cfg.CallBacks,
errChan: make(chan error, 10),
}, nil
Expand All @@ -81,7 +88,7 @@ type Elect struct {
// callBackTimeout is the timeout for the callbacks
callBackTimeout int
// consensus is a pointer to an RpcHandler which encapsulates the implementation of the consensus algorithm.
consensus *consensus.RpcHandler
consensus *consensus.Consensus
// errChan is a channel for errors
errChan chan error

Expand All @@ -94,12 +101,6 @@ type Elect struct {
// Run is the main function of the Elect struct
// It starts the RPC server, runs the consensus algorithm.
func (e *Elect) Run() error {
// start the RPC server
err := e.startServer()
if err != nil {
e.logger.Error("elect, failed to start rpc server", "error", err.Error())
return err
}

// run the consensus algorithm
stateChan, err := e.consensus.Run()
Expand All @@ -122,7 +123,7 @@ func (e *Elect) Errors() <-chan error {

// CurrentState returns current node state
func (e *Elect) CurrentState() string {
return e.consensus.CurrentState().State.String()
return e.consensus.CurrentState().String()
}

// ClusterState returns current cluster state
Expand All @@ -145,24 +146,6 @@ func (e *Elect) Leader() (string, error) {
return l.ID, nil
}

func (e *Elect) startServer() error {
rpcSvr, err := rpc.NewRpcServer(e.logger)
if err != nil {
return err
}

go func() {
err = rpcSvr.Start(e.cfg.Node.Address, e.consensus)
if err != nil {
e.logger.Error("elect, failed to start rpc server", "error", err.Error())
return
}
}()

e.logger.Info("start rpc server")
return nil
}

func (e *Elect) sendError(err error) {
select {
case e.errChan <- err:
Expand Down
158 changes: 96 additions & 62 deletions examples/onenode/node.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
/*
Usage:
Single node:
go run node.go
Three nodes:
go run node.go --nodeaddr=127.0.0.1:9981 --peers=127.0.0.1:9981,127.0.0.1:9982,127.0.0.1:9983
go run node.go --nodeaddr=127.0.0.1:9982 --peers=127.0.0.1:9981,127.0.0.1:9982,127.0.0.1:9983
go run node.go --nodeaddr=127.0.0.1:9983 --peers=127.0.0.1:9981,127.0.0.1:9982,127.0.0.1:9983
*/
package main

import (
Expand All @@ -10,6 +20,7 @@ import (

"github.com/danl5/goelect"
"github.com/danl5/goelect/pkg/model"
"github.com/danl5/goelect/pkg/transport/rpc"
)

var (
Expand All @@ -20,68 +31,61 @@ var (
peers = flag.String("peers", "127.0.0.1:9981", "peers node address separated by comma")
)

// Callback functions for state transitions
func enterLeader(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter leader,", st.State, st.SrcState)
return nil
}

func leaveLeader(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave leader,", st.State, st.SrcState)
return nil
}

func enterFollower(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter follower,", st.State, st.SrcState)
return nil
}

func leaveFollower(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave follower,", st.State, st.SrcState)
return nil
}

func enterCandidate(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter candidate,", st.State, st.SrcState)
return nil
}

func leaveCandidate(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave candidate,", st.State, st.SrcState)
return nil
}

func newElect() (*goelect.Elect, error) {
pAddrs := strings.Split(*peers, ",")
if len(pAddrs) == 0 {
peerAddrs := strings.Split(*peers, ",")
if len(peerAddrs) == 0 {
panic("peers is empty")
}

var peerNodes []goelect.Node
for _, pa := range pAddrs {
for _, pa := range peerAddrs {
peerNodes = append(peerNodes, goelect.Node{Address: pa, ID: pa})
}

e, err := goelect.NewElect(&goelect.ElectConfig{
ElectTimeout: 200,
HeartBeatInterval: 150,
ConnectTimeout: 10,
Peers: peerNodes,
// state transition callbacks
CallBacks: &goelect.StateCallBacks{
EnterLeader: enterLeader,
LeaveLeader: leaveLeader,
EnterFollower: enterFollower,
LeaveFollower: leaveFollower,
EnterCandidate: enterCandidate,
LeaveCandidate: leaveCandidate,
},
// self node
Node: goelect.Node{
Address: *nodeAddress,
ID: *nodeAddress,
},
}, slog.Default())
logger := slog.Default()

// rpc transport
rpcTransport, err := rpc.NewRPC(logger)
if err != nil {
return nil, err
}
// rpc transport config
rpcTransportConfig := &rpc.Config{
ServerCAs: nil,
ServerKey: "",
ServerCert: "",
ServerSkipVerify: false,
ClientCAs: nil,
ClientCert: "",
ClientKey: "",
ClientSkipVerify: false,
ConnectTimeout: 0,
}

// new elect
e, err := goelect.NewElect(
rpcTransport,
rpcTransportConfig,
&goelect.ElectConfig{
ElectTimeout: 200,
HeartBeatInterval: 150,
ConnectTimeout: 10,
Peers: peerNodes,
// state transition callbacks
CallBacks: &goelect.StateCallBacks{
EnterLeader: enterLeader,
LeaveLeader: leaveLeader,
EnterFollower: enterFollower,
LeaveFollower: leaveFollower,
EnterCandidate: enterCandidate,
LeaveCandidate: leaveCandidate,
},
// self node
Node: goelect.Node{
Address: *nodeAddress,
ID: *nodeAddress,
},
}, logger)
if err != nil {
return nil, err
}
Expand All @@ -98,31 +102,61 @@ func main() {
}

// run the elect
go func() {
err = e.Run()
if err != nil {
panic(err)
}
}()
err = e.Run()
if err != nil {
panic(err)
}

tk := time.NewTicker(5 * time.Second)
defer tk.Stop()
for {
select {
case <-tk.C:
// get and print the cluster state
cs, _ := e.ClusterState()
fmt.Println("Node\tState\t")
for addr, n := range cs.Nodes {
fmt.Println(addr, n.State.String())
}
fmt.Println()

// get and print the leader
leaderNode, _ := e.Leader()
fmt.Println("Leader:", leaderNode)

fmt.Println()
isLeader := e.IsLeader()
fmt.Println("IsLeader:", isLeader)
fmt.Println()
}
}
}

// Callback functions for state transitions
func enterLeader(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter leader,", st.State, st.SrcState)
return nil
}

func leaveLeader(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave leader,", st.State, st.SrcState)
return nil
}

func enterFollower(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter follower,", st.State, st.SrcState)
return nil
}

func leaveFollower(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave follower,", st.State, st.SrcState)
return nil
}

func enterCandidate(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter candidate,", st.State, st.SrcState)
return nil
}

func leaveCandidate(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave candidate,", st.State, st.SrcState)
return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ go 1.21

require (
github.com/looplab/fsm v1.0.1
github.com/mitchellh/mapstructure v1.5.0
github.com/silenceper/pool v1.0.0
github.com/stretchr/testify v1.9.0
github.com/ugorji/go/codec v1.2.12
golang.org/x/sync v0.7.0
)

Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU=
github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/silenceper/pool v1.0.0 h1:JTCaA+U6hJAA0P8nCx+JfsRCHMwLTfatsm5QXelffmU=
Expand All @@ -17,6 +19,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
Loading

0 comments on commit 0fbb951

Please sign in to comment.