Skip to content

Commit

Permalink
Fix kafka reader. (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
moromimay authored Apr 14, 2021
1 parent 43392f4 commit 0f31164
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 3 deletions.
2 changes: 1 addition & 1 deletion butterfree/extract/readers/kafka_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def consume(self, client: SparkClient) -> DataFrame:
"""
# read using client and cast key and value columns from binary to string
raw_df = (
client.read(format="kafka", options=self.options, stream=self.stream)
client.read(format="kafka", stream=self.stream, **self.options)
.withColumn("key", col("key").cast("string"))
.withColumn("value", col("value").cast("string"))
)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

__package_name__ = "butterfree"
__version__ = "1.2.0.dev8"
__version__ = "1.2.0.dev9"
__repository_url__ = "https://github.com/quintoandar/butterfree"

with open("requirements.txt") as f:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/butterfree/extract/readers/test_kafka_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def test_consume(

# assert
spark_client.read.assert_called_once_with(
format="kafka", options=options, stream=kafka_reader.stream
format="kafka", stream=kafka_reader.stream, **options
)
assert_dataframe_equality(target_df, output_df)

Expand Down

0 comments on commit 0f31164

Please sign in to comment.