Skip to content

Commit

Permalink
add combined kraft mode
Browse files Browse the repository at this point in the history
  • Loading branch information
piif committed Nov 8, 2024
1 parent baa0027 commit 6884638
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 19 deletions.
21 changes: 20 additions & 1 deletion docs/VARIABLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -940,11 +940,30 @@ Default: 10

***

### kraft_combined

Boolean used to declare broker nodes as controllers (combined mode). Do not use in a production environment

Default: false

***

### kafka_controller_quorum_voters

Default controller quorum voters

Default: "{% for controller_hostname in groups.kafka_controller|default([]) %}{% if loop.index > 1%},{% endif %}{{groups.kafka_controller.index(controller_hostname)|int + 9991}}@{{controller_hostname}}:{{ kafka_controller_listeners['controller']['port'] }}{%endfor%}"
Default: "
{%- if kraft_combined -%}
{%- for broker_hostname in groups.kafka_broker|default([]) %}
{%- if loop.index > 1%},{% endif -%}
{{ groups.kafka_broker.index(broker_hostname)|int + 1 }}@{{ broker_hostname }}:{{ kafka_broker_listeners['controller']['port'] }}
{%- endfor -%}
{%- else -%}
{%- for controller_hostname in groups.kafka_controller|default([]) -%}
{%- if loop.index > 1%},{% endif -%}
{{ groups.kafka_controller.index(controller_hostname)|int + 9991 }}@{{ controller_hostname }}:{{ kafka_controller_listeners['controller']['port'] }}
{%- endfor -%}
{%- endif -%}"

***

Expand Down
83 changes: 82 additions & 1 deletion roles/kafka_broker/tasks/get_meta_properties.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,93 @@
---
- name: Prepare SCRAM Users if needed
set_fact:
scram_users_to_create: []

# with kraft combined mode, the first install has to define the cluster id itself, instead of getting it from dedicated controllers
- name: Check meta.properties
run_once: true
when: kraft_combined
ansible.builtin.stat:
path: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
register: meta_properties

- name: Initialize ClusterId
when:
- kraft_combined
- not meta_properties.stat.exists
run_once: true
shell: "{{ binary_base_path }}/bin/kafka-storage random-uuid"
environment:
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions"
register: random_uuid

- name: Set ClusterId
when:
- kraft_combined
- not meta_properties.stat.exists
run_once: true
set_fact:
clusterid: "{{ random_uuid.stdout }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups.kafka_broker }}"

## and initialize temporary controller admin user
- name: Prepare SCRAM 512 admin user
when:
- kraft_combined
- "'SCRAM-SHA-512' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-512' in kafka_broker_sasl_enabled_mechanisms"
set_fact:
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-512=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"

- name: Prepare SCRAM 256 admin user
when:
- kraft_combined
- "'SCRAM-SHA-256' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-256' in kafka_broker_sasl_enabled_mechanisms"
set_fact:
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-256=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"

# after first install in combined mode, get clusterid from one broker node
- name: Extract ClusterId from meta.properties on KRaft Controller
when:
- kraft_combined
- meta_properties.stat.exists
run_once: true
slurp:
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
register: uuid_broker

- name: Set ClusterId
when:
- kraft_combined
- meta_properties.stat.exists
run_once: true
set_fact:
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups.kafka_broker }}"

# with dedicated controller nodes, the cluster id is already defined on the controller nodes
- name: Extract ClusterId from meta.properties on KRaft Controller
when: not kraft_combined
run_once: true
slurp:
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
delegate_to: "{{ groups.kafka_controller[0] }}"
register: uuid_broker

- name: Set ClusterId
when: not kraft_combined
run_once: true
set_fact:
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups.kafka_broker }}"

- name: Format Storage Directory
shell: "{{ binary_base_path }}/bin/kafka-storage format -t={{ clusterid }} -c {{ kafka_broker.config_file }} --ignore-formatted"
shell: "{{ binary_base_path }}/bin/kafka-storage format -t={{ clusterid }} -c {{ kafka_broker.config_file }} --ignore-formatted {{ scram_users_to_create|join(' ') }}"
register: format_meta
vars:
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
63 changes: 57 additions & 6 deletions roles/kafka_controller/tasks/get_meta_properties.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,68 @@
---

