Skip to content

Commit

Permalink
Add connection open/close callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
poochie10 authored and oded-zahavi committed Sep 16, 2019
1 parent 7253e21 commit 61bfdf9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGES
-------

552.feature
^^^^^^^^^^^

Add new callbacks for connection opened/close for consumer and producer

523.feature
^^^^^^^^^^^

Expand Down
22 changes: 22 additions & 0 deletions aiokafka/client.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ def __init__(self, *, loop, bootstrap_servers='localhost',
self._md_update_fut = None
self._md_update_waiter = create_future(loop=self._loop)
self._get_conn_lock = asyncio.Lock(loop=loop)
self._on_connection_closed_callback = None
self._on_connection_opened_callback = None

def __repr__(self):
return '<AIOKafkaClient client_id=%s>' % self._client_id
Expand Down Expand Up @@ -371,7 +373,24 @@ def set_topics(self, topics):
self._topics = set(topics)
return res

def set_on_connection_closed_callback(self, on_connection_closed_callback):
"""Set a callback function to invoke when a connection to Kafka is closed
Arguments:
on_connection_closed_callback: a callback function to call
"""
self._on_connection_closed_callback = on_connection_closed_callback

def set_on_connection_opened_callback(self, on_connection_opened_callback):
"""Set a callback function to invoke when a connection to Kafka is opened
Arguments:
on_connection_opened_callback: a callback function to call
"""
self._on_connection_opened_callback = on_connection_opened_callback

def _on_connection_closed(self, conn, reason):
if self._on_connection_closed_callback:
self._on_connection_closed_callback(conn, reason)

""" Callback called when connection is closed
"""
# Connection failures imply that our metadata is stale, so let's
Expand Down Expand Up @@ -441,6 +460,9 @@ async def _get_conn(
self.force_metadata_update()
return None
else:
if self._on_connection_opened_callback:
self._on_connection_opened_callback(self._conns[conn_id])

return self._conns[conn_id]

async def ready(self, node_id, *, group=ConnectionGroup.DEFAULT):
Expand Down
15 changes: 15 additions & 0 deletions aiokafka/consumer/consumer.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,21 @@ def __del__(self, _warnings=warnings):
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)

def set_on_connection_closed_callback(self, on_connection_closed_callback):
"""Set a callback function to invoke when a connection to Kafka is closed
Arguments:
on_connection_closed_callback: a callback function to call
"""
self._client.set_on_connection_closed_callback(on_connection_closed_callback)

def set_on_connection_opened_callback(self, on_connection_opened_callback):
"""Set a callback function to invoke when a connection to Kafka is opened
Arguments:
on_connection_opened_callback: a callback function to call
"""
self._client.set_on_connection_opened_callback(on_connection_opened_callback)


async def start(self):
""" Connect to Kafka cluster. This will:
Expand Down
14 changes: 14 additions & 0 deletions aiokafka/producer/producer.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,20 @@ async def send_offsets_to_transaction(self, offsets, group_id):
fut = self._txn_manager.add_offsets_to_txn(formatted_offsets, group_id)
await asyncio.shield(fut, loop=self._loop)

def set_on_connection_closed_callback(self, on_connection_closed_callback):
"""Set a callback function to invoke when a connection to Kafka is closed
Arguments:
on_connection_closed_callback: a callback function to call
"""
self.client.set_on_connection_closed_callback(on_connection_closed_callback)

def set_on_connection_opened_callback(self, on_connection_opened_callback):
"""Set a callback function to invoke when a connection to Kafka is opened
Arguments:
on_connection_opened_callback: a callback function to call
"""
self.client.set_on_connection_opened_callback(on_connection_opened_callback)


class TransactionContext:

Expand Down

0 comments on commit 61bfdf9

Please sign in to comment.