Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch does not use serializers #886

Closed
ydjin0602 opened this issue May 1, 2023 · 1 comment
Closed

batch does not use serializers #886

ydjin0602 opened this issue May 1, 2023 · 1 comment

Comments

@ydjin0602
Copy link
Contributor

ydjin0602 commented May 1, 2023

Describe the bug
When I use a batch producer with the key_serializer and value_serializer keys, the key and value are not serialized in any way, and therefore I get a TypeError.
When sending a single message via producer.send(), the serialization works as it should.

Environment (please complete the following information):

  • aiokafka version 0.8.0
  • kafka-python version 2.0.2
  • Kafka Broker version: I use docker image confluentinc/cp-kafka:latest on my pc

Reproducible example

def _serializer(value):
      return json.dumps(value).encode("utf-8")

def _key_serializer(value):
      return value.encode("utf-8")

async def send_many_messages(
        self,
        messages: list[BaseSchema],
        topic: str,
        key: str,
        headers: List[Tuple[str, bytes]],
    ) -> None:
        producer = AIOKafkaProducer(
            bootstrap_servers=[f"{CONFIG.KAFKA_HOST}:{CONFIG.KAFKA_PORT}"],
            enable_idempotence=True,
            key_serializer=self._key_serializer,
            value_serializer=self._serializer,
        )
        try:
            await producer.start()
            batch = producer.create_batch()
            for message in messages:
                metadata = batch.append(
                    key=key,
                    headers=headers,
                    value=message,
                    timestamp=None,
                )
                if metadata is None:
                    partitions = await producer.partitions_for(topic=topic)
                    partition = random.choice(tuple(partitions))
                    await producer.send_batch(
                        batch=batch,
                        topic=topic,
                        partition=partition,
                    )
                    logger.info(
                        f"{batch.record_count()} сообщений отправлено в "
                        f"Kafka топик {topic}!"
                    )
                    batch = producer.create_batch()
                    continue

            partitions = await producer.partitions_for(topic=topic)
            partition = random.choice(tuple(partitions))
            await producer.send_batch(
                batch=batch,
                topic=topic,
                partition=partition,
            )
            logger.info(
                f"{batch.record_count()})
        except (ValueError, KafkaConnectionError):
            raise KafkaConnectionError(
                message="some error"
            )
        finally:
            await producer.stop()
@ods
Copy link
Collaborator

ods commented Jan 15, 2024

Fixed in #887

@ods ods closed this as completed Jan 15, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants