Skip to content

Commit

Permalink
Working RAFT with gRPC and a small FSM demo
Browse files Browse the repository at this point in the history
  • Loading branch information
lauranooooo committed Jun 4, 2024
1 parent 16d0cbb commit ff88b83
Show file tree
Hide file tree
Showing 20 changed files with 4,871 additions and 0 deletions.
16 changes: 16 additions & 0 deletions projects/raft-otel/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM golang:1.22.3

WORKDIR /

COPY go.mod go.sum ./
RUN go mod download

COPY *.go ./
COPY main/*.go ./main/
COPY raft_proto/*.go ./raft_proto/


EXPOSE 7600

# Bit lazy not to build properly but that's not the main point of this exercise
CMD ["go", "run", "main/main.go"]
16 changes: 16 additions & 0 deletions projects/raft-otel/Dockerfile.client
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM golang:1.22.3

WORKDIR /

COPY go.mod go.sum ./
RUN go mod download

COPY *.go ./
COPY client/*.go ./client/
COPY raft_proto/*.go ./raft_proto/


EXPOSE 7600

# Bit lazy not to build properly but that's not the main point of this exercise
CMD ["go", "run", "client/client.go"]
60 changes: 60 additions & 0 deletions projects/raft-otel/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"raft/raft_proto"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const port = 7600

func main() {
addr := flag.String("dns", "raft", "dns address for raft cluster")

if addr == nil || *addr == "" {
fmt.Printf("Must supply dns address of cluster\n")
os.Exit(1)
}

time.Sleep(time.Second * 5) // wait for raft servers to come up

ips, err := net.LookupIP(*addr)
if err != nil {
fmt.Printf("Could not get IPs: %v\n", err)
os.Exit(1)
}

clients := make([]raft_proto.RaftKVServiceClient, 0)

for _, ip := range ips {
fmt.Printf("Connecting to %s\n", ip.String())
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip.String(), port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("%v", err)
}
client := raft_proto.NewRaftKVServiceClient(conn)
clients = append(clients, client)
}

for {
for _, c := range clients {
n := time.Now().Second()
res, err := c.Set(context.TODO(), &raft_proto.SetRequest{Keyname: "cursec", Value: fmt.Sprintf("%d", n)})
fmt.Printf("Called set cursec %d, got %v, %v\n", n, res, err)

time.Sleep(1 * time.Second) // allow consensus to happen

getres, err := c.Get(context.TODO(), &raft_proto.GetRequest{Keyname: "cursec"})
fmt.Printf("Called get cursec, got %v, %v\n", getres, err)
}
time.Sleep(5 * time.Second)
}
}
60 changes: 60 additions & 0 deletions projects/raft-otel/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"raft/raft_proto"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const port = 7600

func main() {
addr := flag.String("dns", "raft", "dns address for raft cluster")

if addr == nil || *addr == "" {
fmt.Printf("Must supply dns address of cluster\n")
os.Exit(1)
}

time.Sleep(time.Second * 5) // wait for raft servers to come up

ips, err := net.LookupIP(*addr)
if err != nil {
fmt.Printf("Could not get IPs: %v\n", err)
os.Exit(1)
}

clients := make([]raft_proto.RaftKVServiceClient, 0)

for _, ip := range ips {
fmt.Printf("Connecting to %s\n", ip.String())
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip.String(), port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("%v", err)
}
client := raft_proto.NewRaftKVServiceClient(conn)
clients = append(clients, client)
}

for {
for _, c := range clients {
n := time.Now().Second()
res, err := c.Set(context.TODO(), &raft_proto.SetRequest{Keyname: "cursec", Value: fmt.Sprintf("%d", n)})
fmt.Printf("Called set cursec %d, got %v, %v\n", n, res, err)

time.Sleep(1 * time.Second) // allow consensus to happen

getres, err := c.Get(context.TODO(), &raft_proto.GetRequest{Keyname: "cursec"})
fmt.Printf("Called get cursec, got %v, %v\n", getres, err)
}
time.Sleep(5 * time.Second)
}
}
12 changes: 12 additions & 0 deletions projects/raft-otel/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
services:
raft:
build: .
deploy:
mode: replicated
replicas: 3
client:
build:
dockerfile: ./Dockerfile.client
deploy:
mode: replicated
replicas: 1
16 changes: 16 additions & 0 deletions projects/raft-otel/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module raft

go 1.19

require (
github.com/fortytw2/leaktest v1.3.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
)

require (
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
)
15 changes: 15 additions & 0 deletions projects/raft-otel/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
113 changes: 113 additions & 0 deletions projects/raft-otel/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package main

import (
"flag"
"fmt"
"net"
"os"
"os/signal"
"raft"
"strings"
"syscall"
"time"
)

const port = 7600

func main() {
addr := flag.String("dns", "raft", "dns address for raft cluster")
if_addr := flag.String("if", "eth0", "use IPV4 address of this interface") // eth0 works on docker, may vary for other platforms

if addr == nil || *addr == "" {
fmt.Printf("Must supply dns address of cluster\n")
os.Exit(1)
}

id := getOwnAddr(*if_addr)
fmt.Printf("My address/node ID is %s\n", id)

ready := make(chan interface{})
storage := raft.NewMapStorage()
commitChan := make(chan raft.CommitEntry)
server := raft.NewServer(id, id, storage, ready, commitChan, port)
server.Serve(raft.NewKV())

ips, err := net.LookupIP(*addr)
if err != nil {
fmt.Printf("Could not get IPs: %v\n", err)
os.Exit(1)
}

// Connect to all peers with appropriate waits
// TODO: we only do this once, on startup - we really should periodically check to see if the DNS listing for peers has changed
for _, ip := range ips {
// if not own IP
if !ownAddr(ip, id) {
peerAddr := fmt.Sprintf("%s:%d", ip.String(), port)

connected := false
for rt := 0; rt <= 3 && !connected; rt++ {
fmt.Printf("Connecting to peer %s\n", peerAddr)
err = server.ConnectToPeer(peerAddr, peerAddr)
if err == nil {
connected = true
} else { // probably just not started up yet, retry
fmt.Printf("Error connecting to peer: %+v", err)
time.Sleep(time.Duration(rt+1) * time.Second)
}
}
if err != nil {
fmt.Printf("Exhausted retries connecting to peer %s", peerAddr)
os.Exit(1)
}
}
}

close(ready) // start raft server, peers are connected

gracefulShutdown := make(chan os.Signal, 1)
signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM)
<-gracefulShutdown
server.DisconnectAll()
server.Shutdown()
}

func getOwnAddr(intf string) string {
ifs, err := net.Interfaces()
if err != nil {
fmt.Printf("Could not get intf: %v\n", err)
os.Exit(1)
}

for _, cif := range ifs {
if cif.Name == intf {
ads, _ := cif.Addrs()
for _, addr := range ads {
if isIPV4(addr.String()) {
ip := getIP(addr.String())
return ip.String()
}

}
}
}

fmt.Printf("Could not find intf: %s\n", intf)
os.Exit(1)
return ""
}

func isIPV4(addr string) bool {
parts := strings.Split(addr, "::")
return len(parts) == 1
}

func getIP(addr string) net.IP {
parts := strings.Split(addr, "/")
return net.ParseIP(parts[0])
}

func ownAddr(ip net.IP, myAddr string) bool {
res := ip.String() == myAddr
return res
}
Loading

0 comments on commit ff88b83

Please sign in to comment.