Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create acls #165

Merged
merged 106 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 84 commits
Commits
Show all changes
106 commits
Select commit Hold shift + click to select a range
b86d182
bump kafka-go to include acl apis
petedannemann Aug 11, 2023
f1ec537
add acl interfaces and aclinfo type stub
petedannemann Aug 11, 2023
07a63c7
pull latest kafka-go and use kafka-go aclresource type
petedannemann Aug 11, 2023
474c260
wip
petedannemann Sep 8, 2023
6e0ec36
fix test
petedannemann Sep 8, 2023
7b9454d
fix typos
petedannemann Sep 11, 2023
49f7e19
get acls working
petedannemann Sep 11, 2023
8382e98
getacls working
petedannemann Sep 12, 2023
7b8ee42
upgrade cobra to latest
petedannemann Sep 12, 2023
2a7d2de
finish separating get into separate subcommands
petedannemann Sep 12, 2023
1b84ef3
remove unneeded variables
petedannemann Sep 12, 2023
ff7d81d
Merge branch 'chore/separate-subcmd-for-get' into spike/acls-and-users
petedannemann Sep 12, 2023
ea28ea9
wip
petedannemann Sep 12, 2023
07667dd
pr feedback
petedannemann Sep 12, 2023
dcdd0e8
Revert "upgrade cobra to latest"
petedannemann Sep 13, 2023
d58262c
Merge branch 'chore/separate-subcmd-for-get' into spike/acls-and-users
petedannemann Sep 13, 2023
9f8f550
use getCliRunnerAndCtx in get acls
petedannemann Sep 13, 2023
4a78af2
more consistent variable names
petedannemann Sep 13, 2023
1dbf200
custom cobra type
petedannemann Sep 13, 2023
226ae1c
bring in new kafka-go
petedannemann Sep 13, 2023
acc011f
support resource pattern type
petedannemann Sep 13, 2023
2536e50
add support for acloperationtype and remove options for unknown
petedannemann Sep 14, 2023
62671b0
improve descriptions
petedannemann Sep 14, 2023
3f050ca
support permissiontype and host filters
petedannemann Sep 14, 2023
925670c
add resource name filter and fix permission type formatting
petedannemann Sep 14, 2023
5cff332
support principal filtering
petedannemann Sep 14, 2023
e6e8c63
improve docs
petedannemann Sep 14, 2023
e28cb01
add examples
petedannemann Sep 15, 2023
9735b1b
remove comment
petedannemann Sep 15, 2023
b19a4e1
remove TODOs that are complete
petedannemann Sep 15, 2023
43806c0
remove TODOs that are complete
petedannemann Sep 15, 2023
2fb8c8e
update README
petedannemann Sep 15, 2023
45d403d
fix test
petedannemann Sep 15, 2023
c5e909d
Merge branch 'master' into feat/get-acls
petedannemann Sep 15, 2023
b3a5ef8
wip
petedannemann Sep 15, 2023
6c1f7f1
fix error handling
petedannemann Sep 15, 2023
cd3a1f6
error handling for zk
petedannemann Sep 15, 2023
e0c8c63
more consistent error msg
petedannemann Sep 15, 2023
90147f3
clean up createacl
petedannemann Sep 15, 2023
7534ecf
add TestBrokerClientCreateACLReadOnly
petedannemann Sep 15, 2023
7551ece
improve zk tests
petedannemann Sep 15, 2023
df19f18
run acl tests in ci
petedannemann Sep 15, 2023
e799c40
enable acls for kafka 2.4.1 in ci
petedannemann Sep 15, 2023
cf690ee
fix zk tests
petedannemann Sep 15, 2023
41283c7
skip TestBrokerClientCreateACLReadOnly on old versions of kafka
petedannemann Sep 15, 2023
b553d3d
try to debug
petedannemann Sep 15, 2023
14811f7
handle nested errors from createacls
petedannemann Sep 15, 2023
7cc16a6
operations -> operation
petedannemann Sep 15, 2023
2d12642
operations -> operation
petedannemann Sep 15, 2023
fdb8288
remove setting log level in test
petedannemann Sep 15, 2023
4f4be70
Merge branch 'master' into feat/get-acls
petedannemann Sep 18, 2023
96dedfd
clean up allowed types in help command
petedannemann Sep 18, 2023
d65759d
fix merge conflict
petedannemann Sep 18, 2023
36d3de9
fix test
petedannemann Sep 19, 2023
9b1262a
add json annotations
petedannemann Sep 19, 2023
e960c37
bump kafka-go to version on main
petedannemann Sep 18, 2023
47650f9
wip
petedannemann Sep 19, 2023
8d9ab94
basic tests
petedannemann Sep 19, 2023
561eb2a
start on getusers cmd
petedannemann Sep 19, 2023
ead5d31
add json annotations
petedannemann Sep 19, 2023
5dcb773
get users working
petedannemann Sep 19, 2023
46a50ef
wip
petedannemann Sep 19, 2023
6800114
add todos and fix type annotaitons
petedannemann Sep 19, 2023
2b0d87c
improve test
petedannemann Sep 19, 2023
128be0d
use CanTestBrokerAdminSecurity to feature flag test
petedannemann Sep 19, 2023
a69e71f
update README
petedannemann Sep 19, 2023
5efedaa
remove duplicate test from merge conflicts
petedannemann Sep 20, 2023
83eca68
fix more merge conflicts
petedannemann Sep 20, 2023
a5dbb56
create user working
petedannemann Sep 26, 2023
5d46ebe
add uncommitted files
petedannemann Oct 3, 2023
0f8d283
start adding validation
petedannemann Oct 4, 2023
f7cd4f8
meta validation for users
petedannemann Oct 4, 2023
01efe00
wip
petedannemann Oct 10, 2023
ba844d0
Merge branch 'master' into feat/apply-acls
petedannemann Oct 12, 2023
79d9d3b
support dry run and skip confirm
petedannemann Oct 18, 2023
a335872
wip
petedannemann Oct 30, 2023
107751e
wip
petedannemann Nov 9, 2023
aa72764
add more files
petedannemann Nov 9, 2023
61e6925
resourcemta
petedannemann Nov 15, 2023
72fbe94
consistency checking for acls
petedannemann Nov 15, 2023
c1f10df
remove emacs backups
petedannemann Nov 15, 2023
a2d4686
remove user stuff
petedannemann Nov 15, 2023
a7c130c
remove diff from cluster.yaml file
petedannemann Nov 15, 2023
23b544a
remove diff from topic file
petedannemann Nov 15, 2023
7c57063
remove debug log
petedannemann Nov 15, 2023
5931b92
smaller diff
petedannemann Nov 15, 2023
3d52f5a
remove completed todos
petedannemann Nov 15, 2023
d0f0ec9
remove unused error helper
petedannemann Nov 15, 2023
8400a73
add missing meta file
petedannemann Nov 15, 2023
9b3b13c
Merge branch 'master' into feat/create-acls
petedannemann Nov 15, 2023
1e7cc3b
skip ACL tests when ACLs cannot be used due to kafka version limitations
petedannemann Nov 15, 2023
24fffd0
fix loadacls test
petedannemann Nov 15, 2023
32a9490
add more todos
petedannemann Nov 15, 2023
39349ad
add validation and set defaults
petedannemann Nov 16, 2023
0cd4dcf
don't use ioutil
petedannemann Nov 16, 2023
41981eb
move confirm to util package
petedannemann Nov 16, 2023
a6d7a5e
move confirm to util package
petedannemann Nov 16, 2023
3b4fd43
add create to README
petedannemann Nov 16, 2023
b1b5f21
use validation and setdefaults
petedannemann Nov 16, 2023
d749523
add example acl
petedannemann Nov 16, 2023
67f7178
fix formatting in readme
petedannemann Nov 16, 2023
3ae74de
use released version of kafka-go
petedannemann Nov 16, 2023
c7db718
fix spelling
petedannemann Nov 16, 2023
1f8d7b4
make invalid field more obvious
petedannemann Nov 16, 2023
23f7979
fix dryrun and skip confirm
petedannemann Nov 16, 2023
2c4c747
fix grammar
petedannemann Dec 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ vendor/
build/

