diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ac9d46ed..5b135df2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -117,7 +117,7 @@ jobs: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - test241: + test271: runs-on: ubuntu-latest container: image: cimg/go:1.19 @@ -148,7 +148,7 @@ jobs: - "2181:2181" kafka1: - image: wurstmeister/kafka:2.12-2.4.1 + image: wurstmeister/kafka:2.13-2.7.1 ports: - "9092:9092" env: @@ -161,7 +161,7 @@ jobs: KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka2: - image: wurstmeister/kafka:2.12-2.4.1 + image: wurstmeister/kafka:2.13-2.7.1 ports: - "9093:9092" env: @@ -174,7 +174,7 @@ jobs: KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka3: - image: wurstmeister/kafka:2.12-2.4.1 + image: wurstmeister/kafka:2.13-2.7.1 ports: - "9094:9092" env: @@ -187,7 +187,7 @@ jobs: KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka4: - image: wurstmeister/kafka:2.12-2.4.1 + image: wurstmeister/kafka:2.13-2.7.1 ports: - "9095:9092" env: @@ -200,7 +200,7 @@ jobs: KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka5: - image: wurstmeister/kafka:2.12-2.4.1 + image: wurstmeister/kafka:2.13-2.7.1 ports: - "9096:9092" env: @@ -213,7 +213,7 @@ jobs: KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka6: - image: wurstmeister/kafka:2.12-2.4.1 + image: wurstmeister/kafka:2.13-2.7.1 ports: - "9097:9092" env: @@ -227,7 +227,7 @@ jobs: snyk: runs-on: ubuntu-latest - needs: [test010, test241] + needs: [test010, test271] steps: - uses: actions/checkout@v3 - name: Run Snyk to check for vulnerabilities diff --git a/README.md b/README.md index 997d1fa3..7e77dcc6 100644 --- a/README.md +++ b/README.md @@ -176,6 +176,7 @@ resource type in the cluster. Currently, the following operations are supported: | `get offsets [topic]` | Number of messages per partition along with start and end times | | `get topics` | All topics in the cluster | | `get acls [flags]` | Describe access control levels (ACLs) in the cluster | +| `get users` | All users in the cluster | #### rebalance diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index 2f96f9d4..60e458a5 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -68,6 +68,7 @@ func init() { offsetsCmd(), topicsCmd(), aclsCmd(), + usersCmd(), ) RootCmd.AddCommand(getCmd) } @@ -414,3 +415,24 @@ $ topicctl get acls --host 198.51.100.0 ) return cmd } + +func usersCmd() *cobra.Command { + return &cobra.Command{ + Use: "users", + Short: "Displays information for all users in the cluster.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.GetUsers(ctx, nil) + }, + } +} diff --git a/docker-compose-auth.yml b/docker-compose-auth.yml index 7897cb8d..22d46a46 100644 --- a/docker-compose-auth.yml +++ b/docker-compose-auth.yml @@ -16,7 +16,7 @@ services: - "2181:2181" kafka: - image: wurstmeister/kafka:2.12-2.4.1 + image: wurstmeister/kafka:2.13-2.7.1 restart: on-failure:3 links: - zookeeper diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index fa70d4bd..48c4bd34 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -99,11 +99,17 @@ func NewBrokerAdminClient( supportedFeatures.DynamicBrokerConfigs = true } - // If we have DescribeAcls, than we're running a version of Kafka > 2.0.1, + // If we have DescribeAcls, then we're running a version of Kafka > 2.0.1, // that will have support for all ACLs APIs. if _, ok := maxVersions["DescribeAcls"]; ok { supportedFeatures.ACLs = true } + + // If we have DescribeUserScramCredentials, than we're running a version of Kafka > 2.7.1, + // that will have support for all User APIs. + if _, ok := maxVersions["DescribeUserScramCredentials"]; ok { + supportedFeatures.Users = true + } log.Debugf("Supported features: %+v", supportedFeatures) adminClient := &BrokerAdminClient{ @@ -379,6 +385,72 @@ func (c *BrokerAdminClient) GetTopic( return topicInfos[0], nil } +func (c *BrokerAdminClient) GetUsers( + ctx context.Context, + names []string, +) ([]UserInfo, error) { + var users []kafka.UserScramCredentialsUser + for _, name := range names { + users = append(users, kafka.UserScramCredentialsUser{ + Name: name, + }) + } + + req := kafka.DescribeUserScramCredentialsRequest{ + Users: users, + } + log.Debugf("DescribeUserScramCredentials request: %+v", req) + + resp, err := c.client.DescribeUserScramCredentials(ctx, &req) + log.Debugf("DescribeUserScramCredentials response: %+v (%+v)", resp, err) + if err != nil { + return nil, err + } + + if err = util.DescribeUserScramCredentialsResponseResultsError(resp.Results); err != nil { + return nil, err + } + + results := []UserInfo{} + + for _, result := range resp.Results { + var credentials []CredentialInfo + for _, credential := range result.CredentialInfos { + credentials = append(credentials, CredentialInfo{ + ScramMechanism: ScramMechanism(credential.Mechanism), + Iterations: credential.Iterations, + }) + } + results = append(results, UserInfo{ + Name: result.User, + CredentialInfos: credentials, + }) + } + return results, err +} + +func (c *BrokerAdminClient) UpsertUser( + ctx context.Context, + user kafka.UserScramCredentialsUpsertion, +) error { + if c.config.ReadOnly { + return errors.New("Cannot create user in read-only mode") + } + req := kafka.AlterUserScramCredentialsRequest{ + Upsertions: []kafka.UserScramCredentialsUpsertion{user}, + } + log.Debugf("AlterUserScramCredentials request: %+v", req) + resp, err := c.client.AlterUserScramCredentials(ctx, &req) + log.Debugf("AlterUserScramCredentials response: %+v", resp) + if err != nil { + return err + } + if err = resp.Results[0].Error; err != nil { + return err + } + return nil +} + // UpdateTopicConfig updates the configuration for the argument topic. It returns the config // keys that were updated. func (c *BrokerAdminClient) UpdateTopicConfig( diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index b75e5491..542d225c 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -664,6 +664,94 @@ func TestBrokerClientCreateGetACL(t *testing.T) { assert.Equal(t, expected, aclsInfo) } +func TestBrokerClientCreateGetUsers(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + }, + ) + require.NoError(t, err) + + name := util.RandomString("test-user-", 6) + mechanism := kafka.ScramMechanismSha512 + + defer func() { + resp, err := client.client.AlterUserScramCredentials( + ctx, + &kafka.AlterUserScramCredentialsRequest{ + Deletions: []kafka.UserScramCredentialsDeletion{ + { + Name: name, + Mechanism: mechanism, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up user, err: %v", err)) + } + for _, response := range resp.Results { + if err = response.Error; err != nil { + t.Fatal(fmt.Errorf("failed to clean up user, err: %v", err)) + } + } + + }() + + err = client.UpsertUser(ctx, kafka.UserScramCredentialsUpsertion{ + Name: name, + Mechanism: mechanism, + Iterations: 15000, + Salt: []byte("my-salt"), + SaltedPassword: []byte("my-salted-password"), + }) + + require.NoError(t, err) + + resp, err := client.GetUsers(ctx, []string{name}) + require.NoError(t, err) + assert.Equal(t, []UserInfo{ + { + Name: name, + CredentialInfos: []CredentialInfo{ + { + ScramMechanism: ScramMechanism(mechanism), + Iterations: 15000, + }, + }, + }, + }, resp) +} + +func TestBrokerClientUpsertUserReadOnly(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + ReadOnly: true, + }, + ) + require.NoError(t, err) + err = client.UpsertUser(ctx, kafka.UserScramCredentialsUpsertion{}) + + assert.Equal(t, errors.New("Cannot create user in read-only mode"), err) +} + func TestBrokerClientCreateACLReadOnly(t *testing.T) { if !util.CanTestBrokerAdmin() { t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") diff --git a/pkg/admin/client.go b/pkg/admin/client.go index 3db53f9a..f872ad90 100644 --- a/pkg/admin/client.go +++ b/pkg/admin/client.go @@ -47,6 +47,12 @@ type Client interface { // GetAllTopicsMetadata performs kafka-go metadata call to get topic information GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error) + // GetUsers gets information about users in the cluster. + GetUsers( + ctx context.Context, + names []string, + ) ([]UserInfo, error) + // UpdateTopicConfig updates the configuration for the argument topic. It returns the config // keys that were updated. UpdateTopicConfig( @@ -77,6 +83,12 @@ type Client interface { acls []kafka.ACLEntry, ) error + // UpsertUser creates or updates an user in zookeeper. + UpsertUser( + ctx context.Context, + user kafka.UserScramCredentialsUpsertion, + ) error + // AssignPartitions sets the replica broker IDs for one or more partitions in a topic. AssignPartitions( ctx context.Context, diff --git a/pkg/admin/format.go b/pkg/admin/format.go index 619b8ff1..03921237 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -1000,6 +1000,51 @@ func FormatACLs(acls []ACLInfo) string { return string(bytes.TrimRight(buf.Bytes(), "\n")) } +// FormatUsers creates a pretty table that lists the details of the +// argument users. +func FormatUsers(users []UserInfo) string { + buf := &bytes.Buffer{} + + headers := []string{ + "Name", + "Mechanism", + "Iterations", + } + + table := tablewriter.NewWriter(buf) + table.SetHeader(headers) + table.SetAutoWrapText(false) + table.SetColumnAlignment( + []int{ + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + }, + ) + table.SetBorders( + tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }, + ) + + for _, user := range users { + for _, credential := range user.CredentialInfos { + row := []string{ + user.Name, + credential.ScramMechanism.String(), + fmt.Sprintf("%d", credential.Iterations), + } + + table.Append(row) + } + } + + table.Render() + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} + func prettyConfig(config map[string]string) string { rows := []string{} diff --git a/pkg/admin/support.go b/pkg/admin/support.go index 242032b4..d2ce58b5 100644 --- a/pkg/admin/support.go +++ b/pkg/admin/support.go @@ -19,4 +19,7 @@ type SupportedFeatures struct { // ACLs indicates whether the client supports access control levels. ACLs bool + + // Users indicates whether the client supports SASL Users. + Users bool } diff --git a/pkg/admin/types.go b/pkg/admin/types.go index 388383da..d55a0e85 100644 --- a/pkg/admin/types.go +++ b/pkg/admin/types.go @@ -291,6 +291,35 @@ func (p *ACLPermissionType) Type() string { return "ACLPermissionType" } +// UserInfo represents the information stored about a user +// in zookeeper. +type UserInfo struct { + Name string `json:"name"` + CredentialInfos []CredentialInfo `json:"credentialInfos"` +} + +// CredentialInfo represents read only information about +// a users credentials in zookeeper. +type CredentialInfo struct { + ScramMechanism ScramMechanism `json:"scramMechanism"` + Iterations int `json:"iterations"` +} + +// ScramMechanism represents the ScramMechanism used +// for a users credential in zookeeper. +type ScramMechanism kafka.ScramMechanism + +func (s *ScramMechanism) String() string { + switch kafka.ScramMechanism(*s) { + case kafka.ScramMechanismSha256: + return "sha256" + case kafka.ScramMechanismSha512: + return "sha512" + default: + return "unknown" + } +} + type zkClusterID struct { Version string `json:"version"` ID string `json:"id"` diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 6424224b..8f7f4d9d 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -436,6 +436,20 @@ func (c *ZKAdminClient) CreateACLs( return errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.") } +func (c *ZKAdminClient) GetUsers( + ctx context.Context, + names []string, +) ([]UserInfo, error) { + return nil, errors.New("Users not yet supported with zk access mode; omit zk addresses to fix.") +} + +func (c *ZKAdminClient) UpsertUser( + ctx context.Context, + user kafka.UserScramCredentialsUpsertion, +) error { + return errors.New("Users not yet supported with zk access mode; omit zk addresses to fix.") +} + // UpdateTopicConfig updates the config JSON for a topic and sets a change // notification so that the brokers are notified. If overwrite is true, then // it will overwrite existing config entries. @@ -778,6 +792,7 @@ func (c *ZKAdminClient) GetSupportedFeatures() SupportedFeatures { Locks: true, DynamicBrokerConfigs: true, ACLs: false, + Users: false, } } diff --git a/pkg/admin/zkclient_test.go b/pkg/admin/zkclient_test.go index 650e410c..72ec4c10 100644 --- a/pkg/admin/zkclient_test.go +++ b/pkg/admin/zkclient_test.go @@ -1106,3 +1106,34 @@ func TestZkCreateACL(t *testing.T) { err = adminClient.CreateACLs(ctx, []kafka.ACLEntry{}) assert.Equal(t, err, errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.")) } + +func TestZkGetUsers(t *testing.T) { + ctx := context.Background() + adminClient, err := NewZKAdminClient( + ctx, + ZKAdminClientConfig{ + ZKAddrs: []string{util.TestZKAddr()}, + }, + ) + require.NoError(t, err) + defer adminClient.Close() + + acls, err := adminClient.GetUsers(ctx, []string{}) + assert.Empty(t, acls) + assert.Equal(t, err, errors.New("Users not yet supported with zk access mode; omit zk addresses to fix.")) +} + +func TestZkUpsertUser(t *testing.T) { + ctx := context.Background() + adminClient, err := NewZKAdminClient( + ctx, + ZKAdminClientConfig{ + ZKAddrs: []string{util.TestZKAddr()}, + }, + ) + require.NoError(t, err) + defer adminClient.Close() + + err = adminClient.UpsertUser(ctx, kafka.UserScramCredentialsUpsertion{}) + assert.Equal(t, err, errors.New("Users not yet supported with zk access mode; omit zk addresses to fix.")) +} diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index aff6af1c..312e286d 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -520,6 +520,21 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error { return nil } +// GerUsers fetches the details of each user in the cluster and prints out a table of them. +func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error { + c.startSpinner() + + users, err := c.adminClient.GetUsers(ctx, names) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Users:\n%s", admin.FormatUsers(users)) + + return nil +} + // ResetOffsets resets the offsets for a single consumer group / topic combination. func (c *CLIRunner) ResetOffsets( ctx context.Context, diff --git a/pkg/cli/repl.go b/pkg/cli/repl.go index 0fcdb2d0..d1f26826 100644 --- a/pkg/cli/repl.go +++ b/pkg/cli/repl.go @@ -79,6 +79,10 @@ var ( Text: "topics", Description: "Get all topics", }, + { + Text: "users", + Description: "Get all users", + }, } helpTableStr = helpTable() @@ -239,7 +243,6 @@ func (r *Repl) executor(in string) { log.Errorf("Error: %+v", err) return } - case "balance": if err := command.checkArgs(2, 3, nil); err != nil { log.Errorf("Error: %+v", err) @@ -360,6 +363,15 @@ func (r *Repl) executor(in string) { log.Errorf("Error: %+v", err) return } + case "users": + if err := command.checkArgs(2, 2, nil); err != nil { + log.Errorf("Error: %+v", err) + return + } + if err := r.cliRunner.GetUsers(ctx, nil); err != nil { + log.Errorf("Error: %+v", err) + return + } default: log.Error("Unrecognized input. Run 'help' for details on available commands.") } @@ -506,6 +518,10 @@ func helpTable() string { " get topics", "Get all topics", }, + { + " get users", + "Get all users", + }, { " tail [topic] [optional filter regexp] [--raw]", "Tail all messages in a topic", diff --git a/pkg/util/error.go b/pkg/util/error.go index 41cdfb6c..d721fffb 100644 --- a/pkg/util/error.go +++ b/pkg/util/error.go @@ -49,3 +49,18 @@ func AlterPartitionReassignmentsRequestAssignmentError(results []kafka.AlterPart } return nil } + +func DescribeUserScramCredentialsResponseResultsError(results []kafka.DescribeUserScramCredentialsResponseResult) error { + errors := map[string]error{} + var hasErrors bool + for _, result := range results { + if result.Error != nil { + hasErrors = true + errors[result.User] = result.Error + } + } + if hasErrors { + return fmt.Errorf("%+v", errors) + } + return nil +}