Kafka output might lose connection and not recover #37276

Open
belimawr opened this issue Dec 4, 2023 · 13 comments
@belimawr
Contributor

belimawr commented Dec 4, 2023

  • Version: main
  • Operating System: Tested on Linux, likely all OSes

Steps to reproduce

  1. Create a Kafka cluster (docker-compose below)
  2. Create some log files to be ingested; I use flog: flog -d1 -s1 -l > /tmp/flog.log
  3. Start Filebeat (configuration below)
  4. Stop different Kafka nodes until Filebeat stops sending events to the output (the command sketch after this list shows one way to do it).
    Log entries like this one will stop appearing:
    {"log.level":"debug","@timestamp":"2023-12-04T12:15:27.431+0100","log.logger":"kafka","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*msgRef).dec","file.name":"kafka/client.go","file.line":392},"message":"finished kafka batch","service.name":"filebeat","ecs.version":"1.6.0"}
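A minimal sketch of step 4, assuming the cluster was started from the docker-compose.yml below (the service names kafka1, kafka2 and kafka3 come from that file):

# stop brokers one at a time while watching Filebeat's debug logs
docker compose stop kafka2
docker compose stop kafka3

# bring a broker back to check whether publishing resumes
docker compose start kafka2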
    

For the files below, bear in mind that you will have to add your local IP address to them.

filebeat.yml

filebeat.inputs:
    - id: my-log-input
      paths:
        - /tmp/flog.log
      type: log
output:
    kafka:
        broker_timeout: 30
        compression: none
        hosts:
            - <YOUR IP>:9091
        partition:
            random:
                group_events: 1
        required_acks: 1
        timeout: 30
        topics:
            - topic: my-topic-three
        type: kafka
        version: 2.6.0
queue.mem:
  flush.timeout: 2s
logging:
  level: debug
  selectors:
    - kafka
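To run Filebeat in the foreground with this configuration and logs on the console, something like the following should work (a sketch; adjust the binary path to your installation or checkout):

# -e sends logs to stderr, -c points at the configuration above
./filebeat -e -c filebeat.yml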

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://<YOUR LOCAL IP>:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka2:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://<YOUR LOCAL IP>:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 2
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper 
  kafka3:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://<YOUR LOCAL IP>:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091,kafka2:19092,kafka3:19093"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
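A sketch of bringing the cluster up with Docker Compose, assuming the file above is saved as docker-compose.yml in the current directory:

# starts ZooKeeper, the three brokers and Kafdrop
docker compose up -d

# Kafdrop is exposed on port 9000 per the compose file, so topics and brokers
# can be inspected at http://localhost:9000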

Tutorial on running a Kafka cluster with Docker: https://betterprogramming.pub/a-simple-apache-kafka-cluster-with-docker-kafdrop-and-python-cf45ab99e2b9

I have also seen reports of users reproducing a similar behaviour with Filebeat v8.8.1 with HAProxy between the Kafka nodes and Filebeat. Steps:

  1. Configure Filebeat to use HAProxy when connecting to Kafka (one HAProxy per Kafka broker; a sketch of this layout follows the list)
  2. Start Filebeat (logs are sent to Kafka)
  3. Stop one of the Kafka brokers
  4. Filebeat stops sending data to Kafka
  5. Restart Filebeat and it will start sending data to Kafka.
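A rough sketch of that proxy layer, with one HAProxy TCP frontend per broker; the service name, file name and port below are illustrative assumptions, not taken from the original report. One such pair is added per broker, and Filebeat's hosts entries then point at the proxy ports instead of the brokers.

haproxy1.cfg (fronting kafka1 only):

defaults
    mode tcp
    timeout connect 5s
    timeout client 30s
    timeout server 30s

frontend kafka1_in
    bind *:19191
    default_backend kafka1_out

backend kafka1_out
    server kafka1 <YOUR LOCAL IP>:9091

docker-compose.yml addition:

  haproxy1:
    image: haproxy:2.8
    ports:
      - "19191:19191"
    volumes:
      - ./haproxy1.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    depends_on:
      - kafka1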
@belimawr belimawr added the bug and Team:Elastic-Agent labels Dec 4, 2023
@ycombinator
Contributor

I have also seen reports of users reproducing a similar behaviour with Filebeat v8.8.1 with HAProxy between the Kafka nodes and Filebeat. Steps:

  1. Configure Filebeat to use HAProxy when connecting to Kafka (one HAProxy per Kafka broker)
  2. Start Filebeat (logs are sent to Kafka)
  3. Stop one of the Kafka brokers
  4. Filebeat stops sending data to Kafka
  5. Restart Filebeat and it will start sending data to Kafka.

Also worth noting that this issue is not reproducible with Filebeat v7.17.1. One of the changes between these versions is the version of the Kafka client that Beats is using:

v7.17.1: https://github.com/elastic/beats/blob/v7.17.1/go.mod#L290
v8.8.1: https://github.com/elastic/beats/blob/v8.8.1/go.mod#L363

It looks like this line was changed in v8.2.0 (PR).
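A quick way to check which sarama revision a given Beats tag pins (a sketch, run from a beats checkout):

git checkout v8.8.1
grep -n -i sarama go.mod          # shows the pinned version and any replace directive
go list -m all | grep -i sarama   # effective module versions after replaces are applied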

@amitkanfer
Collaborator

@jlind23 should we add this to our test cases with our partner?

@botelastic

botelastic bot commented Dec 12, 2024

Hi!
We just realized that we haven't looked into this issue in a while. We're sorry!

We're labeling this issue as Stale to make it hit our filters and make sure we get back to it as soon as possible. In the meantime, it'd be extremely helpful if you could take a look at it as well and confirm its relevance. A simple comment with a nice emoji will be enough :+1:.
Thank you for your contribution!

@botelastic botelastic bot added the Stalled label Dec 12, 2024
@blakerouse
Contributor

The difference between the two is here: https://github.com/elastic/sarama/compare/11c3ef800752..ebc2b0d8eef3

It is possible that this change in which errors are considered unrecoverable results in the connection not being re-established once one of these errors is hit.
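Purely to illustrate what that would mean (this is not the sarama or Beats code, just a hypothetical sketch): if the client classified an error as unrecoverable and stopped retrying, the output would stall until the process is restarted.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnrecoverable stands in for an error class the client treats as fatal.
var errUnrecoverable = errors.New("unrecoverable broker error")

// publish pretends to send one batch to a broker that is currently down.
func publish() error { return errUnrecoverable }

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		err := publish()
		if err == nil {
			fmt.Println("batch published")
			return
		}
		if errors.Is(err, errUnrecoverable) {
			// If this class of error is treated as fatal, the loop gives up and
			// the connection is never re-established, which is the behaviour
			// the comment above speculates about.
			fmt.Println("giving up:", err)
			return
		}
		time.Sleep(time.Second) // otherwise back off and retry
	}
}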

@botelastic botelastic bot removed the Stalled label Dec 27, 2024
@strawgate
Contributor

Interestingly, I am unable to reproduce this with the provided docker-compose and filebeat.yml, on 8.8.1 or on main. When the broker comes back online, publishing quickly resumes.

I did have to bump Kafka to 7.x to run on my arm64 laptop, though, so maybe that has something to do with it.

Do we know what version of Kafka the customer is running?

@belimawr
Contributor Author

belimawr commented Jan 2, 2025

Do we know what version of Kafka the customer is running?

Unfortunately I do not know which version they were using :/

@shoulian-zhao

@strawgate @belimawr
The Kafka version is confluent_6.2.0, with jdk-8u212-linux-x64.tar.gz.

@belimawr belimawr self-assigned this Jan 6, 2025
@faec
Contributor

faec commented Jan 6, 2025

It is possible that this change in which errors are considered unrecoverable results in the connection not being re-established once one of these errors is hit.

That change only affects which errors are reported to the caller (Beats), not how retries are handled. The baseline behavior was the same, but it reported all variations as a single generic error. After these errors are reported, Beats will still retry.

@belimawr
Contributor Author

belimawr commented Jan 6, 2025

I've been trying to reproduce this problem, but I cannot: Filebeat has been able to recover from all errors I managed to produce.

The "worst" case I managed to reproduce puts my Kafka cluster in an unstable state; the CLI tools that came with the Kafka installation produce the following error:

[2025-01-06 18:08:13,069] WARN [Producer clientId=console-producer] 1 partitions have leader brokers without a matching listener, including [filebeat-topic-1-0] (org.apache.kafka.clients.NetworkClient)

And Filebeat produces:

{"log.level":"debug","@timestamp":"2025-01-06T13:07:52.855-0500","log.logger":"kafka","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*msgRef).dec","file.name":"kafka/client.go","file.line":416},"message":"Kafka publish failed with: kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes","service.name":"filebeat","ecs.version":"1.6.0"}

However, once the connection to the problematic brokers is restored, everything starts working again.

I'll test again with one of the versions used in the original reports to understand whether it has been fixed or I'm just not managing to reproduce it any more.

@belimawr
Contributor Author

belimawr commented Jan 6, 2025

Looking at the changes in our sarama fork, I can see the base sarama version was updated from 1.29.1 to 1.43.3 on 17/11/2024. Then Beats was updated to use the new version on 29/11/2024. That seems to be what solved this issue.

@strawgate
Contributor

I was unable to reproduce this with Filebeat 8.8.1, were you?

How do we know if it's fixed in the newer version if we can't reproduce in the older?

@belimawr
Contributor Author

belimawr commented Jan 6, 2025

I was unable to reproduce this with Filebeat 8.8.1, were you?

I cannot reproduce it either. I tried with Filebeat v8.8.1, v8.10.3 and the main branch I fetched this morning.

How do we know if it's fixed in the newer version if we can't reproduce in the older?

We don't... We can close the issue because we cannot reproduce it; if someone reports this issue again, we can re-open it and ask for more details about the setup that reproduces the issue.

@belimawr
Contributor Author

belimawr commented Jan 6, 2025

One thing I noticed is that if Filebeat cannot connect to Kafka and I try to stop Filebeat, it just hangs there; the Kafka client does not stop. However, if I restore the connection to the Kafka brokers, Filebeat eventually stops.
