Skip to content

Commit

Permalink
Merge pull request #7 from Undertone0809/feat-multi-broadcast
Browse files Browse the repository at this point in the history
feat: add multiple listen and multiple  broadcast option
  • Loading branch information
Undertone0809 authored Jan 11, 2023
2 parents 3a6169c + 20a4006 commit e2df444
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 67 deletions.
79 changes: 62 additions & 17 deletions broadcast_service/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from typing import Optional, List, Callable

__all__ = ['broadcast_service', 'BroadcastService']
Expand Down Expand Up @@ -61,27 +60,68 @@ def __init__(self):

# function renaming
self.subscribe = self.listen
self.publish = self.broadcast
self.unsubscribe = self.stop_listen

self.on = self.listen
self.publish = self.broadcast
self.emit = self.broadcast
self.unsubscribe = self.stop_listen
self.off = self.stop_listen
self.on_subscribe = self.on_listen
self.subscribe_all = self.listen_all
self.publish_all = self.broadcast_all

def listen(self, topic_name: str, callback: Callable):
""" listen topic """
def listen(self, topics: str or List[str], callback: Callable):
"""
listen topic.
"""
if type(topics) == str:
self._listen_topic(topics, callback)
elif type(topics) == list:
for topic in topics:
self._listen_topic(topic, callback)
else:
raise ValueError("Unknown broadcast-service error, please submit "
"issue to https://github.com/Undertone0809/broadcast-service/issues")

def listen_all(self, callback: Callable):
"""
'__all__' is a special topic. It can receive any topic message.
"""
self._listen_topic('__all__', callback)

def broadcast(self, topics: str or List[str], *args, **kwargs):
"""
Launch broadcast on the specify topic
"""
if type(topics) == str:
self._broadcast_topic(topics, *args, **kwargs)
elif type(topics) == list:
for topic in topics:
self._broadcast_topic(topic, *args, **kwargs)
else:
raise ValueError("Unknown broadcast-service error, please submit "
"issue to https://github.com/Undertone0809/broadcast-service/issues")

def broadcast_all(self, *args, **kwargs):
"""
All topics listened on will be called back.
Attention: Not all callback function will called. If your publish
and your subscribe takes different arguments, your callback function
will not be executed.
"""
for topic in self.pubsub_channels.keys():
self._broadcast_topic(topic, *args, **kwargs)

def _listen_topic(self, topic_name: str, callback: Callable):
if topic_name not in self.pubsub_channels.keys():
self.pubsub_channels[topic_name] = []

if callback not in self.pubsub_channels[topic_name]:
# options = {
# 'callback_function': callback,
# }
self.pubsub_channels[topic_name].append(callback)

def broadcast(self, topic_name: str, *args, **kwargs):
def _broadcast_topic(self, topic_name: str, *args, **kwargs):
"""
Launch broadcast on the specify topic
broadcast single topic.
TODO fix problem: There is no guarantee that every callback function will be executed unnecessarily in some cases.
"""
if topic_name not in self.pubsub_channels.keys():
self.pubsub_channels[topic_name] = []
Expand All @@ -108,12 +148,16 @@ def stop_listen(self, topic_name: str, callback: Callable):
else:
self.pubsub_channels[topic_name].remove(callback)

def on_listen(self, topics: Optional[List[str]] = None) -> Callable:
def on_listen(self, topics: str or Optional[List[str]] = None) -> Callable:
"""
Decorator to listen specify topic. If topics is none, then listen all topics has exits.
:param topics: topic list, you can input topic like: ["topic1", "topic2"].
Usage::
@broadcast_service.on_listen('topic1')
def handle_all_msg():
# your code
@broadcast_service.on_listen(['topic1'])
def handle_all_msg():
# your code
Expand All @@ -130,11 +174,12 @@ def handle_all_msg(*args, **kwargs):
the callback function you handle should take arguments, otherwise it will not be called back.
"""
def decorator(fn: Callable) -> Callable:
if topics is not None:
for topic in topics:
self.listen(topic, fn)
else:
self.listen('__all__', fn)
if not topics:
self._listen_topic('__all__', fn)
elif type(topics) == str:
self._listen_topic(topics, fn)
elif type(topics) == list:
self.listen(topics, fn)

def inner(*args, **kwargs) -> Callable:
ret = fn(*args, **kwargs)
Expand Down
37 changes: 2 additions & 35 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- A publishing subscriber pattern can be built with a very simple syntax
- Support different application scenarios, such as asynchronous and synchronous
- Provide different syntax writing modes for lambda, callback functions, decorators, etc
- A callback function listens on multiple subscriptions

## Setup
```sh
Expand Down Expand Up @@ -58,41 +59,7 @@ if __name__ == '__main__':

```

- You can use `publish, emit, broadcast` to send your topic msg and use `listen, on, subscribe` to listen your topic msg.

- You can also add more arguments or no argument when you publish thr broadcast.
```python
from broadcast_service import broadcast_service

# subscribe topic
@broadcast_service.on_listen(['my_topic'])
def handle_msg(info, info2):
print(info)
print(info2)

if __name__ == '__main__':
info = 'This is very important msg'
info2 = 'This is also a very important msg.'

# publish broadcast
broadcast_service.publish('my_topic', info, info2)
```
```python
from broadcast_service import broadcast_service

# subscribe topic
@broadcast_service.on_listen(['my_topic'])
def handle_msg():
print('handle_msg callback')

if __name__ == '__main__':
# publish broadcast
broadcast_service.publish('Test')
```

You can use decorator to subscirbe your

Actually, you can see more example in [example](/example) and [tests](/tests).
**About more example, please see [Quick Start](/quickstart.md)**

## TODO
- optimize documents and show more examples.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

