Lupro Scheduler 是一个任务链同步调度器(支持协程,线程,进程同步通讯)
使用 PyPi 安装 Lupro
pip
Find, install and publish Python packages with the Python Package Index
pip install lupro-scheduler
- 导入
from lupro_scheduler import Scheduler
from lupro_scheduler import Task
from lupro_scheduler.typing import RunIOintensive
import asyncio
async def main(a, b):
    """Wait two seconds with ``asyncio.sleep`` and return ``a + b``."""
    await asyncio.sleep(2)
    return a + b
# Build a Task around `main`: positional inputs come from range(10) and the
# keyword list holds one {'b': n} dict per n in 0..9.
# NOTE(review): how Task pairs the positional inputs with the kwargs list is
# not visible here — confirm against the library.
task = Task(
    main,
    range(10),
    kwargs=[{'b': n} for n in range(10)],
    method=RunIOintensive.RunAsync_CREATETASK,
)
# Run the task and print whatever run() returns.
print(task.run())
这样即可使用RunTask调用不同的执行方法
class RunTask(enum.Enum):
    '''RunTask method.

    Enumeration of the selectable run methods. Values are grouped by tens
    and the member-name prefix identifies the group.
    '''

    # Values 0-4.
    APPLY = 0
    APPLYASYNC = 1
    MAP = 2
    MAPASYNC = 3
    EXECUTOR = 4

    # Values 10-13: RunAsync_* members.
    RunAsync_CREATETASK = 10
    RunAsync_GATHER = 11
    RunAsync_ASCOMPLETED = 12
    RunAsync_RUNFOREVER = 13

    # Value 20: RunThreadMap_* member.
    RunThreadMap_MAP = 20

    # Value 30: the member name is spelled "RunTreadPool" in the original
    # listing; it is kept as-is because it is a public name.
    RunTreadPool_POOL = 30

    # Values 40-41: RunGevent_* members.
    RunGevent_GEVENT = 40
    RunGevent_MAP = 41
from lupro_scheduler import Scheduler
from lupro_scheduler.typing import RunIOintensive
import random
import asyncio
import time
async def main(a, b):
    """Sleep a random 4-8 seconds, print a progress line, return ``a + b``."""
    # random.randint is inclusive on both ends, so the delay is 4..8 seconds.
    await asyncio.sleep(random.randint(4, 8))
    print(f'IOintensive task {a} {b}!')
    return a + b
def main1(a):
    """Print a progress line, block for a random 1-4 seconds, return ``a ** a``."""
    print(f'Calintensive task {a}!')
    # time.sleep blocks the calling thread; randint(1, 4) is inclusive.
    time.sleep(random.randint(1, 4))
    return a ** a
if __name__ == '__main__':
    task = Scheduler()
    # Add the IO-intensive task.
    # NOTE(review): method=10 equals the RunAsync_CREATETASK value shown in
    # the RunTask listing — confirm that addIO accepts the raw integer.
    task.addIO(main, range(10), kwargs=[{'b': i} for i in range(10)], method=10)
    # Add the compute-intensive task.
    # NOTE(review): main1 declares a required parameter `a`, but no argument
    # is supplied here — confirm how addCal provides it.
    task.addCal(main1)
    # Finishes in roughly 12 s.
    print(task.run())
- 协程接口
- 线程接口
- 进程接口
- 任务链异步调度
- 任务链同步调度
- Scheduler(IO) 可等待
完善中