diff --git a/broadcast_service/_core.py b/broadcast_service/_core.py index ff287a2..6683c6d 100644 --- a/broadcast_service/_core.py +++ b/broadcast_service/_core.py @@ -13,7 +13,6 @@ # limitations under the License. from concurrent.futures import ThreadPoolExecutor -from functools import wraps from typing import Optional, List, Callable __all__ = ['broadcast_service', 'BroadcastService'] @@ -61,27 +60,68 @@ def __init__(self): # function renaming self.subscribe = self.listen - self.publish = self.broadcast - self.unsubscribe = self.stop_listen - self.on = self.listen + self.publish = self.broadcast self.emit = self.broadcast + self.unsubscribe = self.stop_listen self.off = self.stop_listen + self.on_subscribe = self.on_listen + self.subscribe_all = self.listen_all + self.publish_all = self.broadcast_all - def listen(self, topic_name: str, callback: Callable): - """ listen topic """ + def listen(self, topics: str or List[str], callback: Callable): + """ + listen topic. + """ + if type(topics) == str: + self._listen_topic(topics, callback) + elif type(topics) == list: + for topic in topics: + self._listen_topic(topic, callback) + else: + raise ValueError("Unknown broadcast-service error, please submit " + "issue to https://github.com/Undertone0809/broadcast-service/issues") + + def listen_all(self, callback: Callable): + """ + '__all__' is a special topic. It can receive any topic message. + """ + self._listen_topic('__all__', callback) + + def broadcast(self, topics: str or List[str], *args, **kwargs): + """ + Launch broadcast on the specify topic + """ + if type(topics) == str: + self._broadcast_topic(topics, *args, **kwargs) + elif type(topics) == list: + for topic in topics: + self._broadcast_topic(topic, *args, **kwargs) + else: + raise ValueError("Unknown broadcast-service error, please submit " + "issue to https://github.com/Undertone0809/broadcast-service/issues") + + def broadcast_all(self, *args, **kwargs): + """ + All topics listened on will be called back. + Attention: Not all callback function will called. If your publish + and your subscribe takes different arguments, your callback function + will not be executed. + """ + for topic in self.pubsub_channels.keys(): + self._broadcast_topic(topic, *args, **kwargs) + + def _listen_topic(self, topic_name: str, callback: Callable): if topic_name not in self.pubsub_channels.keys(): self.pubsub_channels[topic_name] = [] if callback not in self.pubsub_channels[topic_name]: - # options = { - # 'callback_function': callback, - # } self.pubsub_channels[topic_name].append(callback) - def broadcast(self, topic_name: str, *args, **kwargs): + def _broadcast_topic(self, topic_name: str, *args, **kwargs): """ - Launch broadcast on the specify topic + broadcast single topic. + TODO fix problem: There is no guarantee that every callback function will be executed unnecessarily in some cases. """ if topic_name not in self.pubsub_channels.keys(): self.pubsub_channels[topic_name] = [] @@ -108,12 +148,16 @@ def stop_listen(self, topic_name: str, callback: Callable): else: self.pubsub_channels[topic_name].remove(callback) - def on_listen(self, topics: Optional[List[str]] = None) -> Callable: + def on_listen(self, topics: str or Optional[List[str]] = None) -> Callable: """ Decorator to listen specify topic. If topics is none, then listen all topics has exits. :param topics: topic list, you can input topic like: ["topic1", "topic2"]. Usage:: + @broadcast_service.on_listen('topic1') + def handle_all_msg(): + # your code + @broadcast_service.on_listen(['topic1']) def handle_all_msg(): # your code @@ -130,11 +174,12 @@ def handle_all_msg(*args, **kwargs): the callback function you handle should take arguments, otherwise it will not be called back. """ def decorator(fn: Callable) -> Callable: - if topics is not None: - for topic in topics: - self.listen(topic, fn) - else: - self.listen('__all__', fn) + if not topics: + self._listen_topic('__all__', fn) + elif type(topics) == str: + self._listen_topic(topics, fn) + elif type(topics) == list: + self.listen(topics, fn) def inner(*args, **kwargs) -> Callable: ret = fn(*args, **kwargs) diff --git a/docs/README.md b/docs/README.md index af082b7..cb5df3f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -25,6 +25,7 @@ - A publishing subscriber pattern can be built with a very simple syntax - Support different application scenarios, such as asynchronous and synchronous - Provide different syntax writing modes for lambda, callback functions, decorators, etc +- A callback function listens on multiple subscriptions ## Setup ```sh @@ -58,41 +59,7 @@ if __name__ == '__main__': ``` -- You can use `publish, emit, broadcast` to send your topic msg and use `listen, on, subscribe` to listen your topic msg. - -- You can also add more arguments or no argument when you publish thr broadcast. -```python -from broadcast_service import broadcast_service - -# subscribe topic -@broadcast_service.on_listen(['my_topic']) -def handle_msg(info, info2): - print(info) - print(info2) - -if __name__ == '__main__': - info = 'This is very important msg' - info2 = 'This is also a very important msg.' - - # publish broadcast - broadcast_service.publish('my_topic', info, info2) -``` -```python -from broadcast_service import broadcast_service - -# subscribe topic -@broadcast_service.on_listen(['my_topic']) -def handle_msg(): - print('handle_msg callback') - -if __name__ == '__main__': - # publish broadcast - broadcast_service.publish('Test') -``` - -You can use decorator to subscirbe your - -Actually, you can see more example in [example](/example) and [tests](/tests). +**About more example, please see [Quick Start](/quickstart.md)** ## TODO - optimize documents and show more examples. diff --git a/setup.py b/setup.py index 1a4ef62..8b56dc6 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ setuptools.setup( name="broadcast_service", - version="1.1.7", + version="1.2.0", author="Zeeland", author_email="zeeland@foxmail.com", description="A lightweight third-party broadcast library", diff --git a/tests/test_async.py b/tests/test_async.py index 833b2ac..2f82321 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -18,7 +18,7 @@ def handle(): - time.sleep(2) + time.sleep(1) class TestAsync(unittest.TestCase): @@ -28,7 +28,7 @@ def test_async(self): broadcast_service.listen("test_topic", handle) broadcast_service.broadcast("test_topic") used_time = time.time() - start_time - self.assertLessEqual(used_time, 2) + self.assertLessEqual(used_time, 1) def test_sync(self): start_time = time.time() @@ -36,7 +36,7 @@ def test_sync(self): broadcast_service.listen("test_topic", handle) broadcast_service.broadcast("test_topic") used_time = time.time() - start_time - self.assertEqual(int(used_time), 2) + self.assertEqual(int(used_time), 1) if __name__ == '__main__': diff --git a/tests/test_base.py b/tests/test_base.py index 8137cbd..bc99074 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -17,7 +17,7 @@ from broadcast_service import broadcast_service -def wait(seconds=0.5): +def wait(seconds=0.1): time.sleep(seconds) @@ -25,6 +25,10 @@ class TestBroadcast(TestCase): def test_listen_of_common(self): self.test_listen_of_common_no_params = False self.test_listen_of_common_specify_params = False + self.test_listen_multi_topic1_of_common = False + self.test_listen_multi_topic2_of_common = False + self.counter = 0 + self.all_counter = 0 def handle_topic_no_params(): self.test_listen_of_common_no_params = True @@ -35,12 +39,27 @@ def handle_topic_specify_params(a, b, c): self.assertEqual(33, c) self.test_listen_of_common_specify_params = True + def handle_multi_topics(*args, **kwargs): + self.counter += 1 + if args[0] == 111: + self.test_listen_multi_topic1_of_common = True + if args[0] == 222: + self.test_listen_multi_topic2_of_common = True + + def handle_all_topics(*args, **kwargs): + self.all_counter += 1 + + # test all topics + broadcast_service.subscribe_all(handle_all_topics) + + # test listen of common no params broadcast_service.subscribe( "test_listen_of_common_no_params", handle_topic_no_params) broadcast_service.publish("test_listen_of_common_no_params") wait() self.assertTrue(self.test_listen_of_common_no_params) + # test listen of common specify params broadcast_service.subscribe( "test_listen_of_common_specify_params", handle_topic_specify_params) broadcast_service.publish( @@ -48,15 +67,38 @@ def handle_topic_specify_params(a, b, c): wait() self.assertTrue(self.test_listen_of_common_specify_params) + # test listen multi topics of common + topics = ['test_listen_multi_topic1_of_common', 'test_listen_multi_topic2_of_common'] + broadcast_service.subscribe(topics, handle_multi_topics) + broadcast_service.publish("test_listen_multi_topic1_of_common", 111) + broadcast_service.publish("test_listen_multi_topic2_of_common", 222) + wait() + self.assertTrue(self.test_listen_multi_topic1_of_common) + self.assertTrue(self.test_listen_multi_topic2_of_common) + self.assertEqual(2, self.counter) + + self.assertEqual(4, self.all_counter) + def test_listen_of_decorator(self): - self.test_listen_of_decorator_no_params = False + self.test_listen_of_decorator_no_params1 = False + self.test_listen_of_decorator_no_params2 = False + self.test_listen_of_decorator_multi_params = False self.test_listen_of_decorator_specify_params = False self.test_listen_of_decorator_listen_all = False self.counter = 0 - @broadcast_service.on_listen(["test_listen_of_decorator_no_params"]) + @broadcast_service.on_listen("test_listen_of_decorator_no_params1") def handle_topic_no_params(): - self.test_listen_of_decorator_no_params = True + self.test_listen_of_decorator_no_params1 = True + + @broadcast_service.on_listen(["test_listen_of_decorator_no_params2"]) + def handle_topic_no_params(): + self.test_listen_of_decorator_no_params2 = True + + @broadcast_service.on_listen(["test_listen_of_decorator_no_params1","test_listen_of_decorator_no_params2"]) + def handle_topic_no_params(): + if self.test_listen_of_decorator_no_params1 and self.test_listen_of_decorator_no_params2: + self.test_listen_of_decorator_multi_params = True @broadcast_service.on_listen(["test_listen_of_decorator_specify_params"]) def handle_topic_specify_params(a, b, c): @@ -70,9 +112,16 @@ def handle_listen_all_topics(*args, **kwargs): self.counter += 1 self.test_listen_of_decorator_listen_all = True - broadcast_service.publish("test_listen_of_decorator_no_params") + broadcast_service.publish("test_listen_of_decorator_no_params1") + wait() + self.assertTrue(self.test_listen_of_decorator_no_params1) + + broadcast_service.publish("test_listen_of_decorator_no_params2") + wait() + self.assertTrue(self.test_listen_of_decorator_no_params2) + wait() - self.assertTrue(self.test_listen_of_decorator_no_params) + self.assertTrue(self.test_listen_of_decorator_multi_params) broadcast_service.publish( "test_listen_of_decorator_specify_params", 11, 22, 33) @@ -82,8 +131,7 @@ def handle_listen_all_topics(*args, **kwargs): broadcast_service.publish("test_listen_of_decorator_listen_all") wait() self.assertTrue(self.test_listen_of_decorator_listen_all) - print(broadcast_service.pubsub_channels) - self.assertEqual(3, self.counter) + self.assertEqual(4, self.counter) def test_listen_of_lambda(self): self.test_listen_of_lambda_no_params = False @@ -108,7 +156,44 @@ def handle_topic_specify_params(params: bool): self.assertTrue(self.test_listen_of_lambda_no_params) def test_broadcast(self): - pass + self.test_broadcast_one_topic1 = False + self.test_broadcast_one_topic2 = False + self.test_broadcast_multi_topic1 = False + self.test_broadcast_multi_topic2 = False + self.counter = 0 - def test_close(self): - pass + @broadcast_service.on_listen("test_broadcast_one_topic1") + def handle_one_topic1(): + self.counter += 1 + self.test_broadcast_one_topic1 = True + + @broadcast_service.on_listen("test_broadcast_one_topic2") + def handle_one_topic2(): + self.counter += 1 + self.test_broadcast_one_topic2 = True + + @broadcast_service.on_listen("test_broadcast_multi_topic1") + def handle_multi_topic1(): + self.counter += 1 + self.test_broadcast_multi_topic1 = True + + @broadcast_service.on_listen("test_broadcast_multi_topic2") + def handle_multi_topic2(): + self.counter += 1 + self.test_broadcast_multi_topic2 = True + + broadcast_service.publish("test_broadcast_one_topic1") + wait() + self.assertTrue(self.test_broadcast_one_topic1) + + broadcast_service.publish(["test_broadcast_one_topic2"]) + wait() + self.assertTrue(self.test_broadcast_one_topic2) + + broadcast_service.publish(["test_broadcast_multi_topic1", "test_broadcast_multi_topic2"]) + wait() + self.assertTrue(self.test_broadcast_multi_topic1 and self.test_broadcast_multi_topic2) + + broadcast_service.publish_all() + wait() + self.assertEqual(8, self.counter)