Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring #9

Merged
merged 14 commits into from
Dec 5, 2023
Merged
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var logger = slog.New(
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "bedel",
Short: "Small utility to sync redis acls with a master instance",
Short: "Small utility to sync redis acls with a primary instance",
}

// Execute adds all child commands to the root command and sets flags appropriately.
Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// runCmd represents the run command
var runCmd = &cobra.Command{
Use: "run",
Short: "Run the acl manager in mood loop, it will sync the follower with the master",
Short: "Run the acl manager in mood loop, it will sync the follower with the primary",
Run: func(cmd *cobra.Command, args []string) {
mgr := aclmanager.New(viper.GetString("address"), viper.GetString("username"), viper.GetString("password"))
ctx := cmd.Context()
Expand Down
28 changes: 24 additions & 4 deletions cmd/runOnce.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,39 @@ package cmd
import (
"github.com/ncode/bedel/pkg/aclmanager"
"github.com/spf13/viper"
"log/slog"
"os"

"github.com/spf13/cobra"
)

// runOnceCmd represents the runOnce command
var runOnceCmd = &cobra.Command{
Use: "runOnce",
Short: "Run the acl manager once, it will sync the follower with the master",
Short: "Run the acl manager once, it will sync the follower with the primary",
Run: func(cmd *cobra.Command, args []string) {
mgr := aclmanager.New(viper.GetString("address"), viper.GetString("username"), viper.GetString("password"))
err := mgr.SyncAcls()
ctx := cmd.Context()
aclManager := aclmanager.New(viper.GetString("address"), viper.GetString("username"), viper.GetString("password"))
function, err := aclManager.CurrentFunction(ctx)
if err != nil {
panic(err)
slog.Warn("unable to check if it's a Primary", "message", err)
os.Exit(1)
}
if function == aclmanager.Follower {
primary, err := aclManager.Primary(ctx)
if err != nil {
slog.Warn("unable to find Primary", "message", err)
os.Exit(1)
}
var added, deleted []string
added, deleted, err = aclManager.SyncAcls(ctx, primary)
if err != nil {
slog.Warn("unable to sync acls from Primary", "message", err)
os.Exit(1)
}
slog.Info("Synced acls from Primary", "added", added, "deleted", deleted)
} else {
slog.Info("Not a follower, nothing to do")
}
},
}
Expand Down
134 changes: 64 additions & 70 deletions pkg/aclmanager/aclmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"log/slog"
"regexp"
"strings"
"sync/atomic"
"time"
)

const (
Primary = iota
Follower
Unknown
)

var (
Expand All @@ -31,6 +33,8 @@ type AclManager struct {
Username string
Password string
RedisClient *redis.Client
primary atomic.Bool
nodes map[string]int
}

// New creates a new AclManager
Expand All @@ -45,18 +49,23 @@ func New(addr string, username string, password string) *AclManager {
Username: username,
Password: password,
RedisClient: redisClient,
nodes: make(map[string]int),
}
}

type NodeInfo struct {
Address string
Function int
}
// findNodes returns a list of nodes in the cluster based on the redis info replication command
func (a *AclManager) findNodes(ctx context.Context) (err error) {
slog.Debug("Finding nodes")
replicationInfo, err := a.RedisClient.Info(ctx, "replication").Result()
if err != nil {
return err
}

func parseRedisOutput(output string) (nodes []NodeInfo, err error) {
var masterHost, masterPort string
a.primary.Store(role.MatchString(replicationInfo))

scanner := bufio.NewScanner(strings.NewReader(output))
var masterHost, masterPort string
var nodes []string
scanner := bufio.NewScanner(strings.NewReader(replicationInfo))
for scanner.Scan() {
line := scanner.Text()

Expand All @@ -68,84 +77,58 @@ func parseRedisOutput(output string) (nodes []NodeInfo, err error) {
slog.Debug("Parsing line looking for Follower", "content", line)
if matches := primaryPortRegex.FindStringSubmatch(line); matches != nil {
masterPort = matches[1]
nodes = append(nodes, NodeInfo{Address: fmt.Sprintf("%s:%s", masterHost, masterPort), Function: Primary})
nodes = append(nodes, fmt.Sprintf("%s:%s", masterHost, masterPort))
a.nodes[fmt.Sprintf("%s:%s", masterHost, masterPort)] = Primary
}

if matches := followerRegex.FindStringSubmatch(line); matches != nil {
ip := matches[followerRegex.SubexpIndex("ip")]
port := matches[followerRegex.SubexpIndex("port")]
nodes = append(nodes, NodeInfo{Address: fmt.Sprintf("%s:%s", ip, port), Function: Follower})
nodes = append(nodes, fmt.Sprintf("%s:%s", ip, port))
a.nodes[fmt.Sprintf("%s:%s", ip, port)] = Follower
}
}

if err := scanner.Err(); err != nil {
return nodes, err
}

return nodes, err
}

// FindNodes returns a list of nodes in the cluster based on the redis info replication command
func (a *AclManager) FindNodes() (nodes []NodeInfo, err error) {
slog.Debug("Finding nodes")
replicationInfo, err := a.RedisClient.Info(context.Background(), "replication").Result()
if err != nil {
return nodes, err
return err
}

nodes, err = parseRedisOutput(replicationInfo)
if err != nil {
return nodes, err
for _, node := range nodes {
if _, ok := a.nodes[node]; !ok {
delete(a.nodes, node)
}
}

return nodes, err
return err
}

// CurrentFunction check if the current node is the Primary node
func (a *AclManager) CurrentFunction() (function int, err error) {
func (a *AclManager) CurrentFunction(ctx context.Context) (function int, err error) {
slog.Debug("Check node current function")
replicationInfo, err := a.RedisClient.Info(context.Background(), "replication").Result()
err = a.findNodes(ctx)
if err != nil {
return function, err
return Unknown, err
}

if role.MatchString(replicationInfo) {
return Primary, nil
if a.primary.Load() {
return Primary, err
}

return Follower, err
}

// SyncAcls connects to master node and syncs the acls to the current node
func (a *AclManager) SyncAcls() (err error) {
slog.Debug("Syncing acls")
nodes, err := a.FindNodes()
func (a *AclManager) Primary(ctx context.Context) (primary *AclManager, err error) {
err = a.findNodes(ctx)
if err != nil {
return err
return nil, err
}

ctx := context.Background()
for _, node := range nodes {
if node.Function == Primary {
if a.Addr == node.Address {
return err
}
master := redis.NewClient(&redis.Options{
Addr: node.Address,
Username: a.Username,
Password: a.Password,
})
defer master.Close()

added, deleted, err := mirrorAcls(ctx, master, a.RedisClient)
if err != nil {
return fmt.Errorf("error syncing acls: %v", err)
}
slog.Info("Synced acls", "added", added, "deleted", deleted)
for address, function := range a.nodes {
if function == Primary {
return New(address, a.Username, a.Password), err
}
}

return err
return nil, err
}

// Close closes the redis client
Expand Down Expand Up @@ -181,15 +164,19 @@ func listAcls(ctx context.Context, client *redis.Client) (acls []string, err err
return acls, nil
}

// mirrorAcls returns a list of acls in the cluster based on the redis acl list command
func mirrorAcls(ctx context.Context, source *redis.Client, destination *redis.Client) (added []string, deleted []string, err error) {
slog.Debug("Mirroring acls")
sourceAcls, err := listAcls(ctx, source)
// SyncAcls connects to master node and syncs the acls to the current node
func (a *AclManager) SyncAcls(ctx context.Context, primary *AclManager) (added []string, deleted []string, err error) {
slog.Debug("Syncing acls")
if primary == nil {
return added, deleted, fmt.Errorf("no primary found")
}

sourceAcls, err := listAcls(ctx, primary.RedisClient)
if err != nil {
return nil, nil, fmt.Errorf("error listing source acls: %v", err)
}

destinationAcls, err := listAcls(ctx, destination)
destinationAcls, err := listAcls(ctx, a.RedisClient)
if err != nil {
return nil, nil, fmt.Errorf("error listing current acls: %v", err)
}
Expand All @@ -201,18 +188,16 @@ func mirrorAcls(ctx context.Context, source *redis.Client, destination *redis.Cl
}

// Delete ACLs not in source and remove from the toAdd map if present in destination
var insync uint
for _, acl := range destinationAcls {
username := strings.Split(acl, " ")[1]
if _, found := toAdd[acl]; found {
// If found in source, don't need to add, so remove from map
delete(toAdd, acl)
slog.Debug("ACL already in sync", "username", username)
insync++
} else {
// If not found in source, delete from destination
slog.Debug("Deleting ACL", "username", username)
if err := destination.Do(context.Background(), "ACL", "DELUSER", username).Err(); err != nil {
if err := a.RedisClient.Do(context.Background(), "ACL", "DELUSER", username).Err(); err != nil {
return nil, nil, fmt.Errorf("error deleting acl: %v", err)
}
deleted = append(deleted, username)
Expand All @@ -228,7 +213,7 @@ func mirrorAcls(ctx context.Context, source *redis.Client, destination *redis.Cl
for i, s := range command {
commandInterfce[i] = s
}
if err := destination.Do(context.Background(), commandInterfce...).Err(); err != nil {
if err := a.RedisClient.Do(context.Background(), commandInterfce...).Err(); err != nil {
return nil, nil, fmt.Errorf("error setting acl: %v", err)
}
added = append(added, username)
Expand All @@ -242,22 +227,31 @@ func (a *AclManager) Loop(ctx context.Context) (err error) {
ticker := time.NewTicker(viper.GetDuration("syncInterval") * time.Second)
defer ticker.Stop()

var primary *AclManager
for {
select {
case <-ctx.Done():
return err
case <-ticker.C:
function, e := a.CurrentFunction()
function, e := a.CurrentFunction(ctx)
if err != nil {
slog.Warn("unable to check if it's a primary", "message", e)
err = fmt.Errorf("unable to check if it's a primary: %w", e)
slog.Warn("unable to check if it's a Primary", "message", err)
err = fmt.Errorf("unable to check if it's a Primary: %w", err)
}
if function == Follower {
e = a.SyncAcls()
primary, err = a.Primary(ctx)
if err != nil {
slog.Warn("unable to find Primary", "message", e)
continue
}
var added, deleted []string
added, deleted, err = a.SyncAcls(ctx, primary)
if err != nil {
slog.Warn("unable to sync acls from primary", "message", e)
slog.Warn("unable to sync acls from Primary", "message", err)
err = fmt.Errorf("unable to sync acls from Primary: %w", err)
continue
}
err = fmt.Errorf("unable to sync acls from primary: %w", e)
slog.Info("Synced acls from Primary", "added", added, "deleted", deleted)
}
}
}
Expand Down
Loading