- name: Check if Data Directories are Formatted
shell: "{{ binary_base_path }}/bin/kafka-storage info -c {{ kafka_controller.config_file }}"
ignore_errors: true
failed_when: false
register: formatted

- name: Get ClusterId
- name: Prepare SCRAM Users if needed
set_fact:
scram_users_to_create: []

- name: Check meta.properties
run_once: true
ansible.builtin.stat:
path: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
register: meta_properties

# if meta.properties does not exist, create a uuid
- name: Initialize ClusterId
when: not meta_properties.stat.exists
run_once: true
shell: "{{ binary_base_path }}/bin/kafka-storage random-uuid"
environment:
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions"
register: uuid_key
register: random_uuid

- name: Set ClusterId
when: not meta_properties.stat.exists
run_once: true
set_fact:
clusterid: "{{ random_uuid.stdout }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups['kafka_controller'] }}"

## and initialize temporary controller admin user
- name: Prepare SCRAM 512 admin user
when:
- "'SCRAM-SHA-512' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-512' in kafka_broker_sasl_enabled_mechanisms"
set_fact:
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-512=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"

- name: Prepare SCRAM 256 admin user
when:
- "'SCRAM-SHA-256' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-256' in kafka_broker_sasl_enabled_mechanisms"
set_fact:
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-256=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"

# else, extract it from meta.properties
- name: Extract ClusterId from meta.properties
when: meta_properties.stat.exists
run_once: true
slurp:
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
register: uuid_broker

- name: Set ClusterId
when: meta_properties.stat.exists and not kraft_migration|bool
run_once: true
when: not kraft_migration|bool
set_fact:
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups['kafka_controller'] }}"

- name: Extract ClusterId from meta.properties on ZK Broker
slurp:
Expand All @@ -21,8 +72,8 @@
when: kraft_migration|bool

- name: Format Data Directory
shell: "{{ binary_base_path }}/bin/kafka-storage format -t={{ clusterid }} -c {{ kafka_controller.config_file }} --ignore-formatted"
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_controller.config_file }} --ignore-formatted {{ scram_users_to_create|join(' ') }}"
register: format_meta
vars:
clusterid: "{{ (zoo_cluster['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] if kraft_migration|bool else uuid_key.stdout }}"
when: formatted.rc == 1 # To trigger the command only when the directories are not formatted
clusterid: "{{ (zoo_cluster['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] if kraft_migration|bool else clusterid }}"
when: kraft_migration|bool or formatted.rc == 1 # To trigger the command only during migration or when the directories are not yet formatted
21 changes: 18 additions & 3 deletions roles/variables/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,22 @@ kraft_migration: false
### Parameter to increase the number of retries for Metadata Migration API request
metadata_migration_retries: 10

### Default controller quorum voters
kafka_controller_quorum_voters: "{% for controller_hostname in groups.kafka_controller|default([]) %}{% if loop.index > 1%},{% endif %}{{groups.kafka_controller.index(controller_hostname)|int + 9991}}@{{controller_hostname}}:{{ kafka_controller_listeners['controller']['port'] }}{%endfor%}"
### Set to true to install controller and broker on the same nodes
kraft_combined: false

### Default controller quorum voters. Dynamically assigned later if not user-provided
kafka_controller_quorum_voters: >-
{%- if kraft_combined -%}
{%- for broker_hostname in groups.kafka_broker|default([]) %}
{%- if loop.index > 1%},{% endif -%}
{{ groups.kafka_broker.index(broker_hostname)|int + 1 }}@{{ broker_hostname }}:{{ kafka_broker_listeners['controller']['port'] }}
{%- endfor -%}
{%- else -%}
{%- for controller_hostname in groups.kafka_controller|default([]) -%}
{%- if loop.index > 1%},{% endif -%}
{{ groups.kafka_controller.index(controller_hostname)|int + 9991 }}@{{ controller_hostname }}:{{ kafka_controller_listeners['controller']['port'] }}
{%- endfor -%}
{%- endif -%}
### Default Kafka config prefix. Only valid to customize when installation_method: archive
kafka_controller_config_prefix: "{{ config_prefix }}/controller"
Expand Down Expand Up @@ -578,7 +592,8 @@ kafka_broker_default_listeners: "{
'ssl_enabled': {{ssl_enabled|string|lower}},
'ssl_mutual_auth_enabled': {{ssl_mutual_auth_enabled|string|lower}},
'sasl_protocol': '{{sasl_protocol}}'
}{% endif %}{% endif %}
}{% endif %}{% if kraft_enabled|bool and kraft_combined|bool %},
'controller': {{ kafka_controller_listeners['controller'] }}{% endif %}{% endif %}
}"

