diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 5211e87b..d4c40609 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -736,19 +736,17 @@ func (c *BrokerAdminClient) GetACLs( return aclinfos, nil } -// CreateACL creates an ACL in the cluster. -func (c *BrokerAdminClient) CreateACL( +// CreateACLs creates an ACL in the cluster. +func (c *BrokerAdminClient) CreateACLs( ctx context.Context, - entry kafka.ACLEntry, + acls []kafka.ACLEntry, ) error { if c.config.ReadOnly { return errors.New("Cannot create ACL in read-only mode") } req := kafka.CreateACLsRequest{ - ACLs: []kafka.ACLEntry{ - entry, - }, + ACLs: acls, } log.Debugf("CreateACLs request: %+v", req) diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index e88eea25..85587e45 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -616,16 +616,18 @@ func TestBrokerClientCreateGetACL(t *testing.T) { } }() - err = client.CreateACL( + err = client.CreateACLs( ctx, - kafka.ACLEntry{ - Principal: principal, - PermissionType: kafka.ACLPermissionTypeAllow, - Operation: kafka.ACLOperationTypeRead, - ResourceType: kafka.ResourceTypeTopic, - ResourcePatternType: kafka.PatternTypeLiteral, - ResourceName: topicName, - Host: "*", + []kafka.ACLEntry{ + { + Principal: principal, + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + ResourceType: kafka.ResourceTypeTopic, + ResourcePatternType: kafka.PatternTypeLiteral, + ResourceName: topicName, + Host: "*", + }, }, ) require.NoError(t, err) @@ -670,7 +672,7 @@ func TestBrokerClientCreateACLReadOnly(t *testing.T) { ) require.NoError(t, err) - err = client.CreateACL(ctx, kafka.ACLEntry{}) + err = client.CreateACLs(ctx, []kafka.ACLEntry{}) assert.Equal(t, err, errors.New("Cannot create ACL in read-only mode")) } diff --git a/pkg/admin/client.go b/pkg/admin/client.go index 5ec1f731..5c367b04 100644 --- a/pkg/admin/client.go +++ b/pkg/admin/client.go @@ -68,10 +68,10 @@ type Client interface { config kafka.TopicConfig, ) error - // Create ACL creates an ACL in the cluster. - CreateACL( + // Create ACLs creates ACLs in the cluster. + CreateACLs( ctx context.Context, - entry kafka.ACLEntry, + acls []kafka.ACLEntry, ) error // AssignPartitions sets the replica broker IDs for one or more partitions in a topic. diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 0aca8e77..a805db8a 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -429,9 +429,9 @@ func (c *ZKAdminClient) GetACLs( return nil, errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.") } -func (c *ZKAdminClient) CreateACL( +func (c *ZKAdminClient) CreateACLs( ctx context.Context, - entry kafka.ACLEntry, + acls []kafka.ACLEntry, ) error { return errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.") } diff --git a/pkg/admin/zkclient_test.go b/pkg/admin/zkclient_test.go index 34383b7d..650e410c 100644 --- a/pkg/admin/zkclient_test.go +++ b/pkg/admin/zkclient_test.go @@ -1103,6 +1103,6 @@ func TestZkCreateACL(t *testing.T) { require.NoError(t, err) defer adminClient.Close() - err = adminClient.CreateACL(ctx, kafka.ACLEntry{}) + err = adminClient.CreateACLs(ctx, []kafka.ACLEntry{}) assert.Equal(t, err, errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.")) }