Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to reproduce error 41 #973

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ ci-test-unit:
ci-test-all:
pytest -s -v --log-format="%(asctime)s %(levelname)s %(message)s" --log-level DEBUG --cov aiokafka --cov-report xml --color=yes --docker-image $(DOCKER_IMAGE) $(FLAGS) tests

.PHONY: manual-test
manual-test:
docker compose up --build --exit-code-from aiokafka --attach aiokafka

coverage.xml: .coverage
coverage xml

Expand Down
64 changes: 64 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Network and IPs are hard coded otherwise a broker might be confused on restarting from a different IP

services:
kafka1:
build:
context: ./docker/kafka
command: "start-broker.sh"
environment:
BROKER_ID: "1"
KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M"
CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
networks:
aiokafka-test-network:
ipv4_address: 172.16.23.11
stop_grace_period: 30s
healthcheck:
test: [ "CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 || exit 1" ]
retries: 30
interval: 1s

kafka2:
extends:
service: kafka1
environment:
BROKER_ID: "2"
networks:
aiokafka-test-network:
ipv4_address: 172.16.23.12

kafka3:
extends:
service: kafka1
environment:
BROKER_ID: "3"
networks:
aiokafka-test-network:
ipv4_address: 172.16.23.13


aiokafka:
build:
context: .
dockerfile: docker/aiokafka/Dockerfile
command: [ "python", "-u", "-m", "tests.manual.topic_management" ]
environment:
BOOTSTRAP_SERVERS: "kafka3:9092,kafka2:9092,kafka1:9092"
networks:
aiokafka-test-network:
ipv4_address: 172.16.23.100
depends_on:
kafka1:
condition: service_healthy
kafka2:
condition: service_healthy
kafka3:
condition: service_healthy

networks:
aiokafka-test-network:
ipam:
driver: default
config:
- subnet: 172.16.23.0/24
ip_range: 172.28.23.0/24
10 changes: 10 additions & 0 deletions docker/aiokafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM python:3.12

WORKDIR /opt/project

COPY setup.py pyproject.toml requirements-* /opt/project/

RUN pip install -r requirements-ci.txt

COPY aiokafka /opt/project/aiokafka
COPY tests /opt/project/tests
21 changes: 21 additions & 0 deletions docker/kafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM ubuntu:22.04

RUN apt-get update && \
apt-get install -y --no-install-recommends default-jre wget && \
rm -rf /var/lib/apt/lists/*

ENV PATH="/opt/kafka/bin:${PATH}"
WORKDIR /opt/kafka

# API like CreateTopics are redirected automatically to the controller starting to 2.8.0
# https://issues.apache.org/jira/browse/KAFKA-10181
# To reproduce the error, we must use a previous version of kafka
ARG SCALA_VERSION=2.13
ARG KAFKA_VERSION=3.3.0

RUN wget -q -O kafka.tgz "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \
&& tar xfvz kafka.tgz --strip 1 \
&& rm -rf kafka.tgz site-docs

COPY start-broker.sh /opt/kafka/bin/
COPY base-server.properties /opt/kafka/config
13 changes: 13 additions & 0 deletions docker/kafka/base-server.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
21 changes: 21 additions & 0 deletions docker/kafka/start-broker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env bash


echo "
broker.id=${BROKER_ID:-0}
process.roles=broker,controller
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://$(hostname -i):9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
controller.quorum.voters=${CONTROLLER_QUORUM_VOTERS}
controller.listener.names=CONTROLLER
offsets.topic.replication.factor=${OFFSETS_REPLICATIONS:-1}
" > config/runtime.properties

cat config/base-server.properties config/runtime.properties > config/server.properties

if [ ! -e "/tmp/kafka-logs/meta.properties" ]; then
kafka-storage.sh format --config config/server.properties --cluster-id "YPKJRKEhT06jEqGlBQar5A"
fi

exec kafka-server-start.sh config/server.properties
Empty file added tests/manual/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions tests/manual/topic_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio
import os

Check warning on line 2 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L1-L2

Added lines #L1 - L2 were not covered by tests

from aiokafka.admin import AIOKafkaAdminClient, NewTopic

Check warning on line 4 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L4

Added line #L4 was not covered by tests


async def main() -> None:
client = AIOKafkaAdminClient(bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"])
await client.start()
try:

Check warning on line 10 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L7-L10

Added lines #L7 - L10 were not covered by tests
for i in range(20):
topic = f"test-{i}"
print("Creating topic:", topic)
await client.create_topics(

Check warning on line 14 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L12-L14

Added lines #L12 - L14 were not covered by tests
[NewTopic(name=topic, num_partitions=3, replication_factor=2)]
)
await asyncio.sleep(1)
print("Deleting topic:", topic)
await client.delete_topics([topic])
await asyncio.sleep(1)

Check warning on line 20 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L17-L20

Added lines #L17 - L20 were not covered by tests
finally:
await client.close()

Check warning on line 22 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L22

Added line #L22 was not covered by tests


if __name__ == "__main__":
# Start the asyncio loop by running the main function
asyncio.run(main())

Check warning on line 27 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L27

Added line #L27 was not covered by tests
Loading