### Dictionary to put additional listeners to be configured within Kafka. Each listener must include a 'name' and 'port' key. Optionally they can include the keys 'ssl_enabled', 'ssl_mutual_auth_enabled', and 'sasl_protocol'
Expand Down
30 changes: 22 additions & 8 deletions roles/variables/vars/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_
binary_base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_join) if installation_method == 'archive' else '/usr' }}"

### Runs kafka in Kraft mode if controller is present
kraft_enabled: "{{ true if 'kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0 else false }}"
kraft_enabled: "{{ true if kraft_combined or ('kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0) else false }}"

#### Config prefix paths ####
zookeeper_config_prefix_path: "{{ zookeeper_config_prefix.strip('/') }}"
Expand Down Expand Up @@ -157,7 +157,7 @@ kafka_controller_properties:
confluent.security.event.logger.exporter.kafka.topic.replicas: "{{audit_logs_destination_bootstrap_servers.split(',')|length if audit_logs_destination_enabled and rbac_enabled else kafka_controller_default_internal_replication_factor}}"
confluent.support.metrics.enable: "true"
confluent.support.customer.id: anonymous
log.dirs: "/var/lib/controller/data"
log.dirs: "{{ '/var/lib/kafka/data' if kraft_combined else '/var/lib/controller/data' }}"
kafka.rest.enable: "{{kafka_controller_rest_proxy_enabled|string|lower}}"
process.roles: controller
controller.quorum.voters: "{{ kafka_controller_quorum_voters }}"
Expand Down Expand Up @@ -234,9 +234,9 @@ kafka_controller_properties:
properties:
sasl.kerberos.service.name: "{{kerberos_kafka_controller_primary}}"
inter_broker_sasl:
enabled: "{{ kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}"
enabled: "{{ kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}"
properties:
sasl.mechanism.inter.broker.protocol: "{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol}}"
sasl.mechanism.inter.broker.protocol: "{{kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol}}"
sr:
enabled: "{{ kafka_broker_schema_validation_enabled and 'schema_registry' in groups }}"
properties:
Expand Down Expand Up @@ -385,7 +385,7 @@ kafka_broker_properties:
socket.send.buffer.bytes: 102400
transaction.state.log.min.isr: "{{ [ 2, kafka_broker_default_internal_replication_factor|int ] | min }}"
transaction.state.log.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}"
advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items|rejectattr('key', 'equalto', 'controller') %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}"
confluent.ansible.managed: 'true'
confluent.license.topic: _confluent-command
confluent.license.topic.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
Expand Down Expand Up @@ -413,10 +413,24 @@ kafka_broker_properties:
broker_on_controller:
enabled: "{{kraft_enabled|bool}}"
properties:
process.roles: "broker{% if kraft_combined %},controller{% endif %}"
controller.quorum.voters: "{{ kafka_controller_quorum_voters }}"
controller.listener.names: "{{kafka_controller_listeners['controller']['name']}}"
listener.security.protocol.map: "{% for listener in kafka_controller_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}{% endfor %},{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}{% endfor %}"
listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}{% endfor %}"
controller.listener.names: "{{ kafka_controller_listeners['controller']['name'] }}"
listener.security.protocol.map: >-
{%- for listener in kafka_controller_listeners|dict2items -%}
{%- if loop.index > 1%},{% endif -%}
{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}
{%- endfor -%}
,
{%- for listener in kafka_broker_listeners|dict2items -%}
{%- if loop.index > 1%},{% endif -%}
{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}
{%- endfor -%}
listeners: >-
{%- for listener in kafka_broker_listeners|dict2items -%}
{%- if loop.index > 1 %},{% endif -%}
{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}
{%- endfor -%}
broker_on_zookeeper:
enabled: "{{not kraft_enabled|bool}}"
properties:
Expand Down

0 comments on commit 6884638

Please sign in to comment.