.vscode

# Emacs backups
*~
200 changes: 200 additions & 0 deletions cmd/topicctl/subcmd/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package subcmd

import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/cli"
"github.com/segmentio/topicctl/pkg/config"
"github.com/segmentio/topicctl/pkg/create"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var createCmd = &cobra.Command{
Use: "create [resource type]",
Short: "creates one or more resources",
PersistentPreRunE: createPreRun,
}

type createCmdConfig struct {
dryRun bool
pathPrefix string
skipConfirm bool

shared sharedOptions
}

var createConfig createCmdConfig

func init() {
createCmd.Flags().BoolVar(
&createConfig.dryRun,
"dry-run",
false,
"Do a dry-run",
)
createCmd.Flags().StringVar(
&createConfig.pathPrefix,
"path-prefix",
os.Getenv("TOPICCTL_ACL_PATH_PREFIX"),
"Prefix for ACL config paths",
)
createCmd.Flags().BoolVar(
&createConfig.skipConfirm,
"skip-confirm",
false,
"Skip confirmation prompts during creation process",
)

addSharedFlags(createCmd, &createConfig.shared)
createCmd.AddCommand(
createACLsCmd(),
)
RootCmd.AddCommand(createCmd)
}

func createPreRun(cmd *cobra.Command, args []string) error {
if err := RootCmd.PersistentPreRunE(cmd, args); err != nil {
return err
}
return createConfig.shared.validate()
}

func createACLsCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "acls [acl configs]",
Short: "creates ACLs from configuration files",
Args: cobra.MinimumNArgs(1),
RunE: createACLRun,
PreRunE: createPreRun,
}

return cmd
}

func createACLRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()

// Keep a cache of the admin clients with the cluster config path as the key
adminClients := map[string]admin.Client{}

defer func() {
for _, adminClient := range adminClients {
adminClient.Close()
}
}()

matchCount := 0

for _, arg := range args {
if createConfig.pathPrefix != "" && !filepath.IsAbs(arg) {
arg = filepath.Join(createConfig.pathPrefix, arg)
}

matches, err := filepath.Glob(arg)
if err != nil {
return err
}

for _, match := range matches {
matchCount++
if err := createACL(ctx, match, adminClients); err != nil {
return err
}
}
}

if matchCount == 0 {
return fmt.Errorf("No ACL configs match the provided args (%+v)", args)
}

return nil
}

func createACL(
ctx context.Context,
aclConfigPath string,
adminClients map[string]admin.Client,
) error {
clusterConfigPath, err := clusterConfigForACLCreate(aclConfigPath)
if err != nil {
return err
}

aclConfigs, err := config.LoadACLsFile(aclConfigPath)
if err != nil {
return err
}

clusterConfig, err := config.LoadClusterFile(clusterConfigPath, createConfig.shared.expandEnv)
if err != nil {
return err
}

adminClient, ok := adminClients[clusterConfigPath]
if !ok {
adminClient, err = clusterConfig.NewAdminClient(
ctx,
nil,
applyConfig.dryRun,
applyConfig.shared.saslUsername,
applyConfig.shared.saslPassword,
)
if err != nil {
return err
}
adminClients[clusterConfigPath] = adminClient
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false)

for _, aclConfig := range aclConfigs {
log.Infof(
"Processing ACL %s in config %s with cluster config %s",
aclConfig.Meta.Name,
aclConfigPath,
clusterConfigPath,
)

creatorConfig := create.ACLCreatorConfig{
DryRun: applyConfig.dryRun,
SkipConfirm: applyConfig.skipConfirm,
ACLConfig: aclConfig,
ClusterConfig: clusterConfig,
}

if err := cliRunner.CreateACL(ctx, creatorConfig); err != nil {
return err
}
}

return nil
}

