Retrieve message has error in logs #470

Open
bingkunyangvungle opened this issue Dec 14, 2023 · 3 comments
bingkunyangvungle commented Dec 14, 2023

What happened?

When I retrieve messages with a client (Maven kafka-clients, version 3.4.0), the client receives the messages, but the server side logs the following error:

[2023-12-14 14:23:49,286] ERROR Error occurred while reading the remote data for user_s3-0 (kafka.log.remote.RemoteLogReader)
org.apache.kafka.common.KafkaException: org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.lang.InterruptedException
at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$7(RemoteIndexCache.java:379)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:342)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.createCacheEntry(RemoteIndexCache.java:375)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$getIndexEntry$6(RemoteIndexCache.java:365)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.getIndexEntry(RemoteIndexCache.java:364)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.lookupOffset(RemoteIndexCache.java:436)
at kafka.log.remote.RemoteLogManager.lookupPositionForOffset(RemoteLogManager.java:1326)
at kafka.log.remote.RemoteLogManager.read(RemoteLogManager.java:1272)
at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:62)
at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:31)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.lang.InterruptedException
at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchIndex(RemoteStorageManager.java:532)
at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.lambda$fetchIndex$5(ClassLoaderAwareRemoteStorageManager.java:89)
at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.withClassLoader(ClassLoaderAwareRemoteStorageManager.java:66)
at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.fetchIndex(ClassLoaderAwareRemoteStorageManager.java:89)
at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$7(RemoteIndexCache.java:377)
... 19 more
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.get(SegmentManifestProvider.java:87)
at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchSegmentManifest(RemoteStorageManager.java:552)
at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchIndex(RemoteStorageManager.java:507)
... 23 more
Caused by: java.lang.InterruptedException
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:386)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
at io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.get(SegmentManifestProvider.java:69)
... 25 more

What did you expect to happen?

The server side should have no error in logs.

What else do we need to know?

When I used the released package, it not only generated the error, but the client also couldn't get the messages.
When I tried a self-built package from commit e06cae8 on the main branch, it also generated the error, but the client could get the messages.

Contributor

ivanyu commented Dec 28, 2023

Hi @bingkunyangvungle
Thanks for reporting this.
This is a known issue originating from how the broker side handles things. We're working on a workaround at the plugin level and also on fixing it on the broker side.
For now, please try increasing fetch.max.wait.ms (e.g. 2 seconds).
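
For reference, a minimal sketch of how the suggested setting could be applied on a Java consumer. `fetch.max.wait.ms` is a standard consumer configuration key (the client default is 500 ms); the class name, bootstrap address, group id, and deserializers below are placeholders chosen for illustration, not values from this issue.

```java
import java.util.Properties;

public class ConsumerFetchWaitConfig {

    // Builds consumer properties with a longer fetch wait.
    // bootstrapServers and groupId are placeholders for your own environment.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Raise the maximum time the broker may hold a fetch request
        // from the 500 ms default to the 2 seconds suggested above.
        props.put("fetch.max.wait.ms", "2000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "example-group");
        System.out.println("fetch.max.wait.ms=" + props.getProperty("fetch.max.wait.ms"));
    }
}
```

The resulting `Properties` object can be passed to the `KafkaConsumer` constructor from kafka-clients. Keep `fetch.max.wait.ms` below the consumer's `request.timeout.ms` so that a fetch held by the broker does not time out on the client.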

@bingkunyangvungle
Author

Thank you @ivanyu for the follow-up. I'll try modifying the configuration for now.

Contributor

jeqo commented Jan 23, 2024

I have created #483 to track the resolution of this issue on the TS framework.

@bingkunyangvungle we have recently added an async cache for indexes (#472), which has solved the last scenario we've identified in which this exception could lead to blocked consumers. Even though the exception will still be thrown, async caches should allow the consumer to eventually read values locally and progress.
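
To illustrate the idea, here is a rough sketch of why an async cache helps. It is not the plugin's actual code: `AsyncCacheSketch`, the `segment-0` key, and the latch that stands in for a slow remote fetch are all made up for this example. A reader thread that is interrupted while waiting on the `CompletableFuture` gets an `InterruptedException` (as in the stack trace above), but the load itself keeps running on another thread, so a later read of the same key finds the value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncCacheSketch {

    // Hypothetical cache: key -> future of the loaded value.
    private final ConcurrentHashMap<String, CompletableFuture<String>> cache =
            new ConcurrentHashMap<>();
    // Stands in for a slow remote fetch; the load finishes once it is released.
    private final CountDownLatch releaseLoad;

    AsyncCacheSketch(CountDownLatch releaseLoad) {
        this.releaseLoad = releaseLoad;
    }

    // Starts the load at most once per key, on a thread other than the caller's.
    CompletableFuture<String> getAsync(String key) {
        return cache.computeIfAbsent(key, k -> CompletableFuture.supplyAsync(() -> {
            try {
                releaseLoad.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "index-for-" + k;
        }));
    }

    static String demo() throws Exception {
        CountDownLatch releaseLoad = new CountDownLatch(1);
        AsyncCacheSketch cache = new AsyncCacheSketch(releaseLoad);
        AtomicReference<String> firstOutcome = new AtomicReference<>("none");

        // First reader: interrupted while the load is still pending.
        Thread reader = new Thread(() -> {
            try {
                firstOutcome.set(cache.getAsync("segment-0").get());
            } catch (InterruptedException e) {
                firstOutcome.set("interrupted");
            } catch (ExecutionException e) {
                firstOutcome.set("failed");
            }
        });
        reader.start();
        reader.interrupt();
        reader.join();

        // The load was not cancelled by the interrupt; let it finish and retry.
        releaseLoad.countDown();
        String second = cache.getAsync("segment-0").get();
        return firstOutcome.get() + " -> " + second;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

In this sketch the first read fails with the interrupt and the retry succeeds, which mirrors the behaviour described above: the error may still be logged, but the consumer can progress once the value is cached.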

Let us know if @ivanyu's suggestion was enough for your environment.
