diff --git a/.gitignore b/.gitignore index d7298b1c..a51a3ca8 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ vendor/ build/ .vscode + +# Emacs backups +*~ \ No newline at end of file diff --git a/README.md b/README.md index 7e77dcc6..a1cd9279 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,15 @@ The `check` command validates that each topic config has the correct fields set consistent with the associated cluster config. Unless `--validate-only` is set, it then checks the topic config against the state of the topic in the corresponding cluster. +#### create +``` +topicctl create [flags] [command] +``` + +The `create` command creates resources in the cluster from a configuration file. +Currently, only ACLs are supported. The create command is separate from the apply +command as it is intended for usage with immutable resources managed by topicctl. + #### get ``` @@ -419,6 +428,47 @@ This subcommand will not rebalance a topic if: 1. a topic's `retention.ms` in the kafka cluster does not match the topic's `retentionMinutes` setting in the topic config 1. a topic does not exist in the kafka cluster +### ACLs + +Sets of ACLs can be configured in a YAML file. The following is an +annotated example: + +```yaml +meta: + name: acls-test # Name of the group of ACLs + cluster: my-cluster # Name of the cluster + environment: stage # Environment of the cluster + region: us-west-2 # Region of the cluster + description: | # Free-text description of the topic (optional) + Test topic in my-cluster. + labels: # Custom key-value pairs purposed for ACL bookkeeping (optional) + key1: value1 + key2: value2 + +spec: + acls: + - resource: + type: topic # Type of resource (topic, group, cluster, etc.) + name: test-topic # Name of the resource to apply an ACL to + patternType: literal # Type of pattern (literal, prefixed, etc.) + principal: User:my-user # Principal to apply the ACL to + host: * # Host to apply the ACL to + permission: allow # Permission to apply (allow, deny) + operations: # List of operations to use for the ACLs + - read + - describe +``` + +The `cluster`, `environment`, and `region` fields are used for matching +against a cluster config and double-checking that the cluster we're applying +in is correct; they don't appear in any API calls. + +See the [Kafka documentation](https://kafka.apache.org/documentation/#security_authz_primitives) +for more details on the parameters that can be set in the `acls` field. + +Multiple groups of ACLs can be included in the same file, separated by `---` lines, provided +that they reference the same cluster. + ## Tool safety The `bootstrap`, `get`, `repl`, and `tail` subcommands are read-only and should never make @@ -441,6 +491,9 @@ The `apply` subcommand can make changes, but under the following conditions: The `reset-offsets` command can also make changes in the cluster and should be used carefully. +The `create` command can be used to create new resources in the cluster. It cannot be used with +mutable resources. + ### Idempotency Apply runs are designed to be idemponent- the effects should be the same no matter how many diff --git a/cmd/topicctl/subcmd/create.go b/cmd/topicctl/subcmd/create.go new file mode 100644 index 00000000..09a1299b --- /dev/null +++ b/cmd/topicctl/subcmd/create.go @@ -0,0 +1,201 @@ +package subcmd + +import ( + "context" + "fmt" + "os" + "os/signal" + "path/filepath" + "syscall" + + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/cli" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/create" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var createCmd = &cobra.Command{ + Use: "create [resource type]", + Short: "creates one or more resources", + PersistentPreRunE: createPreRun, +} + +type createCmdConfig struct { + dryRun bool + pathPrefix string + skipConfirm bool + + shared sharedOptions +} + +var createConfig createCmdConfig + +func init() { + createCmd.PersistentFlags().BoolVar( + &createConfig.dryRun, + "dry-run", + false, + "Do a dry-run", + ) + createCmd.PersistentFlags().StringVar( + &createConfig.pathPrefix, + "path-prefix", + os.Getenv("TOPICCTL_ACL_PATH_PREFIX"), + "Prefix for ACL config paths", + ) + createCmd.PersistentFlags().BoolVar( + &createConfig.skipConfirm, + "skip-confirm", + false, + "Skip confirmation prompts during creation process", + ) + + addSharedFlags(createCmd, &createConfig.shared) + createCmd.AddCommand( + createACLsCmd(), + ) + RootCmd.AddCommand(createCmd) +} + +func createPreRun(cmd *cobra.Command, args []string) error { + if err := RootCmd.PersistentPreRunE(cmd, args); err != nil { + return err + } + return createConfig.shared.validate() +} + +func createACLsCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "acls [acl configs]", + Short: "creates ACLs from configuration files", + Args: cobra.MinimumNArgs(1), + RunE: createACLRun, + PreRunE: createPreRun, + } + + return cmd +} + +func createACLRun(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + go func() { + <-sigChan + cancel() + }() + + // Keep a cache of the admin clients with the cluster config path as the key + adminClients := map[string]admin.Client{} + + defer func() { + for _, adminClient := range adminClients { + adminClient.Close() + } + }() + + matchCount := 0 + + for _, arg := range args { + if createConfig.pathPrefix != "" && !filepath.IsAbs(arg) { + arg = filepath.Join(createConfig.pathPrefix, arg) + } + + matches, err := filepath.Glob(arg) + if err != nil { + return err + } + + for _, match := range matches { + matchCount++ + if err := createACL(ctx, match, adminClients); err != nil { + return err + } + } + } + + if matchCount == 0 { + return fmt.Errorf("No ACL configs match the provided args (%+v)", args) + } + + return nil +} + +func createACL( + ctx context.Context, + aclConfigPath string, + adminClients map[string]admin.Client, +) error { + clusterConfigPath, err := clusterConfigForACLCreate(aclConfigPath) + if err != nil { + return err + } + + aclConfigs, err := config.LoadACLsFile(aclConfigPath) + if err != nil { + return err + } + + clusterConfig, err := config.LoadClusterFile(clusterConfigPath, createConfig.shared.expandEnv) + if err != nil { + return err + } + + adminClient, ok := adminClients[clusterConfigPath] + if !ok { + adminClient, err = clusterConfig.NewAdminClient( + ctx, + nil, + createConfig.dryRun, + createConfig.shared.saslUsername, + createConfig.shared.saslPassword, + ) + if err != nil { + return err + } + adminClients[clusterConfigPath] = adminClient + } + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) + + for _, aclConfig := range aclConfigs { + aclConfig.SetDefaults() + log.Infof( + "Processing ACL %s in config %s with cluster config %s", + aclConfig.Meta.Name, + aclConfigPath, + clusterConfigPath, + ) + + creatorConfig := create.ACLCreatorConfig{ + DryRun: createConfig.dryRun, + SkipConfirm: createConfig.skipConfirm, + ACLConfig: aclConfig, + ClusterConfig: clusterConfig, + } + + if err := cliRunner.CreateACL(ctx, creatorConfig); err != nil { + return err + } + } + + return nil +} + +func clusterConfigForACLCreate(aclConfigPath string) (string, error) { + if createConfig.shared.clusterConfig != "" { + return createConfig.shared.clusterConfig, nil + } + + return filepath.Abs( + filepath.Join( + filepath.Dir(aclConfigPath), + "..", + "cluster.yaml", + ), + ) +} diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 7da0096f..a85150d6 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -3,7 +3,6 @@ package subcmd import ( "context" "fmt" - "github.com/spf13/cobra" "os" "os/signal" "path/filepath" @@ -11,6 +10,8 @@ import ( "syscall" "time" + "github.com/spf13/cobra" + "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/cli" @@ -159,7 +160,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { for _, topicConfig := range topicConfigs { // topic config should be consistent with the cluster config - if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil { + if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil { log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) continue } diff --git a/cmd/topicctl/subcmd/reset.go b/cmd/topicctl/subcmd/reset.go index 831e049d..acd9a049 100644 --- a/cmd/topicctl/subcmd/reset.go +++ b/cmd/topicctl/subcmd/reset.go @@ -6,9 +6,9 @@ import ( "fmt" "strconv" - "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/cli" "github.com/segmentio/topicctl/pkg/groups" + "github.com/segmentio/topicctl/pkg/util" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -167,7 +167,7 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { "Please ensure that all other consumers are stopped, otherwise the reset might be overridden.", ) - ok, _ := apply.Confirm("OK to continue?", false) + ok, _ := util.Confirm("OK to continue?", false) if !ok { return errors.New("Stopping because of user response") } diff --git a/cmd/topicctl/subcmd/tester.go b/cmd/topicctl/subcmd/tester.go index 4c75a9de..03bd18eb 100644 --- a/cmd/topicctl/subcmd/tester.go +++ b/cmd/topicctl/subcmd/tester.go @@ -10,7 +10,7 @@ import ( "time" "github.com/segmentio/kafka-go" - "github.com/segmentio/topicctl/pkg/apply" + "github.com/segmentio/topicctl/pkg/util" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -104,7 +104,7 @@ func runTestReader(ctx context.Context) error { testerConfig.readConsumer, ) - ok, _ := apply.Confirm("OK to continue?", false) + ok, _ := util.Confirm("OK to continue?", false) if !ok { return errors.New("Stopping because of user response") } @@ -153,7 +153,7 @@ func runTestWriter(ctx context.Context) error { testerConfig.writeRate, ) - ok, _ := apply.Confirm("OK to continue?", false) + ok, _ := util.Confirm("OK to continue?", false) if !ok { return errors.New("Stopping because of user response") } diff --git a/examples/auth/acls/acl-default.yaml b/examples/auth/acls/acl-default.yaml new file mode 100644 index 00000000..d28898b4 --- /dev/null +++ b/examples/auth/acls/acl-default.yaml @@ -0,0 +1,31 @@ +meta: + name: acl-default + cluster: local-cluster-auth + environment: local-env + region: local-region + description: | + This is a default ACL for the local cluster. + It grants read and describe access to the topic `my-topic` and read access to the group `my-group` + to the user `default`. + +spec: + acls: + - resource: + type: topic + name: my-topic + patternType: literal + principal: 'User:default' + host: '*' + permission: allow + operations: + - Read + - Describe + - resource: + type: group + name: my-group + patternType: prefixed + principal: 'User:default' + host: '*' + permission: allow + operations: + - Read diff --git a/go.mod b/go.mod index 083f473e..66f80b7d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/olekukonko/tablewriter v0.0.5 github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da - github.com/segmentio/kafka-go v0.4.44 + github.com/segmentio/kafka-go v0.4.45 github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070 github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.5.0 diff --git a/go.sum b/go.sum index f20d07f4..20b055b5 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da h1:p3Vo3i64TCLY7gIfzeQaUJ+kppEO5WQG3cL8iE8tGHU= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/segmentio/kafka-go v0.4.28/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= -github.com/segmentio/kafka-go v0.4.44 h1:Vjjksniy0WSTZ7CuVJrz1k04UoZeTc77UV6Yyk6tLY4= -github.com/segmentio/kafka-go v0.4.44/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/segmentio/kafka-go v0.4.45 h1:prqrZp1mMId4kI6pyPolkLsH6sWOUmDxmmucbL4WS6E= +github.com/segmentio/kafka-go v0.4.45/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070 h1:ng1Z/x5LLOIrzgWUOtypsCkR+dHTux7slqOCVkuwQBo= github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070/go.mod h1:IjMUGcOJoATsnlqAProGN1ezXeEgU5GCWr1/EzmkEMA= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 48c4bd34..0d5b2429 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -110,6 +110,7 @@ func NewBrokerAdminClient( if _, ok := maxVersions["DescribeUserScramCredentials"]; ok { supportedFeatures.Users = true } + log.Debugf("Supported features: %+v", supportedFeatures) adminClient := &BrokerAdminClient{ @@ -407,13 +408,16 @@ func (c *BrokerAdminClient) GetUsers( return nil, err } - if err = util.DescribeUserScramCredentialsResponseResultsError(resp.Results); err != nil { - return nil, err - } - results := []UserInfo{} for _, result := range resp.Results { + if result.Error != nil { + if errors.Is(result.Error, kafka.ResourceNotFound) { + log.Debugf("Skipping over user %s because it does not exist", result.User) + continue + } + return nil, fmt.Errorf("Error getting description of user %s: %+v", result.User, result.Error) + } var credentials []CredentialInfo for _, credential := range result.CredentialInfos { credentials = append(credentials, CredentialInfo{ @@ -808,7 +812,7 @@ func (c *BrokerAdminClient) GetACLs( return aclinfos, nil } -// CreateACLs creates an ACL in the cluster. +// CreateACLs creates ACLs in the cluster. func (c *BrokerAdminClient) CreateACLs( ctx context.Context, acls []kafka.ACLEntry, diff --git a/pkg/admin/client.go b/pkg/admin/client.go index f872ad90..93d20e00 100644 --- a/pkg/admin/client.go +++ b/pkg/admin/client.go @@ -77,7 +77,7 @@ type Client interface { config kafka.TopicConfig, ) error - // Create ACLs creates ACLs in the cluster. + // CreateACLs creates ACLs in the cluster. CreateACLs( ctx context.Context, acls []kafka.ACLEntry, diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 6d55048a..25b0084b 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -130,7 +130,7 @@ func (t *TopicApplier) Apply(ctx context.Context) error { if err := t.topicConfig.Validate(len(brokerRacks)); err != nil { return err } - if err := config.CheckConsistency(t.topicConfig, t.clusterConfig); err != nil { + if err := config.CheckConsistency(t.topicConfig.Meta, t.clusterConfig); err != nil { return err } @@ -163,7 +163,7 @@ func (t *TopicApplier) applyNewTopic(ctx context.Context) error { FormatNewTopicConfig(newTopicConfig), ) - ok, _ := Confirm("OK to continue?", t.config.SkipConfirm) + ok, _ := util.Confirm("OK to continue?", t.config.SkipConfirm) if !ok { return errors.New("Stopping because of user response") } @@ -283,7 +283,7 @@ func (t *TopicApplier) checkExistingState( if t.config.DryRun { log.Infof("Skipping update because dryRun is set to true") } else { - ok, err := Confirm("OK to remove these?", t.config.SkipConfirm) + ok, err := util.Confirm("OK to remove these?", t.config.SkipConfirm) if err != nil { return err } else if !ok { @@ -326,7 +326,7 @@ func (t *TopicApplier) checkExistingState( if t.config.DryRun { log.Infof("Skipping update because dryRun is set to true") } else { - ok, err := Confirm("OK to remove broker throttles?", t.config.SkipConfirm) + ok, err := util.Confirm("OK to remove broker throttles?", t.config.SkipConfirm) if err != nil { return err } else if !ok { @@ -413,7 +413,7 @@ func (t *TopicApplier) updateSettings( return nil } - ok, _ := Confirm( + ok, _ := util.Confirm( "OK to update to the new values in the topic config?", t.config.SkipConfirm, ) @@ -576,7 +576,7 @@ func (t *TopicApplier) updatePartitionsHelper( return nil } - ok, _ := Confirm("OK to apply?", t.config.SkipConfirm) + ok, _ := util.Confirm("OK to apply?", t.config.SkipConfirm) if !ok { return errors.New("Stopping because of user response") } @@ -678,7 +678,7 @@ func (t *TopicApplier) updatePlacement( desiredPlacement, ) - ok, _ := Confirm( + ok, _ := util.Confirm( fmt.Sprintf("OK to apply %s despite having unbalanced leaders?", desiredPlacement), t.config.SkipConfirm || t.config.DryRun, ) @@ -841,7 +841,7 @@ func (t *TopicApplier) updatePlacementRunner( log.Warnf("Autocontinue flag detected, user will not be prompted each round") } - ok, _ := Confirm("OK to apply?", t.config.SkipConfirm) + ok, _ := util.Confirm("OK to apply?", t.config.SkipConfirm) if !ok { return errors.New("Stopping because of user response") } @@ -917,7 +917,7 @@ func (t *TopicApplier) updatePlacementRunner( if t.config.AutoContinueRebalance { log.Infof("Autocontinuing to next round") } else { - ok, _ := Confirm("OK to continue?", t.config.SkipConfirm) + ok, _ := util.Confirm("OK to continue?", t.config.SkipConfirm) if !ok { return errors.New("Stopping because of user response") } @@ -1249,7 +1249,7 @@ func (t *TopicApplier) updateLeaders( batchSize = len(wrongLeaders) } - ok, _ := Confirm( + ok, _ := util.Confirm( fmt.Sprintf( "OK to run leader elections (in batches of %d partitions each) ?", batchSize, diff --git a/pkg/apply/apply_test.go b/pkg/apply/apply_test.go index 5f8e0b88..450beaec 100644 --- a/pkg/apply/apply_test.go +++ b/pkg/apply/apply_test.go @@ -20,7 +20,7 @@ func TestApplyBasicUpdates(t *testing.T) { topicName := util.RandomString("apply-topic-", 6) topicConfig := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", @@ -88,7 +88,7 @@ func TestApplyPlacementUpdates(t *testing.T) { topicName := util.RandomString("apply-topic-", 6) topicConfig := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", @@ -206,7 +206,7 @@ func TestApplyRebalance(t *testing.T) { topicName := util.RandomString("apply-topic-", 6) topicConfig := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", @@ -285,7 +285,7 @@ func TestApplyExtendPartitions(t *testing.T) { topicName := util.RandomString("apply-topic-extend-", 6) topicConfig := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", @@ -376,7 +376,7 @@ func TestApplyExistingThrottles(t *testing.T) { topicName2 := util.RandomString("apply-topic-extend-", 6) topicConfig1 := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName1, Cluster: "test-cluster", Region: "test-region", @@ -397,7 +397,7 @@ func TestApplyExistingThrottles(t *testing.T) { }, } topicConfig2 := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName2, Cluster: "test-cluster", Region: "test-region", @@ -555,7 +555,7 @@ func TestApplyDryRun(t *testing.T) { topicName := util.RandomString("apply-topic-dry-run-", 6) topicConfig := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", @@ -620,7 +620,7 @@ func TestApplyThrottles(t *testing.T) { topicName := util.RandomString("apply-topic-", 6) topicConfig := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", @@ -858,7 +858,7 @@ func TestApplyOverrides(t *testing.T) { topicName := util.RandomString("apply-topic-", 6) topicConfig := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", diff --git a/pkg/check/check.go b/pkg/check/check.go index 7b7463c6..70be6098 100644 --- a/pkg/check/check.go +++ b/pkg/check/check.go @@ -48,7 +48,7 @@ func CheckTopic(ctx context.Context, config CheckConfig) (TopicCheckResults, err Name: CheckNameConfigsConsistent, }, ) - if err := tconfig.CheckConsistency(config.TopicConfig, config.ClusterConfig); err == nil { + if err := tconfig.CheckConsistency(config.TopicConfig.Meta, config.ClusterConfig); err == nil { results.UpdateLastResult(true, "") } else { results.UpdateLastResult( diff --git a/pkg/check/check_test.go b/pkg/check/check_test.go index b070d824..455ba135 100644 --- a/pkg/check/check_test.go +++ b/pkg/check/check_test.go @@ -33,7 +33,7 @@ func TestCheck(t *testing.T) { topicName := util.RandomString("check-topic-", 6) topicConfig := config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", @@ -106,7 +106,7 @@ func TestCheck(t *testing.T) { { description: "topic does not exist", checkTopicConfig: config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: "non-existent-topic", Cluster: "non-matching-cluster", Region: "test-region", @@ -134,7 +134,7 @@ func TestCheck(t *testing.T) { { description: "topic does not exist", checkTopicConfig: config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: "non-existent-topic", Cluster: "test-cluster", Region: "test-region", @@ -163,7 +163,7 @@ func TestCheck(t *testing.T) { { description: "wrong configuration", checkTopicConfig: config.TopicConfig{ - Meta: config.TopicMeta{ + Meta: config.ResourceMeta{ Name: topicName, Cluster: "test-cluster", Region: "test-region", diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 71abde33..898f4f45 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -18,6 +18,7 @@ import ( "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/check" "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/create" "github.com/segmentio/topicctl/pkg/groups" "github.com/segmentio/topicctl/pkg/messages" log "github.com/sirupsen/logrus" @@ -110,6 +111,38 @@ func (c *CLIRunner) ApplyTopic( return nil } +// CreateACL does an apply run according to the spec in the argument config. +func (c *CLIRunner) CreateACL( + ctx context.Context, + creatorConfig create.ACLCreatorConfig, +) error { + creator, err := create.NewACLCreator( + ctx, + c.adminClient, + creatorConfig, + ) + if err != nil { + return err + } + + highlighter := color.New(color.FgYellow, color.Bold).SprintfFunc() + + c.printer( + "Starting creation for ACLs %s in environment %s, cluster %s", + highlighter(creatorConfig.ACLConfig.Meta.Name), + highlighter(creatorConfig.ACLConfig.Meta.Environment), + highlighter(creatorConfig.ACLConfig.Meta.Cluster), + ) + + err = creator.Create(ctx) + if err != nil { + return err + } + + c.printer("Create completed successfully!") + return nil +} + // BootstrapTopics creates configs for one or more topics based on their current state in the // cluster. func (c *CLIRunner) BootstrapTopics( diff --git a/pkg/config/acl.go b/pkg/config/acl.go new file mode 100644 index 00000000..2bb336ff --- /dev/null +++ b/pkg/config/acl.go @@ -0,0 +1,93 @@ +package config + +import ( + "errors" + + "github.com/hashicorp/go-multierror" + "github.com/segmentio/kafka-go" +) + +type ACLConfig struct { + Meta ResourceMeta `json:"meta"` + Spec ACLSpec `json:"spec"` +} + +type ACLSpec struct { + ACLs []ACL `json:"acls"` +} + +type ACL struct { + Resource ACLResource `json:"resource"` + Operations []kafka.ACLOperationType `json:"operations"` +} + +type ACLResource struct { + Type kafka.ResourceType `json:"type"` + Name string `json:"name"` + PatternType kafka.PatternType `json:"patternType"` + Principal string `json:"principal"` + Host string `json:"host"` + Permission kafka.ACLPermissionType `json:"permission"` +} + +func (a ACLConfig) ToNewACLEntries() []kafka.ACLEntry { + acls := []kafka.ACLEntry{} + + for _, acl := range a.Spec.ACLs { + for _, operation := range acl.Operations { + acls = append(acls, kafka.ACLEntry{ + ResourceType: acl.Resource.Type, + ResourceName: acl.Resource.Name, + ResourcePatternType: acl.Resource.PatternType, + Principal: acl.Resource.Principal, + Host: acl.Resource.Host, + Operation: operation, + PermissionType: acl.Resource.Permission, + }) + } + } + return acls +} + +// SetDefaults sets the default host and permission for each ACL in an ACL config +// if these aren't set +func (a *ACLConfig) SetDefaults() { + for i, acl := range a.Spec.ACLs { + if acl.Resource.Host == "" { + a.Spec.ACLs[i].Resource.Host = "*" + } + if acl.Resource.Permission == kafka.ACLPermissionTypeUnknown { + a.Spec.ACLs[i].Resource.Permission = kafka.ACLPermissionTypeAllow + } + } +} + +// Validate evaluates whether the ACL config is valid. +func (a *ACLConfig) Validate() error { + var err error + + err = a.Meta.Validate() + + for _, acl := range a.Spec.ACLs { + if acl.Resource.Type == kafka.ResourceTypeUnknown { + err = multierror.Append(err, errors.New("ACL resource type cannot be unknown")) + } + if acl.Resource.Name == "" { + err = multierror.Append(err, errors.New("ACL resource name cannot be empty")) + } + if acl.Resource.PatternType == kafka.PatternTypeUnknown { + err = multierror.Append(err, errors.New("ACL resource pattern type cannot be unknown")) + } + if acl.Resource.Principal == "" { + err = multierror.Append(err, errors.New("ACL resource principal cannot be empty")) + } + + for _, operation := range acl.Operations { + if operation == kafka.ACLOperationTypeUnknown { + err = multierror.Append(err, errors.New("ACL operation cannot be unknown")) + } + } + } + + return err +} diff --git a/pkg/config/acl_test.go b/pkg/config/acl_test.go new file mode 100644 index 00000000..9f3c6a90 --- /dev/null +++ b/pkg/config/acl_test.go @@ -0,0 +1,267 @@ +package config + +import ( + "testing" + + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" +) + +func TestACLValidate(t *testing.T) { + type testCase struct { + description string + aclConfig ACLConfig + expError bool + } + + testCases := []testCase{ + { + description: "valid ACL config", + aclConfig: ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: "test-topic", + PatternType: kafka.PatternTypeLiteral, + Principal: "User:Alice", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + }, + expError: false, + }, + { + description: "unknown resource type", + aclConfig: ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeUnknown, + Name: "test-topic", + PatternType: kafka.PatternTypeLiteral, + Principal: "User:Alice", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + }, + expError: true, + }, + { + description: "empty resource name", + aclConfig: ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeTopic, + PatternType: kafka.PatternTypeLiteral, + Principal: "User:Alice", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + }, + expError: true, + }, + { + description: "unknown resource pattern type", + aclConfig: ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeTopic, + PatternType: kafka.PatternTypeUnknown, + Principal: "User:Alice", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + }, + expError: true, + }, + { + description: "empty principal", + aclConfig: ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeTopic, + PatternType: kafka.PatternTypeUnknown, + Principal: "", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + }, + expError: true, + }, + { + description: "unknown ACL operation type", + aclConfig: ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeTopic, + PatternType: kafka.PatternTypeUnknown, + Principal: "", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeUnknown, + }, + }, + }, + }, + }, + expError: true, + }, + } + + for _, testCase := range testCases { + testCase.aclConfig.SetDefaults() + err := testCase.aclConfig.Validate() + if testCase.expError { + assert.Error(t, err, testCase.description) + } else { + assert.NoError(t, err, testCase.description) + } + } +} + +func TestACLSetDefaults(t *testing.T) { + type testCase struct { + description string + aclConfig ACLConfig + expConfig ACLConfig + } + + testCases := []testCase{ + { + description: "set defaults", + aclConfig: ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: "test-topic", + PatternType: kafka.PatternTypeLiteral, + Principal: "User:Alice", + Permission: kafka.ACLPermissionTypeUnknown, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + }, + expConfig: ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: "test-topic", + PatternType: kafka.PatternTypeLiteral, + Principal: "User:Alice", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + }, + }, + } + + for _, testCase := range testCases { + testCase.aclConfig.SetDefaults() + assert.Equal(t, testCase.expConfig, testCase.aclConfig, testCase.description) + } +} diff --git a/pkg/config/load.go b/pkg/config/load.go index 6420e87f..f79200b6 100644 --- a/pkg/config/load.go +++ b/pkg/config/load.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "os" "path/filepath" "regexp" @@ -82,27 +83,66 @@ func LoadTopicsFile(path string) ([]TopicConfig, error) { func LoadTopicBytes(contents []byte) (TopicConfig, error) { config := TopicConfig{} err := unmarshalYAMLStrict(contents, &config) + fmt.Println(config) + return config, err +} + +// LoadACLsFile loads one or more ACLConfigs from a path to a YAML file. +func LoadACLsFile(path string) ([]ACLConfig, error) { + contents, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + contents = []byte(os.ExpandEnv(string(contents))) + + trimmedFile := strings.TrimSpace(string(contents)) + aclStrs := sep.Split(trimmedFile, -1) + + aclConfigs := []ACLConfig{} + + for _, aclStr := range aclStrs { + aclStr = strings.TrimSpace(aclStr) + if isEmpty(aclStr) { + continue + } + + aclConfig, err := LoadACLBytes([]byte(aclStr)) + if err != nil { + return nil, err + } + + aclConfigs = append(aclConfigs, aclConfig) + } + + return aclConfigs, nil +} + +// LoadACLBytes loads an ACLConfig from YAML bytes. +func LoadACLBytes(contents []byte) (ACLConfig, error) { + config := ACLConfig{} + err := unmarshalYAMLStrict(contents, &config) return config, err } // CheckConsistency verifies that the argument topic config is consistent with the argument // cluster, e.g. has the same environment and region, etc. -func CheckConsistency(topicConfig TopicConfig, clusterConfig ClusterConfig) error { +func CheckConsistency(resourceMeta ResourceMeta, clusterConfig ClusterConfig) error { var err error - if topicConfig.Meta.Cluster != clusterConfig.Meta.Name { + if resourceMeta.Cluster != clusterConfig.Meta.Name { err = multierror.Append( err, errors.New("Topic cluster name does not match name in cluster config"), ) } - if topicConfig.Meta.Environment != clusterConfig.Meta.Environment { + if resourceMeta.Environment != clusterConfig.Meta.Environment { err = multierror.Append( err, errors.New("Topic environment does not match cluster environment"), ) } - if topicConfig.Meta.Region != clusterConfig.Meta.Region { + if resourceMeta.Region != clusterConfig.Meta.Region { err = multierror.Append( err, errors.New("Topic region does not match cluster region"), diff --git a/pkg/config/load_test.go b/pkg/config/load_test.go index f784e3f3..66bc36d3 100644 --- a/pkg/config/load_test.go +++ b/pkg/config/load_test.go @@ -4,6 +4,7 @@ import ( "os" "testing" + "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -60,7 +61,7 @@ func TestLoadTopicsFile(t *testing.T) { assert.Equal( t, TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "topic-test", Cluster: "test-cluster", Region: "test-region", @@ -104,6 +105,67 @@ func TestLoadTopicsFile(t *testing.T) { assert.Equal(t, "topic-test2", topicConfigs[1].Meta.Name) } +func TestLoadACLsFile(t *testing.T) { + aclConfigs, err := LoadACLsFile("testdata/test-cluster/acls/acl-test.yaml") + require.NoError(t, err) + assert.Equal(t, 1, len(aclConfigs)) + aclConfig := aclConfigs[0] + + assert.Equal( + t, + ACLConfig{ + Meta: ResourceMeta{ + Name: "acl-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + Description: "Test acl\n", + }, + Spec: ACLSpec{ + ACLs: []ACL{ + { + Resource: ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: "test-topic", + PatternType: kafka.PatternTypeLiteral, + Principal: "User:Alice", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + kafka.ACLOperationTypeDescribe, + }, + }, + { + Resource: ACLResource{ + Type: kafka.ResourceTypeGroup, + Name: "test-group", + PatternType: kafka.PatternTypePrefixed, + Principal: "User:Alice", + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + }, + aclConfig, + ) + + invalidAclConfigs, err := LoadACLsFile("testdata/test-cluster/acls/acl-test-invalid.yaml") + assert.Equal(t, 0, len(invalidAclConfigs)) + require.Error(t, err) + + multiAclConfigs, err := LoadACLsFile("testdata/test-cluster/acls/acl-test-multi.yaml") + assert.Equal(t, 2, len(multiAclConfigs)) + assert.Equal(t, "acl-test1", multiAclConfigs[0].Meta.Name) + assert.Equal(t, "acl-test2", multiAclConfigs[1].Meta.Name) +} + func TestCheckConsistency(t *testing.T) { os.Setenv("K2_TEST_ENV_VAR", "test-region") defer os.Unsetenv("K2_TEST_ENV_VAR") @@ -128,6 +190,17 @@ func TestCheckConsistency(t *testing.T) { assert.NoError(t, err) assert.NoError(t, topicConfig.Validate(3)) - assert.NoError(t, CheckConsistency(topicConfig, clusterConfig)) - assert.Error(t, CheckConsistency(topicConfigNoMatch, clusterConfig)) + assert.NoError(t, CheckConsistency(topicConfig.Meta, clusterConfig)) + assert.Error(t, CheckConsistency(topicConfigNoMatch.Meta, clusterConfig)) + + aclConfigs, err := LoadACLsFile("testdata/test-cluster/acls/acl-test.yaml") + assert.Equal(t, 1, len(aclConfigs)) + assert.NoError(t, err) + + aclConfigsNoMatches, err := LoadACLsFile("testdata/test-cluster/acls/acl-test-no-match.yaml") + assert.Equal(t, 1, len(aclConfigsNoMatches)) + assert.NoError(t, err) + + assert.NoError(t, CheckConsistency(aclConfigs[0].Meta, clusterConfig)) + assert.Error(t, CheckConsistency(aclConfigsNoMatches[0].Meta, clusterConfig)) } diff --git a/pkg/config/meta.go b/pkg/config/meta.go new file mode 100644 index 00000000..8415ab59 --- /dev/null +++ b/pkg/config/meta.go @@ -0,0 +1,40 @@ +package config + +import ( + "errors" + + "github.com/hashicorp/go-multierror" +) + +// ResourceMeta stores the (mostly immutable) metadata associated with a resource. +// Inspired by the meta structs in Kubernetes objects. +type ResourceMeta struct { + Name string `json:"name"` + Cluster string `json:"cluster"` + Region string `json:"region"` + Environment string `json:"environment"` + Description string `json:"description"` + Labels map[string]string `json:"labels"` + + // Consumers is a list of consumers who are expected to consume from this + // topic. + Consumers []string `json:"consumers,omitempty"` +} + +// Validate evalutes whether the ResourceMeta is valid. +func (rm *ResourceMeta) Validate() error { + var err error + if rm.Name == "" { + err = multierror.Append(err, errors.New("Name must be set")) + } + if rm.Cluster == "" { + err = multierror.Append(err, errors.New("Cluster must be set")) + } + if rm.Region == "" { + err = multierror.Append(err, errors.New("Region must be set")) + } + if rm.Environment == "" { + err = multierror.Append(err, errors.New("Environment must be set")) + } + return err +} diff --git a/pkg/config/meta_test.go b/pkg/config/meta_test.go new file mode 100644 index 00000000..0d20cd51 --- /dev/null +++ b/pkg/config/meta_test.go @@ -0,0 +1,47 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMetaValidate(t *testing.T) { + type testCase struct { + description string + meta ResourceMeta + expError bool + } + + testCases := []testCase{ + { + description: "valid meta", + meta: ResourceMeta{ + Name: "test-topic", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + Description: "test-description", + }, + expError: false, + }, + { + description: "meta missing fields", + meta: ResourceMeta{ + Name: "test-topic", + Environment: "test-environment", + Description: "Bootstrapped via topicctl bootstrap", + }, + expError: true, + }, + } + + for _, testCase := range testCases { + err := testCase.meta.Validate() + if testCase.expError { + assert.Error(t, err, testCase.description) + } else { + assert.NoError(t, err, testCase.description) + } + } +} diff --git a/pkg/config/testdata/test-cluster/acls/acl-test-invalid.yaml b/pkg/config/testdata/test-cluster/acls/acl-test-invalid.yaml new file mode 100644 index 00000000..52ffd971 --- /dev/null +++ b/pkg/config/testdata/test-cluster/acls/acl-test-invalid.yaml @@ -0,0 +1,23 @@ +meta: + name: acl-test + cluster: test-cluster + environment: test-env + region: test-region + description: | + Test acl + +spec: + acls: + - resource: + type: topic + name: test-topic + patternType: literal + operations: + - read + - describe + - resource: + type: group + name: test-group + patternType: invalid + operations: + - read diff --git a/pkg/config/testdata/test-cluster/acls/acl-test-multi.yaml b/pkg/config/testdata/test-cluster/acls/acl-test-multi.yaml new file mode 100644 index 00000000..175897b3 --- /dev/null +++ b/pkg/config/testdata/test-cluster/acls/acl-test-multi.yaml @@ -0,0 +1,52 @@ +# This is an empty config. +--- +meta: + name: acl-test1 + cluster: test-cluster + environment: test-env + region: test-region + description: | + Test acl + +spec: + acls: + - resource: + type: topic + name: test-topic + patternType: literal + operations: + - read + - describe + - resource: + type: group + name: test-group + patternType: prefixed + operations: + - read +--- +meta: + name: acl-test2 + cluster: test-cluster + environment: test-env + region: test-region + description: | + Test acl + +spec: + acls: + - resource: + type: topic + name: test-topic + patternType: literal + operations: + - read + - describe + - resource: + type: group + name: test-group + patternType: prefixed + operations: + - read +--- +# Another empty one + diff --git a/pkg/config/testdata/test-cluster/acls/acl-test-no-match.yaml b/pkg/config/testdata/test-cluster/acls/acl-test-no-match.yaml new file mode 100644 index 00000000..02fe8617 --- /dev/null +++ b/pkg/config/testdata/test-cluster/acls/acl-test-no-match.yaml @@ -0,0 +1,29 @@ +meta: + name: acl-test-no-match + cluster: test-cluster + environment: bad-env + region: test-region + description: | + Test acl + +spec: + acls: + - resource: + type: topic + name: test-topic + patternType: literal + principal: 'User:Alice' + host: "*" + permission: allow + operations: + - read + - describe + - resource: + type: group + name: test-group + patternType: prefixed + principal: 'User:Alice' + host: "*" + permission: allow + operations: + - read diff --git a/pkg/config/testdata/test-cluster/acls/acl-test.yaml b/pkg/config/testdata/test-cluster/acls/acl-test.yaml new file mode 100644 index 00000000..b590e97f --- /dev/null +++ b/pkg/config/testdata/test-cluster/acls/acl-test.yaml @@ -0,0 +1,29 @@ +meta: + name: acl-test + cluster: test-cluster + environment: test-env + region: test-region + description: | + Test acl + +spec: + acls: + - resource: + type: topic + name: test-topic + patternType: literal + principal: 'User:Alice' + host: "*" + permission: allow + operations: + - read + - describe + - resource: + type: group + name: test-group + patternType: prefixed + principal: 'User:Alice' + host: "*" + permission: allow + operations: + - read diff --git a/pkg/config/topic.go b/pkg/config/topic.go index 5b7240cb..370b877e 100644 --- a/pkg/config/topic.go +++ b/pkg/config/topic.go @@ -77,23 +77,8 @@ var allPickerMethods = []PickerMethod{ // TopicConfig represents the desired configuration of a topic. type TopicConfig struct { - Meta TopicMeta `json:"meta"` - Spec TopicSpec `json:"spec"` -} - -// TopicMeta stores the (mostly immutable) metadata associated with a topic. -// Inspired by the meta structs in Kubernetes objects. -type TopicMeta struct { - Name string `json:"name"` - Cluster string `json:"cluster"` - Region string `json:"region"` - Environment string `json:"environment"` - Description string `json:"description"` - Labels map[string]string `json:"labels"` - - // Consumers is a list of consumers who are expected to consume from this - // topic. - Consumers []string `json:"consumers,omitempty"` + Meta ResourceMeta `json:"meta"` + Spec TopicSpec `json:"spec"` } // TopicSpec stores the (mutable) specification for a topic. @@ -181,18 +166,8 @@ func (t *TopicConfig) SetDefaults() { func (t TopicConfig) Validate(numRacks int) error { var err error - if t.Meta.Name == "" { - err = multierror.Append(err, errors.New("Name must be set")) - } - if t.Meta.Cluster == "" { - err = multierror.Append(err, errors.New("Cluster must be set")) - } - if t.Meta.Region == "" { - err = multierror.Append(err, errors.New("Region must be set")) - } - if t.Meta.Environment == "" { - err = multierror.Append(err, errors.New("Environment must be set")) - } + err = t.Meta.Validate() + if t.Spec.Partitions <= 0 { err = multierror.Append(err, errors.New("Partitions must be a positive number")) } @@ -340,7 +315,7 @@ func TopicConfigFromTopicInfo( topicInfo admin.TopicInfo, ) TopicConfig { topicConfig := TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: topicInfo.Name, Cluster: clusterConfig.Meta.Name, Region: clusterConfig.Meta.Region, diff --git a/pkg/config/topic_test.go b/pkg/config/topic_test.go index 163f4980..764010d8 100644 --- a/pkg/config/topic_test.go +++ b/pkg/config/topic_test.go @@ -19,7 +19,7 @@ func TestTopicValidate(t *testing.T) { { description: "all good any placement", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -43,7 +43,7 @@ func TestTopicValidate(t *testing.T) { { description: "all good balanced-leaders placement", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -65,7 +65,7 @@ func TestTopicValidate(t *testing.T) { { description: "all good static placement", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -90,7 +90,7 @@ func TestTopicValidate(t *testing.T) { { description: "all good static-in-rack placement", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -115,7 +115,7 @@ func TestTopicValidate(t *testing.T) { { description: "missing meta fields", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Environment: "test-environment", Description: "Bootstrapped via topicctl bootstrap", @@ -134,7 +134,7 @@ func TestTopicValidate(t *testing.T) { { description: "double-setting retention", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -158,7 +158,7 @@ func TestTopicValidate(t *testing.T) { { description: "all good double-setting local retention", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -184,7 +184,7 @@ func TestTopicValidate(t *testing.T) { { description: "setting local retention without enabling remote storage", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -209,7 +209,7 @@ func TestTopicValidate(t *testing.T) { { description: "balanced leaders invalid rack count", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -230,7 +230,7 @@ func TestTopicValidate(t *testing.T) { { description: "static placement invalid num partitions", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -254,7 +254,7 @@ func TestTopicValidate(t *testing.T) { { description: "static placement invalid replication", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -278,7 +278,7 @@ func TestTopicValidate(t *testing.T) { { description: "static-in-rack placement invalid partition count", topicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -355,7 +355,7 @@ func TestTopicConfigFromTopicInfo(t *testing.T) { Version: 1, }, expTopicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -408,7 +408,7 @@ func TestTopicConfigFromTopicInfo(t *testing.T) { Version: 1, }, expTopicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", @@ -458,7 +458,7 @@ func TestTopicConfigFromTopicInfo(t *testing.T) { Version: 1, }, expTopicConfig: TopicConfig{ - Meta: TopicMeta{ + Meta: ResourceMeta{ Name: "test-topic", Cluster: "test-cluster", Region: "test-region", diff --git a/pkg/create/acl.go b/pkg/create/acl.go new file mode 100644 index 00000000..ddf488e0 --- /dev/null +++ b/pkg/create/acl.go @@ -0,0 +1,141 @@ +package create + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/util" + log "github.com/sirupsen/logrus" +) + +// ACLCreatorConfig contains the configuration for an ACL creator. +type ACLCreatorConfig struct { + ClusterConfig config.ClusterConfig + DryRun bool + SkipConfirm bool + ACLConfig config.ACLConfig +} + +type ACLCreator struct { + config ACLCreatorConfig + adminClient admin.Client + + clusterConfig config.ClusterConfig + aclConfig config.ACLConfig +} + +func NewACLCreator( + ctx context.Context, + adminClient admin.Client, + creatorConfig ACLCreatorConfig, +) (*ACLCreator, error) { + if !adminClient.GetSupportedFeatures().ACLs { + return nil, fmt.Errorf("ACLs are not supported by this cluster") + } + + return &ACLCreator{ + config: creatorConfig, + adminClient: adminClient, + clusterConfig: creatorConfig.ClusterConfig, + aclConfig: creatorConfig.ACLConfig, + }, nil +} + +func (a *ACLCreator) Create(ctx context.Context) error { + log.Info("Validating configs...") + + if err := a.clusterConfig.Validate(); err != nil { + return err + } + + if err := a.aclConfig.Validate(); err != nil { + return err + } + + if err := config.CheckConsistency(a.aclConfig.Meta, a.clusterConfig); err != nil { + return err + } + + log.Info("Checking if ACLs already exist...") + + acls := a.aclConfig.ToNewACLEntries() + + allExistingACLs := []kafka.ACLEntry{} + newACLs := []kafka.ACLEntry{} + + for _, acl := range acls { + existingACLs, err := a.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: acl.ResourceType, + ResourceNameFilter: acl.ResourceName, + ResourcePatternTypeFilter: acl.ResourcePatternType, + PrincipalFilter: acl.Principal, + HostFilter: acl.Host, + Operation: acl.Operation, + PermissionType: acl.PermissionType, + }) + if err != nil { + return fmt.Errorf("error checking for existing ACL (%v): %v", acl, err) + } + if len(existingACLs) > 0 { + allExistingACLs = append(allExistingACLs, acl) + } else { + newACLs = append(newACLs, acl) + } + } + + if len(allExistingACLs) > 0 { + log.Infof( + "Found %d existing ACLs:\n%s", + len(allExistingACLs), + formatNewACLsConfig(allExistingACLs), + ) + } + + if len(newACLs) == 0 { + log.Infof("No ACLs to create") + return nil + } + + if a.config.DryRun { + log.Infof( + "Would create ACLs with config %+v", + formatNewACLsConfig(newACLs), + ) + return nil + } + + log.Infof( + "It looks like these ACLs don't already exist. Will create them with this config:\n%s", + formatNewACLsConfig(newACLs), + ) + + ok, _ := util.Confirm("OK to continue?", a.config.SkipConfirm) + if !ok { + return errors.New("Stopping because of user response") + } + + log.Infof("Creating new ACLs for user with config %+v", formatNewACLsConfig(newACLs)) + + if err := a.adminClient.CreateACLs(ctx, acls); err != nil { + return fmt.Errorf("error creating new ACLs: %v", err) + } + + return nil +} + +// formatNewACLsConfig generates a pretty string representation of kafka-go +// ACL configurations. +func formatNewACLsConfig(config []kafka.ACLEntry) string { + content, err := json.MarshalIndent(config, "", " ") + if err != nil { + log.Warnf("Error marshalling ACLs config: %+v", err) + return "Error" + } + + return string(content) +} diff --git a/pkg/create/acl_test.go b/pkg/create/acl_test.go new file mode 100644 index 00000000..0797fb81 --- /dev/null +++ b/pkg/create/acl_test.go @@ -0,0 +1,318 @@ +package create + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/util" + "github.com/stretchr/testify/require" +) + +func TestCreateNewACLs(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + creator := testCreator(ctx, t, aclConfig) + defer creator.adminClient.Close() + + defer func() { + _, err := creator.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := creator.Create(ctx) + require.NoError(t, err) + acl, err := creator.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) +} + +func TestCreateExistingACLs(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + creator := testCreator(ctx, t, aclConfig) + defer creator.adminClient.Close() + + defer func() { + _, err := creator.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := creator.Create(ctx) + require.NoError(t, err) + acl, err := creator.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) + // Run create again and make sure it is idempotent + err = creator.Create(ctx) + require.NoError(t, err) + acl, err = creator.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) +} + +func TestCreateACLsDryRun(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + creator := testCreator(ctx, t, aclConfig) + defer creator.adminClient.Close() + creator.config.DryRun = true + + defer func() { + _, err := creator.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := creator.Create(ctx) + require.NoError(t, err) + acl, err := creator.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{}, acl) +} + +func testCreator( + ctx context.Context, + t *testing.T, + aclConfig config.ACLConfig, +) *ACLCreator { + clusterConfig := config.ClusterConfig{ + Meta: config.ClusterMeta{ + Name: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ClusterSpec{ + BootstrapAddrs: []string{util.TestKafkaAddr()}, + ZKLockPath: "/topicctl/locks", + }, + } + + adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "") + require.NoError(t, err) + + applier, err := NewACLCreator( + ctx, + adminClient, + ACLCreatorConfig{ + ClusterConfig: clusterConfig, + ACLConfig: aclConfig, + DryRun: false, + SkipConfirm: true, + }, + ) + require.NoError(t, err) + return applier +} diff --git a/pkg/apply/confirm.go b/pkg/util/confirm.go similarity index 98% rename from pkg/apply/confirm.go rename to pkg/util/confirm.go index 62e54787..101d5435 100644 --- a/pkg/apply/confirm.go +++ b/pkg/util/confirm.go @@ -1,4 +1,4 @@ -package apply +package util import ( "fmt" diff --git a/pkg/util/error.go b/pkg/util/error.go index d721fffb..41cdfb6c 100644 --- a/pkg/util/error.go +++ b/pkg/util/error.go @@ -49,18 +49,3 @@ func AlterPartitionReassignmentsRequestAssignmentError(results []kafka.AlterPart } return nil } - -func DescribeUserScramCredentialsResponseResultsError(results []kafka.DescribeUserScramCredentialsResponseResult) error { - errors := map[string]error{} - var hasErrors bool - for _, result := range results { - if result.Error != nil { - hasErrors = true - errors[result.User] = result.Error - } - } - if hasErrors { - return fmt.Errorf("%+v", errors) - } - return nil -}