freeza-offset is a Python package that provides a simple way to commit the offsets consumed by Spark Streaming to a Kafka consumer group. These commits are used only for tracking consumption.
Here are just a few of the things that freeza-offset does well:
- Commits the consumed offsets to Kafka
- Tracks Spark consumption lag in Kafka
- Offsets are no longer under Spark's control alone
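To make the lag-tracking idea concrete: consumer lag for a partition is the difference between the partition's latest (log-end) offset and the offset committed for the consumer group. The sketch below is illustrative only and is not freeza-offset's own code; the function name `compute_lag` and the sample offsets are hypothetical.

```python
from typing import Dict, Optional


def compute_lag(
    end_offsets: Dict[int, int],
    committed_offsets: Dict[int, Optional[int]],
) -> Dict[int, int]:
    """Return per-partition lag: log-end offset minus committed offset.

    Simplification: a partition with no committed offset (missing or None)
    is treated as committed at 0, so its lag equals its log-end offset.
    """
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition) or 0
        # Clamp at zero in case the end offset was read before the commit.
        lag[partition] = max(end - committed, 0)
    return lag


# Hypothetical offsets for a three-partition topic.
end_offsets = {0: 1500, 1: 1200, 2: 900}
committed = {0: 1500, 1: 1100}  # partition 2 has never been committed

lag = compute_lag(end_offsets, committed)
total_lag = sum(lag.values())
```

Once the offsets are committed to a consumer group, standard Kafka monitoring tools can derive this same figure without any knowledge of Spark.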
The source code is currently hosted on GitHub at: https://github.com/HashLoad/freeza-offset
Binary installers for the latest released version are available from the Python Package Index (PyPI) and from conda.
# conda
conda install freeza-offset
# PyPI
pip install freeza-offset
# Databricks
dbutils.library.installPyPI("freeza-offset")
In the freeza-offset directory (the same one where you found this file after cloning the git repo), execute:
python setup.py install
pip install freeza-offset
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("FreezaCommitTest") \
    .getOrCreate()

# Read the topic as a stream, starting from the earliest available offsets.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092") \
    .option("subscribe", "topic-name") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "spark-freeza-runner") \
    .load()

# selectExpr returns a new DataFrame, so keep the result.
df = df.selectExpr("key", "value")

# Print each micro-batch to the console without truncating values.
qry = df.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()
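import freeza

# Start the background thread that commits the query's consumed offsets to Kafka.
tr = freeza.start_commiter_thread(
    query=qry,
    bootstrap_servers="kafka1:9092,kafka2:9092,kafka3:9092",
    group_id="spark-freeza-commiter"
)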
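# Check that the committer thread is still running.
# Assumes a threading.Thread is returned; isAlive() was removed in Python 3.9.
tr.is_alive()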
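For readers curious where the offsets to commit might come from: a Structured Streaming query reports the offsets of its latest micro-batch through `StreamingQuery.lastProgress`, which in PySpark is a dict parsed from the progress JSON. The sketch below pulls the Kafka end offsets out of such a dict. It is an assumption about how a committer could work, not a description of freeza-offset's internals; `extract_end_offsets` and the sample progress dict are hypothetical.

```python
import json
from typing import Any, Dict, Optional


def extract_end_offsets(
    progress: Optional[Dict[str, Any]],
) -> Dict[str, Dict[int, int]]:
    """Collect {topic: {partition: offset}} from a query progress dict."""
    offsets: Dict[str, Dict[int, int]] = {}
    if not progress:
        return offsets  # no micro-batch has completed yet
    for source in progress.get("sources", []):
        end = source.get("endOffset")
        if isinstance(end, str):
            # Tolerate offsets delivered as a JSON string; skip non-JSON values.
            try:
                end = json.loads(end)
            except ValueError:
                continue
        if not isinstance(end, dict):
            continue
        for topic, partitions in end.items():
            if not isinstance(partitions, dict):
                continue
            topic_offsets = offsets.setdefault(topic, {})
            for partition, offset in partitions.items():
                # JSON object keys are strings, so convert partitions to int.
                topic_offsets[int(partition)] = int(offset)
    return offsets


# Hypothetical progress dict shaped like a Kafka source report.
sample_progress = {
    "batchId": 7,
    "sources": [
        {
            "description": "KafkaV2[Subscribe[topic-name]]",
            "startOffset": {"topic-name": {"0": 90, "1": 40}},
            "endOffset": {"topic-name": {"0": 120, "1": 75}},
        }
    ],
}

end_offsets = extract_end_offsets(sample_progress)
```

With a live query the call would be `extract_end_offsets(qry.lastProgress)`; `lastProgress` is `None` until the first batch completes, which the function handles by returning an empty dict.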
For usage questions, the best place to go is to open a new issue in the GitHub repository.
All contributions, bug reports, bug fixes, documentation improvements, enhancements, and ideas are welcome.