setuptools.setup(
name="broadcast_service",
version="1.1.7",
version="1.2.0",
author="Zeeland",
author_email="zeeland@foxmail.com",
description="A lightweight third-party broadcast library",
Expand Down
6 changes: 3 additions & 3 deletions tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


def handle():
time.sleep(2)
time.sleep(1)


class TestAsync(unittest.TestCase):
Expand All @@ -28,15 +28,15 @@ def test_async(self):
broadcast_service.listen("test_topic", handle)
broadcast_service.broadcast("test_topic")
used_time = time.time() - start_time
self.assertLessEqual(used_time, 2)
self.assertLessEqual(used_time, 1)

def test_sync(self):
start_time = time.time()
broadcast_service.enable_async = False
broadcast_service.listen("test_topic", handle)
broadcast_service.broadcast("test_topic")
used_time = time.time() - start_time
self.assertEqual(int(used_time), 2)
self.assertEqual(int(used_time), 1)


if __name__ == '__main__':
Expand Down
107 changes: 96 additions & 11 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
from broadcast_service import broadcast_service


def wait(seconds=0.5):
def wait(seconds=0.1):
time.sleep(seconds)


class TestBroadcast(TestCase):
def test_listen_of_common(self):
self.test_listen_of_common_no_params = False
self.test_listen_of_common_specify_params = False
self.test_listen_multi_topic1_of_common = False
self.test_listen_multi_topic2_of_common = False
self.counter = 0
self.all_counter = 0

def handle_topic_no_params():
self.test_listen_of_common_no_params = True
Expand All @@ -35,28 +39,66 @@ def handle_topic_specify_params(a, b, c):
self.assertEqual(33, c)
self.test_listen_of_common_specify_params = True

def handle_multi_topics(*args, **kwargs):
self.counter += 1
if args[0] == 111:
self.test_listen_multi_topic1_of_common = True
if args[0] == 222:
self.test_listen_multi_topic2_of_common = True

def handle_all_topics(*args, **kwargs):
self.all_counter += 1

# test all topics
broadcast_service.subscribe_all(handle_all_topics)

# test listen of common no params
broadcast_service.subscribe(
"test_listen_of_common_no_params", handle_topic_no_params)
broadcast_service.publish("test_listen_of_common_no_params")
wait()
self.assertTrue(self.test_listen_of_common_no_params)

# test listen of common specify params
broadcast_service.subscribe(
"test_listen_of_common_specify_params", handle_topic_specify_params)
broadcast_service.publish(
"test_listen_of_common_specify_params", 11, 22, 33)
wait()
self.assertTrue(self.test_listen_of_common_specify_params)

# test listen multi topics of common
topics = ['test_listen_multi_topic1_of_common', 'test_listen_multi_topic2_of_common']
broadcast_service.subscribe(topics, handle_multi_topics)
broadcast_service.publish("test_listen_multi_topic1_of_common", 111)
broadcast_service.publish("test_listen_multi_topic2_of_common", 222)
wait()
self.assertTrue(self.test_listen_multi_topic1_of_common)
self.assertTrue(self.test_listen_multi_topic2_of_common)
self.assertEqual(2, self.counter)

self.assertEqual(4, self.all_counter)

def test_listen_of_decorator(self):
self.test_listen_of_decorator_no_params = False
self.test_listen_of_decorator_no_params1 = False
self.test_listen_of_decorator_no_params2 = False
self.test_listen_of_decorator_multi_params = False
self.test_listen_of_decorator_specify_params = False
self.test_listen_of_decorator_listen_all = False
self.counter = 0

@broadcast_service.on_listen(["test_listen_of_decorator_no_params"])
@broadcast_service.on_listen("test_listen_of_decorator_no_params1")
def handle_topic_no_params():
self.test_listen_of_decorator_no_params = True
self.test_listen_of_decorator_no_params1 = True

@broadcast_service.on_listen(["test_listen_of_decorator_no_params2"])
def handle_topic_no_params():
self.test_listen_of_decorator_no_params2 = True

@broadcast_service.on_listen(["test_listen_of_decorator_no_params1","test_listen_of_decorator_no_params2"])
def handle_topic_no_params():
if self.test_listen_of_decorator_no_params1 and self.test_listen_of_decorator_no_params2:
self.test_listen_of_decorator_multi_params = True

@broadcast_service.on_listen(["test_listen_of_decorator_specify_params"])
def handle_topic_specify_params(a, b, c):
Expand All @@ -70,9 +112,16 @@ def handle_listen_all_topics(*args, **kwargs):
self.counter += 1
self.test_listen_of_decorator_listen_all = True

broadcast_service.publish("test_listen_of_decorator_no_params")
broadcast_service.publish("test_listen_of_decorator_no_params1")
wait()
self.assertTrue(self.test_listen_of_decorator_no_params1)

broadcast_service.publish("test_listen_of_decorator_no_params2")
wait()
self.assertTrue(self.test_listen_of_decorator_no_params2)

wait()
self.assertTrue(self.test_listen_of_decorator_no_params)
self.assertTrue(self.test_listen_of_decorator_multi_params)

broadcast_service.publish(
"test_listen_of_decorator_specify_params", 11, 22, 33)
Expand All @@ -82,8 +131,7 @@ def handle_listen_all_topics(*args, **kwargs):
broadcast_service.publish("test_listen_of_decorator_listen_all")
wait()
self.assertTrue(self.test_listen_of_decorator_listen_all)
print(broadcast_service.pubsub_channels)
self.assertEqual(3, self.counter)
self.assertEqual(4, self.counter)

def test_listen_of_lambda(self):
self.test_listen_of_lambda_no_params = False
Expand All @@ -108,7 +156,44 @@ def handle_topic_specify_params(params: bool):
self.assertTrue(self.test_listen_of_lambda_no_params)

def test_broadcast(self):
pass
self.test_broadcast_one_topic1 = False
self.test_broadcast_one_topic2 = False
self.test_broadcast_multi_topic1 = False
self.test_broadcast_multi_topic2 = False
self.counter = 0

def test_close(self):
pass
@broadcast_service.on_listen("test_broadcast_one_topic1")
def handle_one_topic1():
self.counter += 1
self.test_broadcast_one_topic1 = True

@broadcast_service.on_listen("test_broadcast_one_topic2")
def handle_one_topic2():
self.counter += 1
self.test_broadcast_one_topic2 = True

@broadcast_service.on_listen("test_broadcast_multi_topic1")
def handle_multi_topic1():
self.counter += 1
self.test_broadcast_multi_topic1 = True

@broadcast_service.on_listen("test_broadcast_multi_topic2")
def handle_multi_topic2():
self.counter += 1
self.test_broadcast_multi_topic2 = True

broadcast_service.publish("test_broadcast_one_topic1")
wait()
self.assertTrue(self.test_broadcast_one_topic1)

broadcast_service.publish(["test_broadcast_one_topic2"])
wait()
self.assertTrue(self.test_broadcast_one_topic2)

broadcast_service.publish(["test_broadcast_multi_topic1", "test_broadcast_multi_topic2"])
wait()
self.assertTrue(self.test_broadcast_multi_topic1 and self.test_broadcast_multi_topic2)

broadcast_service.publish_all()
wait()
self.assertEqual(8, self.counter)

0 comments on commit e2df444

Please sign in to comment.