Skip to content

Commit

Permalink
feat: add decorator, optimize syntactic expression and optimize tests…
Browse files Browse the repository at this point in the history
… and examples

feat: add decorator, optimize syntactic expression and optimize tests and examples
  • Loading branch information
Undertone0809 authored Jan 9, 2023
2 parents d4b47a8 + eaa225c commit 920f646
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 113 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ if __name__ == '__main__':
info = 'This is very important msg'

# listen topic
broadcast_service.listen('Test', handle_msg)
broadcast_service.subscribe('Test', handle_msg)

# publish broadcast
broadcast_service.broadcast('Test', info)
broadcast_service.publish('Test', info)

```

Expand All @@ -64,10 +64,10 @@ if __name__ == '__main__':
info2 = 'This is also a very important msg.'

# listen topic
broadcast_service.listen('Test', handle_msg)
broadcast_service.subscribe('Test', handle_msg)

# publish broadcast
broadcast_service.broadcast('Test', info, info2)
broadcast_service.publish('Test', info, info2)
```
```python
from broadcast_service import broadcast_service
Expand All @@ -77,12 +77,12 @@ def handle_msg():

if __name__ == '__main__':
# listen topic
broadcast_service.listen('Test', handle_msg)
broadcast_service.subscribe('Test', handle_msg)

# publish broadcast
broadcast_service.broadcast('Test')
broadcast_service.publish('Test')
```
Actually, you can see more example in [example](/example).
Actually, you can see more example in [example](/example) and [tests](/tests).

## TODO
- optimize documents and show more examples.
Expand Down
130 changes: 88 additions & 42 deletions broadcast_service/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from typing import Optional, List, Callable

__all__ = ['broadcast_service', 'BroadcastService']

__all__ = ['broadcast_service']

class BroadcastService:
"""
Expand All @@ -23,7 +26,6 @@ class BroadcastService:
callback function if some classes subscribe the topic.
example:
app.py
---------------------------------------------------------------------------------
from broadcast_service import broadcast_service
Expand All @@ -34,67 +36,111 @@ def handle_msg(params):
info = 'This is very important msg'
# listen topic
broadcast_service.listen('Test', handle_msg)
broadcast_service.subscribe('Test', handle_msg)
# publish broadcast
broadcast_service.broadcast('Test', info)
broadcast_service.publish('Test', info)
---------------------------------------------------------------------------------
"""

def __init__(self):
"""
subscribe_info example:
subscribe_info = {
'my_topic': [{
'callback_function': function1,
'params': {
'name':'jack',
'age':20
}
},{
'callback_function': function2,
'params': 666
}]
pubsub_channels is the dict to store publish/subscribe data.
pubsub_channels example:
pubsub_channels = {
'my_topic': [callback_function1: Callable,callback_function2: Callable]
'__all__': [callback_function3: Callable]
}
"""
self.topic_list = []
self.subscribe_info = {}
self.enable_async = True
self.pubsub_channels: dict = {
'__all__': []
}
self.enable_async: bool = True
self.thread_pool = ThreadPoolExecutor(max_workers=5)

def listen(self, topic_name, callback):
# function renaming
self.subscribe = self.listen
self.publish = self.broadcast
self.unsubscribe = self.stop_listen

self.on = self.listen
self.emit = self.broadcast
self.off = self.stop_listen

def listen(self, topic_name: str, callback: Callable):
""" listen topic """
if topic_name not in self.subscribe_info.keys():
self.subscribe_info[topic_name] = []
if topic_name not in self.pubsub_channels.keys():
self.pubsub_channels[topic_name] = []

if callback not in self.subscribe_info[topic_name]:
options = {
'callback_function': callback,
}
self.subscribe_info[topic_name].append(options)
if callback not in self.pubsub_channels[topic_name]:
# options = {
# 'callback_function': callback,
# }
self.pubsub_channels[topic_name].append(callback)

def broadcast(self, topic_name, *args, **kwargs):
""" Launch broadcast on the specifide topic """
if topic_name not in self.topic_list:
self.topic_list.append(topic_name)
def broadcast(self, topic_name: str, *args, **kwargs):
"""
Launch broadcast on the specify topic
"""
if topic_name not in self.pubsub_channels.keys():
self.pubsub_channels[topic_name] = []

if topic_name not in self.subscribe_info.keys():
return
for item in self.pubsub_channels[topic_name]:
if self.enable_async:
self.thread_pool.submit(
item, *args, **kwargs)
else:
item(*args, **kwargs)

