Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create acls #165

Merged
merged 106 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
106 commits
Select commit Hold shift + click to select a range
b86d182
bump kafka-go to include acl apis
petedannemann Aug 11, 2023
f1ec537
add acl interfaces and aclinfo type stub
petedannemann Aug 11, 2023
07a63c7
pull latest kafka-go and use kafka-go aclresource type
petedannemann Aug 11, 2023
474c260
wip
petedannemann Sep 8, 2023
6e0ec36
fix test
petedannemann Sep 8, 2023
7b9454d
fix typos
petedannemann Sep 11, 2023
49f7e19
get acls working
petedannemann Sep 11, 2023
8382e98
getacls working
petedannemann Sep 12, 2023
7b8ee42
upgrade cobra to latest
petedannemann Sep 12, 2023
2a7d2de
finish separating get into separate subcommands
petedannemann Sep 12, 2023
1b84ef3
remove unneeded variables
petedannemann Sep 12, 2023
ff7d81d
Merge branch 'chore/separate-subcmd-for-get' into spike/acls-and-users
petedannemann Sep 12, 2023
ea28ea9
wip
petedannemann Sep 12, 2023
07667dd
pr feedback
petedannemann Sep 12, 2023
dcdd0e8
Revert "upgrade cobra to latest"
petedannemann Sep 13, 2023
d58262c
Merge branch 'chore/separate-subcmd-for-get' into spike/acls-and-users
petedannemann Sep 13, 2023
9f8f550
use getCliRunnerAndCtx in get acls
petedannemann Sep 13, 2023
4a78af2
more consistent variable names
petedannemann Sep 13, 2023
1dbf200
custom cobra type
petedannemann Sep 13, 2023
226ae1c
bring in new kafka-go
petedannemann Sep 13, 2023
acc011f
support resource pattern type
petedannemann Sep 13, 2023
2536e50
add support for acloperationtype and remove options for unknown
petedannemann Sep 14, 2023
62671b0
improve descriptions
petedannemann Sep 14, 2023
3f050ca
support permissiontype and host filters
petedannemann Sep 14, 2023
925670c
add resource name filter and fix permission type formatting
petedannemann Sep 14, 2023
5cff332
support principal filtering
petedannemann Sep 14, 2023
e6e8c63
improve docs
petedannemann Sep 14, 2023
e28cb01
add examples
petedannemann Sep 15, 2023
9735b1b
remove comment
petedannemann Sep 15, 2023
b19a4e1
remove TODOs that are complete
petedannemann Sep 15, 2023
43806c0
remove TODOs that are complete
petedannemann Sep 15, 2023
2fb8c8e
update README
petedannemann Sep 15, 2023
45d403d
fix test
petedannemann Sep 15, 2023
c5e909d
Merge branch 'master' into feat/get-acls
petedannemann Sep 15, 2023
b3a5ef8
wip
petedannemann Sep 15, 2023
6c1f7f1
fix error handling
petedannemann Sep 15, 2023
cd3a1f6
error handling for zk
petedannemann Sep 15, 2023
e0c8c63
more consistent error msg
petedannemann Sep 15, 2023
90147f3
clean up createacl
petedannemann Sep 15, 2023
7534ecf
add TestBrokerClientCreateACLReadOnly
petedannemann Sep 15, 2023
7551ece
improve zk tests
petedannemann Sep 15, 2023
df19f18
run acl tests in ci
petedannemann Sep 15, 2023
e799c40
enable acls for kafka 2.4.1 in ci
petedannemann Sep 15, 2023
cf690ee
fix zk tests
petedannemann Sep 15, 2023
41283c7
skip TestBrokerClientCreateACLReadOnly on old versions of kafka
petedannemann Sep 15, 2023
b553d3d
try to debug
petedannemann Sep 15, 2023
14811f7
handle nested errors from createacls
petedannemann Sep 15, 2023
7cc16a6
operations -> operation
petedannemann Sep 15, 2023
2d12642
operations -> operation
petedannemann Sep 15, 2023
fdb8288
remove setting log level in test
petedannemann Sep 15, 2023
4f4be70
Merge branch 'master' into feat/get-acls
petedannemann Sep 18, 2023
96dedfd
clean up allowed types in help command
petedannemann Sep 18, 2023
d65759d
fix merge conflict
petedannemann Sep 18, 2023
36d3de9
fix test
petedannemann Sep 19, 2023
9b1262a
add json annotations
petedannemann Sep 19, 2023
e960c37
bump kafka-go to version on main
petedannemann Sep 18, 2023
47650f9
wip
petedannemann Sep 19, 2023
8d9ab94
basic tests
petedannemann Sep 19, 2023
561eb2a
start on getusers cmd
petedannemann Sep 19, 2023
ead5d31
add json annotations
petedannemann Sep 19, 2023
5dcb773
get users working
petedannemann Sep 19, 2023
46a50ef
wip
petedannemann Sep 19, 2023
6800114
add todos and fix type annotaitons
petedannemann Sep 19, 2023
2b0d87c
improve test
petedannemann Sep 19, 2023
128be0d
use CanTestBrokerAdminSecurity to feature flag test
petedannemann Sep 19, 2023
a69e71f
update README
petedannemann Sep 19, 2023
5efedaa
remove duplicate test from merge conflicts
petedannemann Sep 20, 2023
83eca68
fix more merge conflicts
petedannemann Sep 20, 2023
a5dbb56
create user working
petedannemann Sep 26, 2023
5d46ebe
add uncommitted files
petedannemann Oct 3, 2023
0f8d283
start adding validation
petedannemann Oct 4, 2023
f7cd4f8
meta validation for users
petedannemann Oct 4, 2023
01efe00
wip
petedannemann Oct 10, 2023
ba844d0
Merge branch 'master' into feat/apply-acls
petedannemann Oct 12, 2023
79d9d3b
support dry run and skip confirm
petedannemann Oct 18, 2023
a335872
wip
petedannemann Oct 30, 2023
107751e
wip
petedannemann Nov 9, 2023
aa72764
add more files
petedannemann Nov 9, 2023
61e6925
resourcemta
petedannemann Nov 15, 2023
72fbe94
consistency checking for acls
petedannemann Nov 15, 2023
c1f10df
remove emacs backups
petedannemann Nov 15, 2023
a2d4686
remove user stuff
petedannemann Nov 15, 2023
a7c130c
remove diff from cluster.yaml file
petedannemann Nov 15, 2023
23b544a
remove diff from topic file
petedannemann Nov 15, 2023
7c57063
remove debug log
petedannemann Nov 15, 2023
5931b92
smaller diff
petedannemann Nov 15, 2023
3d52f5a
remove completed todos
petedannemann Nov 15, 2023
d0f0ec9
remove unused error helper
petedannemann Nov 15, 2023
8400a73
add missing meta file
petedannemann Nov 15, 2023
9b3b13c
Merge branch 'master' into feat/create-acls
petedannemann Nov 15, 2023
1e7cc3b
skip ACL tests when ACLs cannot be used due to kafka version limitations
petedannemann Nov 15, 2023
24fffd0
fix loadacls test
petedannemann Nov 15, 2023
32a9490
add more todos
petedannemann Nov 15, 2023
39349ad
add validation and set defaults
petedannemann Nov 16, 2023
0cd4dcf
don't use ioutil
petedannemann Nov 16, 2023
41981eb
move confirm to util package
petedannemann Nov 16, 2023
a6d7a5e
move confirm to util package
petedannemann Nov 16, 2023
3b4fd43
add create to README
petedannemann Nov 16, 2023
b1b5f21
use validation and setdefaults
petedannemann Nov 16, 2023
d749523
add example acl
petedannemann Nov 16, 2023
67f7178
fix formatting in readme
petedannemann Nov 16, 2023
3ae74de
use released version of kafka-go
petedannemann Nov 16, 2023
c7db718
fix spelling
petedannemann Nov 16, 2023
1f8d7b4
make invalid field more obvious
petedannemann Nov 16, 2023
23f7979
fix dryrun and skip confirm
petedannemann Nov 16, 2023
2c4c747
fix grammar
petedannemann Dec 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ vendor/
build/

