
ERROR WorkerSinkTask Commit of offsets threw an unexpected exception for sequence number... #1026

jmks opened this issue Dec 9, 2024 · 3 comments



jmks commented Dec 9, 2024

We're running SnowflakeSinkConnector version 2.5.0 using Snowflake streaming and had this error happen:

[2024-11-22 21:25:36,487] ERROR WorkerSinkTask{id=pm-prod-snowflake-1} Commit of offsets threw an unexpected exception for sequence number 3: ... list of 423 channel offsets

We only had it happen on that day (Nov 22) and it's since been fine.

I've found a couple issues with this error message but they did not have a resolution.
Has anyone seen this or know any way to help prevent it?

sfc-gh-rsawicki (Contributor) commented:

Hi @jmks, thanks for using our connector.
Could you provide the full message for the other visible error, the "Task threw an uncaught and unrecoverable exception" one? Are there any stack traces visible in those errors?

And for the error you provided, is the list of channel offsets the only visible data, or is there something else that might have been trimmed out?

jmks (Author) commented Jan 3, 2025

Hi @sfc-gh-rsawicki

Thanks for looking into this.

> Could you provide the full message for the other visible error?

[2024-11-22 20:43:02,278] ERROR WorkerSinkTask{id=pm-prod-snowflake-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.IllegalStateException: No more unfilled consumers to be assigned.
	at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.assignRoundRobin(AbstractStickyAssignor.java:764)
	at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:647)
	at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
	at org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assignPartitions(CooperativeStickyAssignor.java:111)
	at org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:703)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:736)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:112)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:640)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:603)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1270)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1245)

This happened much later and looks like a rebalance that did not go well. I'm not familiar with this error, as we only recently started using the sticky assignment strategy (to speed up rebalances with many topic-partitions).
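For reference, switching a sink connector's consumers to the cooperative sticky assignor is typically done with a consumer override in the connector config (this requires the Connect worker to permit client config overrides); the property names below are the standard Kafka ones, and the setup is only a sketch of what such a configuration might look like:

```properties
# Connect worker config: allow connectors to override consumer settings.
connector.client.config.override.policy=All

# Connector config: use the cooperative sticky assignor for this
# connector's consumer group, so rebalances only move the partitions
# that actually change owners.
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```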

> And for the error you provided, is the list of channel offsets the only visible data, or is there something else that might have been trimmed out?

There were a couple of extra lines filtered out, and I found some WARN messages that led to this exception.
This sequence of WARN and ERROR messages happened on each of our four workers:

[2024-11-22 18:22:56,708] WARN [Consumer clientId=connector-consumer-pm-prod-snowflake-3, groupId=connect-pm-prod-snowflake] Received unknown topic or partition error in fetch for partition aws.pm.prod.cdc.policy_configurations.0-10 (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2024-11-22 18:22:56,719] WARN [Consumer clientId=connector-consumer-pm-prod-snowflake-3, groupId=connect-pm-prod-snowflake] Received unknown topic or partition error in fetch for partition aws.pm.prod.cdc.policy_configurations.0-6 (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2024-11-22 18:22:56,720] WARN [Consumer clientId=connector-consumer-pm-prod-snowflake-3, groupId=connect-pm-prod-snowflake] Received unknown topic or partition error in fetch for partition aws.pm.prod.cdc.policy_configurations.0-2 (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2024-11-22 18:22:56,741] WARN [Consumer clientId=connector-consumer-pm-prod-snowflake-3, groupId=connect-pm-prod-snowflake] Offset commit failed on partition aws.pm.prod.cdc.policy_configurations.0-10 at offset 6: This server does not host this topic-partition. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-11-22 18:22:56,742] ERROR WorkerSinkTask{id=pm-prod-snowflake-3} Commit of offsets threw an unexpected exception for sequence number 14574: {aws.pm.prod.cdc.policy_configurations.0-10=OffsetAndMetadata{offset=6, leaderEpoch=null, metadata=''}, aws.pm.prod.cdc.policy_configurations.0-6=OffsetAndMetadata{offset=19, leaderEpoch=null, metadata=''}, aws.pm.prod.cdc.policy_configurations.0-2=OffsetAndMetadata{offset=7, leaderEpoch=null, metadata=''}, ...} (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

So it seems this worker picked up some unexpected topic-partitions, since they show up in its metadata. When it rebalances, those topic-partitions are removed from its assignment:

[2024-11-22 18:22:57,055] INFO [Consumer clientId=connector-consumer-pm-prod-snowflake-3, groupId=connect-pm-prod-snowflake] Revoke previously assigned partitions aws.pm.prod.cdc.policy_configurations.0-2, aws.pm.prod.cdc.policy_configurations.0-6, aws.pm.prod.cdc.policy_configurations.0-10 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-11-22 18:22:58,363] INFO [Consumer clientId=connector-consumer-pm-prod-snowflake-3, groupId=connect-pm-prod-snowflake] Updating assignment with
	Assigned partitions:                       [ No policy_configurations topic-partition in here ]

Here are the logs mentioning this worker: https://gist.github.com/jmks/c228bd6df68f3383c8d32b5d59a13048

So this is what the logs are telling me:

  1. Task fetched an offset for a topic-partition it was not assigned to
  2. It tried to commit that offset, which failed
  3. It restarted with an error
  4. During rebalance, the unexpected topic-partitions were removed
  5. Task runs as usual

Over the next few hours we had this happen a few times. It even happened to multiple tasks at the same time, then the logs get muddied with rebalances, slow startups, etc.

I'm not sure what caused this worker to fetch offsets for a topic-partition it was not assigned. I could not find a log for aws.pm.prod.cdc.policy_configurations.0-10 in the previous week.

The last thing to note is we get a lot of these (even today):

[2024-11-22 18:01:00,028] WARN WorkerSinkTask{id=pm-prod-snowflake-3} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)

My understanding is that it fails to commit all the offsets it has within the timeout, but still makes progress. The large number of topic-partitions probably causes commits to be slow, but we could increase the timeout to give it more time.
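For anyone else hitting the "Commit of offsets timed out" warning: these are Kafka Connect worker settings (not connector settings); the values below are illustrative, not recommendations:

```properties
# Kafka Connect worker config (illustrative values).
# How long the framework waits for offset data to flush before
# cancelling the commit and logging "Commit of offsets timed out".
offset.flush.timeout.ms=10000

# How often the framework tries to commit offsets for tasks
# (default is 60000).
offset.flush.interval.ms=60000
```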

sfc-gh-rsawicki (Contributor) commented:

Hi @jmks, thanks for the additional details.

> I'm not sure what caused this worker to fetch offsets for a topic-partition it was not assigned. I could not find a log for aws.pm.prod.cdc.policy_configurations.0-10 in the previous week.

Is it possible that some new topic-partitions were created at that time? As far as I know, topic creation is not guaranteed to be an atomic operation, especially in multi-node environments, and most Kafka client libraries perform such operations asynchronously. Since you couldn't find earlier logs for this topic-partition, is it possible that it was newly created and the connector hit a window where the topic/partition creation was still being propagated through the environment?

> My understanding is that it fails to commit all the offsets it has, but still makes progress. The large number of topic-partitions probably causes commits to be slow, but we could increase the timeout to give it more time.

Increasing the timeout sounds reasonable, especially if you have a lot of topics/partitions; however, it will be difficult to say anything more on that matter without knowing the specifics of your setup and configuration.

If the issue reoccurs more often, I would recommend reaching out to Snowflake support directly.