for item in self.subscribe_info[topic_name]:
for item in self.pubsub_channels['__all__']:
if self.enable_async:
self.thread_pool.submit(item['callback_function'], *args, **kwargs)
self.thread_pool.submit(
item, *args, **kwargs)
else:
item['callback_function'](*args, **kwargs)
item(*args, **kwargs)

def stop_listen(self, topic_name, callback):
if topic_name not in self.subscribe_info.keys():
def stop_listen(self, topic_name: str, callback: Callable):
if topic_name not in self.pubsub_channels.keys():
raise RuntimeError("you didn't listen the topic:", topic_name)
if callback not in self.subscribe_info[topic_name]:
if callback not in self.pubsub_channels[topic_name]:
pass
else:
self.subscribe_info[topic_name].remove(callback)
self.pubsub_channels[topic_name].remove(callback)

def on_listen(self, topics: Optional[List[str]] = None) -> Callable:
"""
Decorator to listen specify topic. If topics is none, then listen all topics has exits.
:param topics: topic list, you can input topic like: ["topic1", "topic2"].
Usage::
@broadcast_service.on_listen(['topic1'])
def handle_all_msg():
# your code
@broadcast_service.on_listen(['topic1','topic2'])
def handle_all_msg():
# your code
@broadcast_service.on_listen()
def handle_all_msg(*args, **kwargs):
# your code
Attention: Your params should keep '*args, **kwargs'. If you publish a topic take arguments,
the callback function you handle should take arguments, otherwise it will not be called back.
"""
def decorator(fn: Callable) -> Callable:
if topics is not None:
for topic in topics:
self.listen(topic, fn)
else:
self.listen('__all__', fn)

def inner(*args, **kwargs) -> Callable:
ret = fn(*args, **kwargs)
return ret
return inner
return decorator


broadcast_service = BroadcastService()
90 changes: 90 additions & 0 deletions example/demo1_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright 2022 Zeeland(https://github.com/Undertone0809/). All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from broadcast_service import broadcast_service, BroadcastService


def handle_no_msg():
print("handle_no_msg func")


def callback_of_no_params():
"""
callback of no parameters
"""
# listen topic
broadcast_service.subscribe('no_params', handle_no_msg)

# publish broadcast
broadcast_service.publish('no_params')
"""
other way:
bc = BroadcastService()
bc.listen('no_params', handle_no_msg)
bc.broadcast('no_params')
"""


@broadcast_service.on_listen(["decorator", "lambda"])
def handle_decorator(*args, **kwargs):
print("handle_no_msg func")


def callback_of_decorator():
"""
callback of decorator
"""
broadcast_service.broadcast("decorator")


def callback_of_lambda():
"""
callback of lambda
"""
# listen topic
broadcast_service.listen('lambda', lambda x,y: print(f"the params is {x} and {y}"))

# publish broadcast
broadcast_service.broadcast('lambda', 11, 22)


def handle_2msg(info, info2):
print("handle_2msg func")
print(info)
print(info2)


def callback_of_2params():
"""
callback of 2 parameters
"""
info = 'info'
info2 = 'info2'

# listen topic
broadcast_service.listen('2_params', handle_2msg)

# publish broadcast
broadcast_service.broadcast('2_params', info, info2)


def main():
callback_of_no_params()
# callback_of_decorator()
# callback_of_lambda()
# callback_of_2params()


if __name__ == '__main__':
main()
31 changes: 0 additions & 31 deletions example/demo2.py

This file was deleted.

File renamed without changes.
27 changes: 0 additions & 27 deletions example/demo3.py

This file was deleted.

8 changes: 3 additions & 5 deletions example/demo1.py → example/demo3_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

class Application:
"""
This demo aim to show how to use broadcast-service.
This demo shows how to use async.
Scene:
One day, leader Tom arrive the company but find not one staff in company
because all staff are playing outside. Therefor, Tom send a message
Expand Down Expand Up @@ -64,19 +64,17 @@ def notice_go_back(self):
class Staff:
def __init__(self, name):
self.name = name
self.rec_msg()

def rec_msg(self):
broadcast_service.listen('meeting', self.go_back)

def go_back(self, info):
print("[{2}] {0}(staff) receive msg '{1}' and go back now.".format(
self.name, info, print_time()))
time.sleep(random.randint(1, 5))
time.sleep(2)
print('[{1}] {0}(staff) is back now.'.format(self.name, print_time()))


def main():
broadcast_service.enable_async = False
app = Application()
app.run()

Expand Down
Loading

0 comments on commit 920f646

Please sign in to comment.