Simple Python client to ksqlDB
This code is inspired from the work of @sauljabin taken from here.
To connect to a running ksqlDB
server (in this example running on localhost
on port 8088
):
from pyksql.ksql import KSQL
ksql = KSQL("http://localhost:8088")
General information about the ksqlDB
server can be obtained like so:
print("info", ksql.info())
To list the Kafka topics, defined streams and tables:
print("topics", ksql.topics())
print("streams", ksql.streams())
print("tables", ksql.tables())
To send a query to the ksqlDB
server:
asyncio.run(
ksql.query(
"SELECT * FROM services;",
on_init=print,
on_new_row=print,
on_close=lambda: print("No more entries"),
)
)
To stream the content of a ksql TABLE services
to a pandas DataFrame:
df = asyncio.run(
ksql.query_to_dataframe("SELECT * FROM services;")
)
To run queries in a Jupyter
NoteBook use:
await ksql.query(
"SELECT * FROM services;",
on_init=print,
on_new_row=print,
on_close=lambda: print("No more entries"),
)
and
df = await ksql.query_to_dataframe(SELECT * FROM services;)
To insert some rows into a ksql data stream named data_tags_stream
use:
rows = [
{
"TAG_ID": "user_tag",
"TAG_VALUE": "my tag",
},
]
ksql.insert_into_stream('data_tags_stream', rows)
where as many rows as needed can be inserted at the same time.
Create a local clone of this repository and in the root directory of the cloned code run:
pip install .
or install it directly from GitHub:
pip install git+https://github.com/rwuthric/PyKSQL.git
Use
pip uninstall pyksql