diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5b135df2..e1ea588c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,10 @@ name: ci -on: [push, pull_request] +on: + push: + branches: + - master + pull_request: jobs: @@ -46,78 +50,86 @@ jobs: services: zookeeper: - image: wurstmeister/zookeeper + image: bitnami/zookeeper:latest ports: - "2181:2181" + env: + ALLOW_ANONYMOUS_LOGIN: yes kafka1: - image: wurstmeister/kafka:2.11-0.10.2.2 + image: bitnami/kafka:0.10.2.1 ports: - "9092:9092" env: KAFKA_BROKER_ID: 1 KAFKA_BROKER_RACK: zone1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + ALLOW_PLAINTEXT_LISTENER: yes KAFKA_ADVERTISED_HOST_NAME: kafka1 KAFKA_ADVERTISED_PORT: 9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 kafka2: - image: wurstmeister/kafka:2.11-0.10.2.2 + image: bitnami/kafka:0.10.2.1 ports: - "9093:9092" env: KAFKA_BROKER_ID: 2 KAFKA_BROKER_RACK: zone1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + ALLOW_PLAINTEXT_LISTENER: yes KAFKA_ADVERTISED_HOST_NAME: kafka2 KAFKA_ADVERTISED_PORT: 9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 kafka3: - image: wurstmeister/kafka:2.11-0.10.2.2 + image: bitnami/kafka:0.10.2.1 ports: - "9094:9092" env: KAFKA_BROKER_ID: 3 KAFKA_BROKER_RACK: zone2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + ALLOW_PLAINTEXT_LISTENER: yes KAFKA_ADVERTISED_HOST_NAME: kafka3 KAFKA_ADVERTISED_PORT: 9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 kafka4: - image: wurstmeister/kafka:2.11-0.10.2.2 + image: bitnami/kafka:0.10.2.1 ports: - "9095:9092" - env: + env: KAFKA_BROKER_ID: 4 KAFKA_BROKER_RACK: zone2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + ALLOW_PLAINTEXT_LISTENER: yes KAFKA_ADVERTISED_HOST_NAME: kafka4 KAFKA_ADVERTISED_PORT: 9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 kafka5: - image: wurstmeister/kafka:2.11-0.10.2.2 + image: bitnami/kafka:0.10.2.1 ports: - "9096:9092" - env: + env: KAFKA_BROKER_ID: 5 KAFKA_BROKER_RACK: zone3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + 
ALLOW_PLAINTEXT_LISTENER: yes KAFKA_ADVERTISED_HOST_NAME: kafka5 KAFKA_ADVERTISED_PORT: 9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 kafka6: - image: wurstmeister/kafka:2.11-0.10.2.2 + image: bitnami/kafka:0.10.2.1 ports: - "9097:9092" - env: + env: KAFKA_BROKER_ID: 6 KAFKA_BROKER_RACK: zone3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + ALLOW_PLAINTEXT_LISTENER: yes KAFKA_ADVERTISED_HOST_NAME: kafka6 KAFKA_ADVERTISED_PORT: 9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - test271: + test270: runs-on: ubuntu-latest container: image: cimg/go:1.19 @@ -143,91 +155,99 @@ jobs: services: zookeeper: - image: wurstmeister/zookeeper + image: bitnami/zookeeper:latest ports: - "2181:2181" + env: + ALLOW_ANONYMOUS_LOGIN: yes kafka1: - image: wurstmeister/kafka:2.13-2.7.1 + image: bitnami/kafka:2.7.0 ports: - "9092:9092" env: - KAFKA_BROKER_ID: 1 - KAFKA_BROKER_RACK: zone1 - KAFKA_ADVERTISED_HOST_NAME: kafka1 - KAFKA_ADVERTISED_PORT: 9092 + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_BROKER_RACK: zone1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ADVERTISED_HOST_NAME: kafka1 + KAFKA_CFG_ADVERTISED_PORT: 9092 + KAFKA_CFG_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka2: - image: wurstmeister/kafka:2.13-2.7.1 + image: bitnami/kafka:2.7.0 ports: - "9093:9092" env: - KAFKA_BROKER_ID: 2 - KAFKA_BROKER_RACK: zone1 - KAFKA_ADVERTISED_HOST_NAME: kafka2 - KAFKA_ADVERTISED_PORT: 9092 + KAFKA_CFG_BROKER_ID: 2 + KAFKA_CFG_BROKER_RACK: zone1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ADVERTISED_HOST_NAME: kafka2 + KAFKA_CFG_ADVERTISED_PORT: 9092 + KAFKA_CFG_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + 
KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka3: - image: wurstmeister/kafka:2.13-2.7.1 + image: bitnami/kafka:2.7.0 ports: - "9094:9092" env: - KAFKA_BROKER_ID: 3 - KAFKA_BROKER_RACK: zone2 - KAFKA_ADVERTISED_HOST_NAME: kafka3 - KAFKA_ADVERTISED_PORT: 9092 + KAFKA_CFG_BROKER_ID: 3 + KAFKA_CFG_BROKER_RACK: zone2 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ADVERTISED_HOST_NAME: kafka3 + KAFKA_CFG_ADVERTISED_PORT: 9092 + KAFKA_CFG_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka4: - image: wurstmeister/kafka:2.13-2.7.1 + image: bitnami/kafka:2.7.0 ports: - "9095:9092" - env: - KAFKA_BROKER_ID: 4 - KAFKA_BROKER_RACK: zone2 - KAFKA_ADVERTISED_HOST_NAME: kafka4 - KAFKA_ADVERTISED_PORT: 9092 + env: + KAFKA_CFG_BROKER_ID: 4 + KAFKA_CFG_BROKER_RACK: zone2 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ADVERTISED_HOST_NAME: kafka4 + KAFKA_CFG_ADVERTISED_PORT: 9092 + KAFKA_CFG_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka5: - image: wurstmeister/kafka:2.13-2.7.1 + image: bitnami/kafka:2.7.0 ports: - "9096:9092" - env: - KAFKA_BROKER_ID: 5 - KAFKA_BROKER_RACK: zone3 - KAFKA_ADVERTISED_HOST_NAME: kafka5 - KAFKA_ADVERTISED_PORT: 9092 + env: + KAFKA_CFG_BROKER_ID: 5 + KAFKA_CFG_BROKER_RACK: zone3 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ADVERTISED_HOST_NAME: kafka5 + KAFKA_CFG_ADVERTISED_PORT: 9092 + KAFKA_CFG_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + 
KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true kafka6: - image: wurstmeister/kafka:2.13-2.7.1 + image: bitnami/kafka:2.7.0 ports: - "9097:9092" - env: - KAFKA_BROKER_ID: 6 - KAFKA_BROKER_RACK: zone3 - KAFKA_ADVERTISED_HOST_NAME: kafka6 - KAFKA_ADVERTISED_PORT: 9092 + env: + KAFKA_CFG_BROKER_ID: 6 + KAFKA_CFG_BROKER_RACK: zone3 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ADVERTISED_HOST_NAME: kafka6 + KAFKA_CFG_ADVERTISED_PORT: 9092 + KAFKA_CFG_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true snyk: runs-on: ubuntu-latest - needs: [test010, test271] + needs: [test010, test270] steps: - uses: actions/checkout@v3 - name: Run Snyk to check for vulnerabilities diff --git a/README.md b/README.md index 09da81fd..00bafb0f 100644 --- a/README.md +++ b/README.md @@ -165,7 +165,6 @@ Currently, only ACLs are supported. The create command is separate from the appl command as it is intended for usage with immutable resources managed by topicctl. #### delete - ``` topicctl delete [flags] [operation] ``` @@ -504,7 +503,7 @@ The `apply` subcommand can make changes, but under the following conditions: The `reset-offsets` command can also make changes in the cluster and should be used carefully. The `create` command can be used to create new resources in the cluster. It cannot be used with -mutuable resources. +mutable resources. ### Idempotency @@ -619,7 +618,7 @@ make test You can change the Kafka version of the local cluster by setting the `KAFKA_IMAGE_TAG` environment variable when running `docker-compose up -d`. See the -[`wurstmeister/kafka` dockerhub page](https://hub.docker.com/r/wurstmeister/kafka/tags) for more +[`bitnami/kafka` dockerhub page](https://hub.docker.com/r/bitnami/kafka/tags) for more details on the available versions. 
#### Run against local cluster diff --git a/docker-compose-auth.yml b/docker-compose-auth.yml index 22d46a46..553e9a0e 100644 --- a/docker-compose-auth.yml +++ b/docker-compose-auth.yml @@ -1,3 +1,8 @@ +# By default, this docker-compose setup uses Kafka 2.7.0. This version can +# be overwritten by setting the KAFKA_IMAGE_TAG environment variable. +# +# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list. +# # This config sets up a simple, single-node cluster that's equipped to use SSL/TLS and/or SASL. # It exposes access on four separate ports: # @@ -7,46 +12,57 @@ # 4. 9095: SASL over SSL # # See examples/auth for the associated cluster configs and certs. -version: '2' +version: '3' services: zookeeper: - image: "wurstmeister/zookeeper:latest" + container_name: zookeeper + hostname: zookeeper + image: bitnami/zookeeper:latest ports: - "2181:2181" + environment: + ALLOW_ANONYMOUS_LOGIN: yes kafka: - image: wurstmeister/kafka:2.13-2.7.1 + container_name: kafka + hostname: kafka + image: bitnami/kafka:${KAFKA_IMAGE_TAG:-2.7.0} + depends_on: + - zookeeper restart: on-failure:3 - links: - - zookeeper ports: - 9092:9092 - 9093:9093 - 9094:9094 - 9095:9095 environment: - KAFKA_BROKER_ID: 1 - KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_ADVERTISED_PORT: 9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES: 200000000 - KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094,SASL_SSL://:9095" - KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_PLAINTEXT://localhost:9094,SASL_SSL://localhost:9095" - KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512" - KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer' - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' - KAFKA_SSL_KEYSTORE_LOCATION: /certs/kafka.keystore.jks - KAFKA_SSL_KEYSTORE_PASSWORD: test123 - KAFKA_SSL_KEY_PASSWORD: test123 - KAFKA_SSL_TRUSTSTORE_LOCATION: /certs/kafka.truststore.jks - 
KAFKA_SSL_TRUSTSTORE_PASSWORD: test123 - KAFKA_SSL_CLIENT_AUTH: none - KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "" - KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" - CUSTOM_INIT_SCRIPT: |- - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; - /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_BROKER_RACK: zone1 + KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_CFG_MESSAGE_MAX_BYTES: 200000000 + KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094,SASL_SSL://:9095" + KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_PLAINTEXT://localhost:9094,SASL_SSL://localhost:9095" + KAFKA_CFG_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512" + KAFKA_CFG_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer" + + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" + + KAFKA_CFG_SSL_KEYSTORE_LOCATION: /opt/bitnami/kafka/config/certs/kafka.keystore.jks + KAFKA_CFG_SSL_KEYSTORE_PASSWORD: test123 + + KAFKA_CFG_SSL_TRUSTSTORE_LOCATION: /opt/bitnami/kafka/config/certs/kafka.truststore.jks + KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD: test123 + + + KAFKA_CFG_SSL_CLIENT_AUTH: none + KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "" + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + ALLOW_PLAINTEXT_LISTENER: "yes" + entrypoint: + - "/bin/bash" + - "-c" + - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n
username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh volumes: - /var/run/docker.sock:/var/run/docker.sock - - ./examples/auth/certs:/certs + - ./examples/auth/certs:/opt/bitnami/kafka/config/certs diff --git a/docker-compose.yml b/docker-compose.yml index 99a5e5c6..d142a765 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,116 +1,125 @@ -# By default, this docker-compose setup uses Kafka 2.4.1. This version can -# be overwritten by setting the KAFKA_IMAGE_TAG environment variable; some choices here include: +# By default, this docker-compose setup uses Kafka 2.7.0. This version can +# be overwritten by setting the KAFKA_IMAGE_TAG environment variable. # -# 1. Kafka 2.6: 2.13-2.6.0 -# 2. Kafka 2.5: 2.13-2.5.0 -# 3. Kafka 0.10: 2.11-0.10.2.2 -# -# See https://hub.docker.com/r/wurstmeister/kafka/tags for the complete list. -version: '2.1' +# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list. 
+version: '3' services: zookeeper: - image: wurstmeister/zookeeper + container_name: zookeeper + hostname: zookeeper + image: bitnami/zookeeper:latest ports: - "2181:2181" + environment: + ALLOW_ANONYMOUS_LOGIN: yes # Zone 1 brokers kafka1: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} + container_name: kafka1 + hostname: 169.254.123.123 + image: bitnami/kafka:${KAFKA_IMAGE_TAG:-2.7.0} ports: - "9092:9092" environment: - KAFKA_BROKER_ID: 1 - KAFKA_BROKER_RACK: zone1 - KAFKA_ADVERTISED_HOST_NAME: 169.254.123.123 - KAFKA_ADVERTISED_PORT: 9092 + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_BROKER_RACK: zone1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://169.254.123.123:9092 restart: on-failure depends_on: - zookeeper kafka2: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} + container_name: kafka2 + hostname: 169.254.123.123 + image: bitnami/kafka:${KAFKA_IMAGE_TAG:-2.7.0} ports: - "9093:9092" environment: - KAFKA_BROKER_ID: 2 - KAFKA_BROKER_RACK: zone1 - KAFKA_ADVERTISED_HOST_NAME: 169.254.123.123 - KAFKA_ADVERTISED_PORT: 9093 + KAFKA_CFG_BROKER_ID: 2 + KAFKA_CFG_BROKER_RACK: zone1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://169.254.123.123:9093 restart: on-failure depends_on: - zookeeper - - kafka1 # Zone 2 brokers kafka3: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} + container_name: kafka3 + hostname: 169.254.123.123 + image: bitnami/kafka:${KAFKA_IMAGE_TAG:-2.7.0} ports: - "9094:9092" environment: - KAFKA_BROKER_ID: 3 - KAFKA_BROKER_RACK: zone2 - KAFKA_ADVERTISED_HOST_NAME: 169.254.123.123 - KAFKA_ADVERTISED_PORT: 9094 + KAFKA_CFG_BROKER_ID: 3 + KAFKA_CFG_BROKER_RACK: zone2 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 + 
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://169.254.123.123:9094 restart: on-failure depends_on: - zookeeper - - kafka1 - - kafka2 kafka4: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} + container_name: kafka4 + hostname: 169.254.123.123 + image: bitnami/kafka:${KAFKA_IMAGE_TAG:-2.7.0} ports: - "9095:9092" environment: - KAFKA_BROKER_ID: 4 - KAFKA_BROKER_RACK: zone2 - KAFKA_ADVERTISED_HOST_NAME: 169.254.123.123 - KAFKA_ADVERTISED_PORT: 9095 + KAFKA_CFG_BROKER_ID: 4 + KAFKA_CFG_BROKER_RACK: zone2 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://169.254.123.123:9095 restart: on-failure depends_on: - zookeeper - - kafka1 - - kafka2 - - kafka3 # Zone 3 brokers kafka5: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} + container_name: kafka5 + hostname: 169.254.123.123 + image: bitnami/kafka:${KAFKA_IMAGE_TAG:-2.7.0} ports: - "9096:9092" environment: - KAFKA_BROKER_ID: 5 - KAFKA_BROKER_RACK: zone3 - KAFKA_ADVERTISED_HOST_NAME: 169.254.123.123 - KAFKA_ADVERTISED_PORT: 9096 + KAFKA_CFG_BROKER_ID: 5 + KAFKA_CFG_BROKER_RACK: zone3 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://169.254.123.123:9096 restart: on-failure depends_on: - zookeeper - - kafka1 - - kafka2 - - kafka3 - - kafka4 kafka6: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} + container_name: kafka6 + hostname: 169.254.123.123 + image: bitnami/kafka:${KAFKA_IMAGE_TAG:-2.7.0} ports: - "9097:9092" environment: - KAFKA_BROKER_ID: 6 - KAFKA_BROKER_RACK: zone3 - KAFKA_ADVERTISED_HOST_NAME: 169.254.123.123 - KAFKA_ADVERTISED_PORT: 9097 + KAFKA_CFG_BROKER_ID: 6 + KAFKA_CFG_BROKER_RACK: zone3 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 + KAFKA_CFG_ADVERTISED_LISTENERS: 
PLAINTEXT://169.254.123.123:9097 restart: on-failure depends_on: - zookeeper - - kafka1 - - kafka2 - - kafka3 - - kafka4 - - kafka5 diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 338a44a1..b969bae0 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -19,6 +19,7 @@ import ( "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/check" "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/create" "github.com/segmentio/topicctl/pkg/groups" "github.com/segmentio/topicctl/pkg/messages" log "github.com/sirupsen/logrus" @@ -136,6 +137,7 @@ func (c *CLIRunner) CreateACL( ) err = aclAdmin.Create(ctx) + if err != nil { return err } diff --git a/pkg/create/acl.go b/pkg/create/acl.go new file mode 100644 index 00000000..ddf488e0 --- /dev/null +++ b/pkg/create/acl.go @@ -0,0 +1,141 @@ +package create + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/util" + log "github.com/sirupsen/logrus" +) + +// ACLCreatorConfig contains the configuration for an ACL creator. 
+type ACLCreatorConfig struct { + ClusterConfig config.ClusterConfig + DryRun bool + SkipConfirm bool + ACLConfig config.ACLConfig +} + +type ACLCreator struct { + config ACLCreatorConfig + adminClient admin.Client + + clusterConfig config.ClusterConfig + aclConfig config.ACLConfig +} + +func NewACLCreator( + ctx context.Context, + adminClient admin.Client, + creatorConfig ACLCreatorConfig, +) (*ACLCreator, error) { + if !adminClient.GetSupportedFeatures().ACLs { + return nil, fmt.Errorf("ACLs are not supported by this cluster") + } + + return &ACLCreator{ + config: creatorConfig, + adminClient: adminClient, + clusterConfig: creatorConfig.ClusterConfig, + aclConfig: creatorConfig.ACLConfig, + }, nil +} + +func (a *ACLCreator) Create(ctx context.Context) error { + log.Info("Validating configs...") + + if err := a.clusterConfig.Validate(); err != nil { + return err + } + + if err := a.aclConfig.Validate(); err != nil { + return err + } + + if err := config.CheckConsistency(a.aclConfig.Meta, a.clusterConfig); err != nil { + return err + } + + log.Info("Checking if ACLs already exist...") + + acls := a.aclConfig.ToNewACLEntries() + + allExistingACLs := []kafka.ACLEntry{} + newACLs := []kafka.ACLEntry{} + + for _, acl := range acls { + existingACLs, err := a.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: acl.ResourceType, + ResourceNameFilter: acl.ResourceName, + ResourcePatternTypeFilter: acl.ResourcePatternType, + PrincipalFilter: acl.Principal, + HostFilter: acl.Host, + Operation: acl.Operation, + PermissionType: acl.PermissionType, + }) + if err != nil { + return fmt.Errorf("error checking for existing ACL (%v): %v", acl, err) + } + if len(existingACLs) > 0 { + allExistingACLs = append(allExistingACLs, acl) + } else { + newACLs = append(newACLs, acl) + } + } + + if len(allExistingACLs) > 0 { + log.Infof( + "Found %d existing ACLs:\n%s", + len(allExistingACLs), + formatNewACLsConfig(allExistingACLs), + ) + } + + if len(newACLs) == 0 { + log.Infof("No 
ACLs to create") + return nil + } + + if a.config.DryRun { + log.Infof( + "Would create ACLs with config %+v", + formatNewACLsConfig(newACLs), + ) + return nil + } + + log.Infof( + "It looks like these ACLs don't already exist. Will create them with this config:\n%s", + formatNewACLsConfig(newACLs), + ) + + ok, _ := util.Confirm("OK to continue?", a.config.SkipConfirm) + if !ok { + return errors.New("Stopping because of user response") + } + + log.Infof("Creating new ACLs for user with config %+v", formatNewACLsConfig(newACLs)) + + if err := a.adminClient.CreateACLs(ctx, newACLs); err != nil { + return fmt.Errorf("error creating new ACLs: %v", err) + } + + return nil + } + + // formatNewACLsConfig generates a pretty string representation of kafka-go + // ACL configurations. + func formatNewACLsConfig(config []kafka.ACLEntry) string { + content, err := json.MarshalIndent(config, "", " ") + if err != nil { + log.Warnf("Error marshalling ACLs config: %+v", err) + return "Error" + } + + return string(content) + } diff --git a/pkg/create/acl_test.go b/pkg/create/acl_test.go new file mode 100644 index 00000000..0797fb81 --- /dev/null +++ b/pkg/create/acl_test.go @@ -0,0 +1,318 @@ +package create + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/util" + "github.com/stretchr/testify/require" +) + +func TestCreateNewACLs(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment:
"test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + creator := testCreator(ctx, t, aclConfig) + defer creator.adminClient.Close() + + defer func() { + _, err := creator.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := creator.Create(ctx) + require.NoError(t, err) + acl, err := creator.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) +} + +func TestCreateExistingACLs(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel 
:= context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + creator := testCreator(ctx, t, aclConfig) + defer creator.adminClient.Close() + + defer func() { + _, err := creator.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := creator.Create(ctx) + require.NoError(t, err) + acl, err := creator.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + 
Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) + // Run create again and make sure it is idempotent + err = creator.Create(ctx) + require.NoError(t, err) + acl, err = creator.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) +} + +func TestCreateACLsDryRun(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + creator := testCreator(ctx, t, aclConfig) + defer creator.adminClient.Close() + 
creator.config.DryRun = true + + defer func() { + _, err := creator.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := creator.Create(ctx) + require.NoError(t, err) + acl, err := creator.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{}, acl) +} + +func testCreator( + ctx context.Context, + t *testing.T, + aclConfig config.ACLConfig, +) *ACLCreator { + clusterConfig := config.ClusterConfig{ + Meta: config.ClusterMeta{ + Name: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ClusterSpec{ + BootstrapAddrs: []string{util.TestKafkaAddr()}, + ZKLockPath: "/topicctl/locks", + }, + } + + adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "") + require.NoError(t, err) + + applier, err := NewACLCreator( + ctx, + adminClient, + ACLCreatorConfig{ + ClusterConfig: clusterConfig, + ACLConfig: aclConfig, + DryRun: false, + SkipConfirm: true, + }, + ) + require.NoError(t, err) + return applier +}