func clusterConfigForACLCreate(aclConfigPath string) (string, error) {
if createConfig.shared.clusterConfig != "" {
return createConfig.shared.clusterConfig, nil
}

return filepath.Abs(
filepath.Join(
filepath.Dir(aclConfigPath),
"..",
"cluster.yaml",
),
)
}
5 changes: 3 additions & 2 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package subcmd
import (
"context"
"fmt"
"github.com/spf13/cobra"
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"

"github.com/spf13/cobra"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/cli"
Expand Down Expand Up @@ -159,7 +160,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {

for _, topicConfig := range topicConfigs {
// topic config should be consistent with the cluster config
if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil {
if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil {
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/olekukonko/tablewriter v0.0.5
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/segmentio/kafka-go v0.4.44
petedannemann marked this conversation as resolved.
Show resolved Hide resolved
github.com/segmentio/kafka-go v0.4.45-0.20231030174323-c6378c391a97
github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.5.0
github.com/stretchr/testify v1.8.0
github.com/x-cray/logrus-prefixed-formatter v0.5.2
github.com/xdg-go/pbkdf2 v1.0.0
golang.org/x/crypto v0.14.0
)

Expand All @@ -37,7 +38,6 @@ require (
github.com/pkg/term v0.0.0-20200520122047-c3ffed290a03 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
golang.org/x/sys v0.13.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da h1:p3Vo3i64TCLY7gIfzeQaUJ+kppEO5WQG3cL8iE8tGHU=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/segmentio/kafka-go v0.4.28/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/segmentio/kafka-go v0.4.44 h1:Vjjksniy0WSTZ7CuVJrz1k04UoZeTc77UV6Yyk6tLY4=
github.com/segmentio/kafka-go v0.4.44/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/segmentio/kafka-go v0.4.45-0.20231030174323-c6378c391a97 h1:vKYoioQZ7SgGcES2pKoNq7zV8ncKNvblHp+0O+dOeI0=
github.com/segmentio/kafka-go v0.4.45-0.20231030174323-c6378c391a97/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070 h1:ng1Z/x5LLOIrzgWUOtypsCkR+dHTux7slqOCVkuwQBo=
github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070/go.mod h1:IjMUGcOJoATsnlqAProGN1ezXeEgU5GCWr1/EzmkEMA=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
Expand Down
15 changes: 10 additions & 5 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewBrokerAdminClient(
if _, ok := maxVersions["DescribeUserScramCredentials"]; ok {
supportedFeatures.Users = true
}

log.Debugf("Supported features: %+v", supportedFeatures)

adminClient := &BrokerAdminClient{
Expand Down Expand Up @@ -407,13 +408,17 @@ func (c *BrokerAdminClient) GetUsers(
return nil, err
}

if err = util.DescribeUserScramCredentialsResponseResultsError(resp.Results); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had added this error handler utility function before and discovered it had incorrect behavior

return nil, err
}

results := []UserInfo{}

for _, result := range resp.Results {
if result.Error != nil {
log.Debugf("got here")
if errors.Is(result.Error, kafka.ResourceNotFound) {
log.Debugf("Skipping over user %s because it does not exist", result.User)
continue
}
return nil, fmt.Errorf("Error getting description of user %s: %+v", result.User, result.Error)
}
var credentials []CredentialInfo
for _, credential := range result.CredentialInfos {
credentials = append(credentials, CredentialInfo{
Expand Down Expand Up @@ -808,7 +813,7 @@ func (c *BrokerAdminClient) GetACLs(
return aclinfos, nil
}

// CreateACLs creates an ACL in the cluster.
// CreateACLs creates ACLs in the cluster.
func (c *BrokerAdminClient) CreateACLs(
ctx context.Context,
acls []kafka.ACLEntry,
Expand Down
1 change: 0 additions & 1 deletion pkg/admin/brokerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,5 +770,4 @@ func TestBrokerClientCreateACLReadOnly(t *testing.T) {

err = client.CreateACLs(ctx, []kafka.ACLEntry{})
assert.Equal(t, err, errors.New("Cannot create ACL in read-only mode"))

}
2 changes: 1 addition & 1 deletion pkg/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Client interface {
config kafka.TopicConfig,
) error

// Create ACLs creates ACLs in the cluster.
// CreateACLs creates ACLs in the cluster.
CreateACLs(
ctx context.Context,
acls []kafka.ACLEntry,
Expand Down
16 changes: 8 additions & 8 deletions pkg/admin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type ACLInfo struct {
// as a Cobra flag.
type ResourceType kafka.ResourceType

var resourceTypeMap = map[string]kafka.ResourceType{
var ResourceTypeMap = map[string]kafka.ResourceType{
"any": kafka.ResourceTypeAny,
"topic": kafka.ResourceTypeTopic,
"group": kafka.ResourceTypeGroup,
Expand Down Expand Up @@ -125,7 +125,7 @@ func (r *ResourceType) String() string {

// Set is used by Cobra to set the value of a variable from a Cobra flag.
func (r *ResourceType) Set(v string) error {
rt, ok := resourceTypeMap[strings.ToLower(v)]
rt, ok := ResourceTypeMap[strings.ToLower(v)]
if !ok {
return errors.New(`must be one of "any", "topic", "group", "cluster", "transactionalid", or "delegationtoken"`)
}
Expand All @@ -144,7 +144,7 @@ func (r *ResourceType) Type() string {
// as a Cobra flag.
type PatternType kafka.PatternType

var patternTypeMap = map[string]kafka.PatternType{
var PatternTypeMap = map[string]kafka.PatternType{
"any": kafka.PatternTypeAny,
"match": kafka.PatternTypeMatch,
"literal": kafka.PatternTypeLiteral,
Expand All @@ -169,7 +169,7 @@ func (p *PatternType) String() string {

// Set is used by Cobra to set the value of a variable from a Cobra flag.
func (p *PatternType) Set(v string) error {
pt, ok := patternTypeMap[strings.ToLower(v)]
pt, ok := PatternTypeMap[strings.ToLower(v)]
if !ok {
return errors.New(`must be one of "any", "match", "literal", or "prefixed"`)
}
Expand All @@ -188,7 +188,7 @@ func (r *PatternType) Type() string {
// as a Cobra flag.
type ACLOperationType kafka.ACLOperationType

var aclOperationTypeMap = map[string]kafka.ACLOperationType{
var AclOperationTypeMap = map[string]kafka.ACLOperationType{
"any": kafka.ACLOperationTypeAny,
"all": kafka.ACLOperationTypeAll,
"read": kafka.ACLOperationTypeRead,
Expand Down Expand Up @@ -237,7 +237,7 @@ func (o *ACLOperationType) String() string {

// Set is used by Cobra to set the value of a variable from a Cobra flag.
func (o *ACLOperationType) Set(v string) error {
ot, ok := aclOperationTypeMap[strings.ToLower(v)]
ot, ok := AclOperationTypeMap[strings.ToLower(v)]
if !ok {
return errors.New(`must be one of "any", "all", "read", "write", "create", "delete", "alter", "describe", "clusteraction", "describeconfigs", "alterconfigs" or "idempotentwrite"`)
}
Expand All @@ -256,7 +256,7 @@ func (o *ACLOperationType) Type() string {
// as a Cobra flag.
type ACLPermissionType kafka.ACLPermissionType

var aclPermissionTypeMap = map[string]kafka.ACLPermissionType{
var AclPermissionTypeMap = map[string]kafka.ACLPermissionType{
"any": kafka.ACLPermissionTypeAny,
"allow": kafka.ACLPermissionTypeAllow,
"deny": kafka.ACLPermissionTypeDeny,
Expand All @@ -278,7 +278,7 @@ func (p *ACLPermissionType) String() string {

// Set is used by Cobra to set the value of a variable from a Cobra flag.
func (p *ACLPermissionType) Set(v string) error {
pt, ok := aclPermissionTypeMap[strings.ToLower(v)]
pt, ok := AclPermissionTypeMap[strings.ToLower(v)]
if !ok {
return errors.New(`must be one of "any", "allow", or "deny"`)
}
Expand Down
Loading
Loading