Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
feat: allow user to set callback for simple produce call (#23)
Browse files Browse the repository at this point in the history
* feat: allow user to set callback for simple produce call
  • Loading branch information
shawnsarwar authored Jun 11, 2020
1 parent 375d102 commit 3e1a4f3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
14 changes: 8 additions & 6 deletions aet/kafka_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ def get_broker_info(kclient, scope='all'):
else:
errstr = ""

t_str.append("partition {} leader: {}, replicas: {}, isrs: {}".format(
p.id, p.leader, p.replicas, p.isrs, errstr))
t_str.append(
f"partition {p.id} leader: {p.leader}, "
f"replicas: {p.replicas}, isrs: {p.isrs}, err: {errstr}"
)
res['topics'].append(t_str)
if scope in res.keys():
return res[scope]
Expand Down Expand Up @@ -143,8 +145,9 @@ def kafka_callback(err=None, msg=None, _=None, **kwargs):
LOG.error(f'NO-SAVE: {_id} in | err {err.name()}')


def produce(docs, schema, topic_name, producer):

def produce(docs, schema, topic_name, producer, callback=None):
if not callback:
callback = kafka_callback
with io.BytesIO() as bytes_writer:
writer = DataFileWriter(
bytes_writer, DatumWriter(), schema, codec='deflate')
Expand All @@ -159,15 +162,14 @@ def produce(docs, schema, topic_name, producer):
# Message doesn't have the proper format for the current schema.
LOG.debug(
f"SCHEMA_MISMATCH:NOT SAVED! TOPIC:{topic_name}, ID:{_id}")

writer.flush()
raw_bytes = bytes_writer.getvalue()

producer.poll(0)
producer.produce(
topic_name,
raw_bytes,
callback=kafka_callback,
callback=callback,
headers={
'avro_size': str(len(_ids)),
'contains_id': json.dumps(_ids)
Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ def mocked_consumer():
consumer.stop()
sleep(.5)


# API Assets
@pytest.mark.unit
@pytest.fixture(scope="module")
Expand Down

0 comments on commit 3e1a4f3

Please sign in to comment.