{"payload":{"feedbackUrl":"https://github.com/orgs/community/discussions/53140","repo":{"id":503016065,"defaultBranch":"master","name":"kafka-monitor","ownerLogin":"ovotech","currentUserCanPush":false,"isFork":true,"isEmpty":false,"createdAt":"2022-06-13T15:37:39.000Z","ownerAvatar":"https://avatars.githubusercontent.com/u/3543199?v=4","public":true,"private":false,"isOrgOwned":true},"refInfo":{"name":"","listCacheKey":"v0:1655478832.6344478","currentOid":""},"activityList":{"items":[{"before":"e4467c8ae58e1fb0e6f53432aa55a4a2c28730ec","after":"043db6419d1638ea9f8f0b8262950fba576c2806","ref":"refs/heads/master","pushedAt":"2024-09-03T13:07:42.000Z","pushType":"push","commitsCount":9,"pusher":{"login":"yarinwinter","name":null,"path":"/yarinwinter","primaryAvatarUrl":"https://avatars.githubusercontent.com/u/47352047?s=80&v=4"},"commit":{"message":"ConsumeService: fix client closing causing `ConcurrentModificationException`\n\n## Problem\n\n- calling `_baseConsumer.close()`, outside of the thread the consumer is running in, is invalid\n- as documented in _kafka consumer docs_[^1]\n\n> The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in `ConcurrentModificationException`.\n\nThe exception thrown\n```\n2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.\njava.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. 
competing thread is kac-lc consume-service\n        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.utils.CloseableLock.<init>(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]\n```\n\n## Solution\n\nThe recommended solution[^1] is\n- to use the `consumer.wakeup()` method\n- but the method is not yet adopted by the `KMBaseConsumer` interface\n- so for now `_baseConsumer.close()` is moved into the thread\n- calling stop now only sets `_running.compareAndSet(true, false)`, so the runloop exits\n\n[^1]:[KafkaConsumer.java](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502)\n\n## Testing Done\n\nIncreased the `thread.join(5000)` timeout, as this implementation is slower to stop because it does not interrupt the consumer thread.\n\n- `./gradlew test`","shortMessageHtmlLink":"ConsumeService: fix client closing causing `ConcurrentModificationExc…"}}],"hasNextPage":false,"hasPreviousPage":false,"activityType":"all","actor":null,"timePeriod":"all","sort":"DESC","perPage":30,"cursor":"Y3Vyc29yOnYyOpK7MjAyNC0wOS0wM1QxMzowNzo0Mi4wMDAwMDBazwAAAASrl0-g","startCursor":"Y3Vyc29yOnYyOpK7MjAyNC0wOS0wM1QxMzowNzo0Mi4wMDAwMDBazwAAAASrl0-g","endCursor":"Y3Vyc29yOnYyOpK7MjAyNC0wOS0wM1QxMzowNzo0Mi4wMDAwMDBazwAAAASrl0-g"}},"title":"Activity · ovotech/kafka-monitor"}