.vscode

# Emacs backups
*~
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ The `check` command validates that each topic config has the correct fields set
consistent with the associated cluster config. Unless `--validate-only` is set, it then
checks the topic config against the state of the topic in the corresponding cluster.

#### create
```
topicctl create [flags] [command]
```

The `create` command creates resources in the cluster from a configuration file.
Currently, only ACLs are supported. The create command is separate from the apply
command as it is intended for usage with immutable resources managed by topicctl.

#### get

```
Expand Down Expand Up @@ -419,6 +428,47 @@ This subcommand will not rebalance a topic if:
1. a topic's `retention.ms` in the kafka cluster does not match the topic's `retentionMinutes` setting in the topic config
1. a topic does not exist in the kafka cluster

### ACLs

Sets of ACLs can be configured in a YAML file. The following is an
annotated example:

```yaml
meta:
name: acls-test # Name of the group of ACLs
cluster: my-cluster # Name of the cluster
environment: stage # Environment of the cluster
region: us-west-2 # Region of the cluster
description: | # Free-text description of the topic (optional)
Test topic in my-cluster.
labels: # Custom key-value pairs purposed for ACL bookkeeping (optional)
key1: value1
key2: value2

spec:
acls:
- resource:
type: topic # Type of resource (topic, group, cluster, etc.)
name: test-topic # Name of the resource to apply an ACL to
patternType: literal # Type of pattern (literal, prefixed, etc.)
principal: User:my-user # Principal to apply the ACL to
host: * # Host to apply the ACL to
permission: allow # Permission to apply (allow, deny)
operations: # List of operations to use for the ACLs
- read
- describe
```

The `cluster`, `environment`, and `region` fields are used for matching
against a cluster config and double-checking that the cluster we're applying
in is correct; they don't appear in any API calls.

See the [Kafka documentation](https://kafka.apache.org/documentation/#security_authz_primitives)
for more details on the parameters that can be set in the `acls` field.

Multiple groups of ACLs can be included in the same file, separated by `---` lines, provided
that they reference the same cluster.

## Tool safety

The `bootstrap`, `get`, `repl`, and `tail` subcommands are read-only and should never make
Expand All @@ -441,6 +491,9 @@ The `apply` subcommand can make changes, but under the following conditions:

The `reset-offsets` command can also make changes in the cluster and should be used carefully.

The `create` command can be used to create new resources in the cluster. It cannot be used with
mutable resources.

### Idempotency

Apply runs are designed to be idemponent- the effects should be the same no matter how many
Expand Down
201 changes: 201 additions & 0 deletions cmd/topicctl/subcmd/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package subcmd

import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/cli"
"github.com/segmentio/topicctl/pkg/config"
"github.com/segmentio/topicctl/pkg/create"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var createCmd = &cobra.Command{
Use: "create [resource type]",
Short: "creates one or more resources",
PersistentPreRunE: createPreRun,
}

type createCmdConfig struct {
dryRun bool
pathPrefix string
skipConfirm bool

shared sharedOptions
}

var createConfig createCmdConfig

func init() {
createCmd.PersistentFlags().BoolVar(
&createConfig.dryRun,
"dry-run",
false,
"Do a dry-run",
)
createCmd.PersistentFlags().StringVar(
&createConfig.pathPrefix,
"path-prefix",
os.Getenv("TOPICCTL_ACL_PATH_PREFIX"),
"Prefix for ACL config paths",
)
createCmd.PersistentFlags().BoolVar(
&createConfig.skipConfirm,
"skip-confirm",
false,
"Skip confirmation prompts during creation process",
)

addSharedFlags(createCmd, &createConfig.shared)
createCmd.AddCommand(
createACLsCmd(),
)
RootCmd.AddCommand(createCmd)
}

func createPreRun(cmd *cobra.Command, args []string) error {
if err := RootCmd.PersistentPreRunE(cmd, args); err != nil {
return err
}
return createConfig.shared.validate()
}

func createACLsCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "acls [acl configs]",
Short: "creates ACLs from configuration files",
Args: cobra.MinimumNArgs(1),
RunE: createACLRun,
PreRunE: createPreRun,
}

return cmd
}

func createACLRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()

// Keep a cache of the admin clients with the cluster config path as the key
adminClients := map[string]admin.Client{}

defer func() {
for _, adminClient := range adminClients {
adminClient.Close()
}
}()

matchCount := 0

for _, arg := range args {
if createConfig.pathPrefix != "" && !filepath.IsAbs(arg) {
arg = filepath.Join(createConfig.pathPrefix, arg)
}

matches, err := filepath.Glob(arg)
if err != nil {
return err
}

for _, match := range matches {
matchCount++
if err := createACL(ctx, match, adminClients); err != nil {
return err
}
}
}

if matchCount == 0 {
return fmt.Errorf("No ACL configs match the provided args (%+v)", args)
}

return nil
}

func createACL(
ctx context.Context,
aclConfigPath string,
adminClients map[string]admin.Client,
) error {
clusterConfigPath, err := clusterConfigForACLCreate(aclConfigPath)
if err != nil {
return err
}

aclConfigs, err := config.LoadACLsFile(aclConfigPath)
if err != nil {
return err
}

clusterConfig, err := config.LoadClusterFile(clusterConfigPath, createConfig.shared.expandEnv)
if err != nil {
return err
}

adminClient, ok := adminClients[clusterConfigPath]
if !ok {
adminClient, err = clusterConfig.NewAdminClient(
ctx,
nil,
createConfig.dryRun,
createConfig.shared.saslUsername,
createConfig.shared.saslPassword,
)
if err != nil {
return err
}
adminClients[clusterConfigPath] = adminClient
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false)

for _, aclConfig := range aclConfigs {
aclConfig.SetDefaults()
log.Infof(
"Processing ACL %s in config %s with cluster config %s",
aclConfig.Meta.Name,
aclConfigPath,
clusterConfigPath,
)

creatorConfig := create.ACLCreatorConfig{
DryRun: createConfig.dryRun,
SkipConfirm: createConfig.skipConfirm,
ACLConfig: aclConfig,
ClusterConfig: clusterConfig,
}

if err := cliRunner.CreateACL(ctx, creatorConfig); err != nil {
return err
}
}

return nil
}

func clusterConfigForACLCreate(aclConfigPath string) (string, error) {
if createConfig.shared.clusterConfig != "" {
return createConfig.shared.clusterConfig, nil
}

return filepath.Abs(
filepath.Join(
filepath.Dir(aclConfigPath),
"..",
"cluster.yaml",
),
)
}
5 changes: 3 additions & 2 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package subcmd
import (
"context"
"fmt"
"github.com/spf13/cobra"
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"

"github.com/spf13/cobra"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/cli"
Expand Down Expand Up @@ -159,7 +160,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {

for _, topicConfig := range topicConfigs {
// topic config should be consistent with the cluster config
if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil {
if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil {
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/topicctl/subcmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"fmt"
"strconv"

"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/cli"
"github.com/segmentio/topicctl/pkg/groups"
"github.com/segmentio/topicctl/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -167,7 +167,7 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error {
"Please ensure that all other consumers are stopped, otherwise the reset might be overridden.",
)

ok, _ := apply.Confirm("OK to continue?", false)
ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/topicctl/subcmd/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -104,7 +104,7 @@ func runTestReader(ctx context.Context) error {
testerConfig.readConsumer,
)

ok, _ := apply.Confirm("OK to continue?", false)
ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func runTestWriter(ctx context.Context) error {
testerConfig.writeRate,
)

ok, _ := apply.Confirm("OK to continue?", false)
ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")
}
Expand Down
31 changes: 31 additions & 0 deletions examples/auth/acls/acl-default.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
meta:
name: acl-default
cluster: local-cluster-auth
environment: local-env
region: local-region
description: |
This is a default ACL for the local cluster.
It grants read and describe access to the topic `my-topic` and read access to the group `my-group`
to the user `default`.

spec:
acls:
- resource:
type: topic
name: my-topic
patternType: literal
principal: 'User:default'
host: '*'
permission: allow
operations:
- Read
- Describe
- resource:
type: group
name: my-group
patternType: prefixed
principal: 'User:default'
host: '*'
permission: allow
operations:
- Read
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/olekukonko/tablewriter v0.0.5
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/segmentio/kafka-go v0.4.44
petedannemann marked this conversation as resolved.
Show resolved Hide resolved
github.com/segmentio/kafka-go v0.4.45
github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.5.0
Expand Down
Loading
Loading