Skip to content

Commit

Permalink
feat: add multi broadcast, optimize listen
Browse files Browse the repository at this point in the history
  • Loading branch information
Undertone0809 committed Jan 11, 2023
1 parent 01fffa0 commit 20a4006
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 38 deletions.
68 changes: 41 additions & 27 deletions broadcast_service/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ def __init__(self):

# function renaming
self.subscribe = self.listen
self.subscribe_all = self.listen_all
self.publish = self.broadcast
self.unsubscribe = self.stop_listen

self.on = self.listen
self.on_all = self.listen_all
self.publish = self.broadcast
self.emit = self.broadcast
self.unsubscribe = self.stop_listen
self.off = self.stop_listen
self.on_subscribe = self.on_listen
self.subscribe_all = self.listen_all
self.publish_all = self.broadcast_all

def listen(self, topics: str or List[str], callback: Callable):
"""
Expand All @@ -83,28 +83,33 @@ def listen(self, topics: str or List[str], callback: Callable):
"issue to https://github.com/Undertone0809/broadcast-service/issues")

def listen_all(self, callback: Callable):
"""
'__all__' is a special topic. It can receive any topic message.
"""
self._listen_topic('__all__', callback)

def broadcast(self, topic_name: str, *args, **kwargs):
def broadcast(self, topics: str or List[str], *args, **kwargs):
"""
Launch broadcast on the specify topic
"""
if topic_name not in self.pubsub_channels.keys():
self.pubsub_channels[topic_name] = []

for item in self.pubsub_channels[topic_name]:
if self.enable_async:
self.thread_pool.submit(
item, *args, **kwargs)
else:
item(*args, **kwargs)
if type(topics) == str:
self._broadcast_topic(topics, *args, **kwargs)
elif type(topics) == list:
for topic in topics:
self._broadcast_topic(topic, *args, **kwargs)
else:
raise ValueError("Unknown broadcast-service error, please submit "
"issue to https://github.com/Undertone0809/broadcast-service/issues")

for item in self.pubsub_channels['__all__']:
if self.enable_async:
self.thread_pool.submit(
item, *args, **kwargs)
else:
item(*args, **kwargs)
def broadcast_all(self, *args, **kwargs):
"""
All topics listened on will be called back.
Attention: Not all callback function will called. If your publish
and your subscribe takes different arguments, your callback function
will not be executed.
"""
for topic in self.pubsub_channels.keys():
self._broadcast_topic(topic, *args, **kwargs)

def _listen_topic(self, topic_name: str, callback: Callable):
if topic_name not in self.pubsub_channels.keys():
Expand All @@ -114,6 +119,10 @@ def _listen_topic(self, topic_name: str, callback: Callable):
self.pubsub_channels[topic_name].append(callback)

def _broadcast_topic(self, topic_name: str, *args, **kwargs):
"""
broadcast single topic.
TODO fix problem: There is no guarantee that every callback function will be executed unnecessarily in some cases.
"""
if topic_name not in self.pubsub_channels.keys():
self.pubsub_channels[topic_name] = []

Expand All @@ -139,12 +148,16 @@ def stop_listen(self, topic_name: str, callback: Callable):
else:
self.pubsub_channels[topic_name].remove(callback)

def on_listen(self, topics: Optional[List[str]] = None) -> Callable:
def on_listen(self, topics: str or Optional[List[str]] = None) -> Callable:
"""
Decorator to listen specify topic. If topics is none, then listen all topics has exits.
:param topics: topic list, you can input topic like: ["topic1", "topic2"].
Usage::
@broadcast_service.on_listen('topic1')
def handle_all_msg():
# your code
@broadcast_service.on_listen(['topic1'])
def handle_all_msg():
# your code
Expand All @@ -161,11 +174,12 @@ def handle_all_msg(*args, **kwargs):
the callback function you handle should take arguments, otherwise it will not be called back.
"""
def decorator(fn: Callable) -> Callable:
if topics is not None:
for topic in topics:
self.listen(topic, fn)
else:
self.listen('__all__', fn)
if not topics:
self._listen_topic('__all__', fn)
elif type(topics) == str:
self._listen_topic(topics, fn)
elif type(topics) == list:
self.listen(topics, fn)

def inner(*args, **kwargs) -> Callable:
ret = fn(*args, **kwargs)
Expand Down
75 changes: 64 additions & 11 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ def handle_topic_specify_params(a, b, c):
self.test_listen_of_common_specify_params = True

def handle_multi_topics(*args, **kwargs):
print('here')
print(args)
self.counter += 1
if args[0] == 111:
self.test_listen_multi_topic1_of_common = True
Expand Down Expand Up @@ -82,14 +80,25 @@ def handle_all_topics(*args, **kwargs):
self.assertEqual(4, self.all_counter)

def test_listen_of_decorator(self):
self.test_listen_of_decorator_no_params = False
self.test_listen_of_decorator_no_params1 = False
self.test_listen_of_decorator_no_params2 = False
self.test_listen_of_decorator_multi_params = False
self.test_listen_of_decorator_specify_params = False
self.test_listen_of_decorator_listen_all = False
self.counter = 0

@broadcast_service.on_listen(["test_listen_of_decorator_no_params"])
@broadcast_service.on_listen("test_listen_of_decorator_no_params1")
def handle_topic_no_params():
self.test_listen_of_decorator_no_params = True
self.test_listen_of_decorator_no_params1 = True

@broadcast_service.on_listen(["test_listen_of_decorator_no_params2"])
def handle_topic_no_params():
self.test_listen_of_decorator_no_params2 = True

@broadcast_service.on_listen(["test_listen_of_decorator_no_params1","test_listen_of_decorator_no_params2"])
def handle_topic_no_params():
if self.test_listen_of_decorator_no_params1 and self.test_listen_of_decorator_no_params2:
self.test_listen_of_decorator_multi_params = True

@broadcast_service.on_listen(["test_listen_of_decorator_specify_params"])
def handle_topic_specify_params(a, b, c):
Expand All @@ -103,9 +112,16 @@ def handle_listen_all_topics(*args, **kwargs):
self.counter += 1
self.test_listen_of_decorator_listen_all = True

broadcast_service.publish("test_listen_of_decorator_no_params")
broadcast_service.publish("test_listen_of_decorator_no_params1")
wait()
self.assertTrue(self.test_listen_of_decorator_no_params1)

broadcast_service.publish("test_listen_of_decorator_no_params2")
wait()
self.assertTrue(self.test_listen_of_decorator_no_params2)

wait()
self.assertTrue(self.test_listen_of_decorator_no_params)
self.assertTrue(self.test_listen_of_decorator_multi_params)

broadcast_service.publish(
"test_listen_of_decorator_specify_params", 11, 22, 33)
Expand All @@ -115,7 +131,7 @@ def handle_listen_all_topics(*args, **kwargs):
broadcast_service.publish("test_listen_of_decorator_listen_all")
wait()
self.assertTrue(self.test_listen_of_decorator_listen_all)
self.assertEqual(3, self.counter)
self.assertEqual(4, self.counter)

def test_listen_of_lambda(self):
self.test_listen_of_lambda_no_params = False
Expand All @@ -140,7 +156,44 @@ def handle_topic_specify_params(params: bool):
self.assertTrue(self.test_listen_of_lambda_no_params)

def test_broadcast(self):
pass
self.test_broadcast_one_topic1 = False
self.test_broadcast_one_topic2 = False
self.test_broadcast_multi_topic1 = False
self.test_broadcast_multi_topic2 = False
self.counter = 0

@broadcast_service.on_listen("test_broadcast_one_topic1")
def handle_one_topic1():
self.counter += 1
self.test_broadcast_one_topic1 = True

@broadcast_service.on_listen("test_broadcast_one_topic2")
def handle_one_topic2():
self.counter += 1
self.test_broadcast_one_topic2 = True

def test_close(self):
pass
@broadcast_service.on_listen("test_broadcast_multi_topic1")
def handle_multi_topic1():
self.counter += 1
self.test_broadcast_multi_topic1 = True

@broadcast_service.on_listen("test_broadcast_multi_topic2")
def handle_multi_topic2():
self.counter += 1
self.test_broadcast_multi_topic2 = True

broadcast_service.publish("test_broadcast_one_topic1")
wait()
self.assertTrue(self.test_broadcast_one_topic1)

broadcast_service.publish(["test_broadcast_one_topic2"])
wait()
self.assertTrue(self.test_broadcast_one_topic2)

broadcast_service.publish(["test_broadcast_multi_topic1", "test_broadcast_multi_topic2"])
wait()
self.assertTrue(self.test_broadcast_multi_topic1 and self.test_broadcast_multi_topic2)

broadcast_service.publish_all()
wait()
self.assertEqual(8, self.counter)

0 comments on commit 20a4006

Please sign in to comment.