
QPS Limit

Run async functions under any rate limit using multiprocessing + asyncio

Available on Unix-like systems (e.g. Linux, macOS) only, as the library relies on the fork multiprocessing start method
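If you are unsure which start method your platform uses by default (recent Python versions on macOS may default to spawn), here is a quick check using only the standard library, with the optional override shown commented out:

import multiprocessing

# qps-limit relies on fork; confirm the platform default first
print(multiprocessing.get_start_method())

# force fork before constructing a Limiter, if your platform supports it
# multiprocessing.set_start_method("fork")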

Installation

pip install qps-limit

or install manually via git

git clone git://github.com/antct/qps-limit.git qps-limit
cd qps-limit
python setup.py install

Usage

from qps_limit import Limiter

Limiter(
    func="an asynchronous function",
    params="a generator function that yields (args, kwargs) tuples",
    callback="a callback function that handles the return value of func",
    num_workers="number of worker processes, recommended <= number of CPUs",
    worker_max_qps="maximum QPS per worker process, None means unlimited",
    ordered="whether to yield results in input order, defaults to False"
)

Note: the wrapped function yields tuples (idx, res) consisting of the data index and the function's return value. With ordered=False, results may be yielded out of input order for better performance.
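Since every result carries its original index, you can restore input order yourself even with ordered=False. A minimal sketch (double and its parameters are illustrative, not part of the library):

from qps_limit import Limiter

async def double(n: int):
    return 2 * n

def params():
    for n in range(100):
        yield (), {"n": n}

f = Limiter(
    func=double,
    params=params,
    num_workers=2,
    worker_max_qps=None,
    ordered=False
)

# results may arrive out of order, but idx lets us put them back
results = {idx: res for idx, res in f()}
ordered_results = [results[i] for i in range(len(results))]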

Quick Start

10 workers, each with a maximum QPS of 10, sampling k items from the input stream (i.e. reservoir sampling)

import random

from qps_limit import Limiter


async def func(n: int):
    return n


def params():
    for n in range(1000):
        yield (), {"n": n}


f = Limiter(
    func=func,
    params=params,
    num_workers=10,
    worker_max_qps=10,
    ordered=False
)

i = 0   # items seen so far
k = 10  # reservoir size
d = []  # the reservoir
for _, res in f():
    if i < k:
        d.append(res)
    else:
        # keep each new item with probability k / (i + 1)
        j = random.randint(0, i)
        if j < k:
            d[j] = res
    i += 1
Running the example prints progress bars like:

receiver: 1000it [00:00, 861961.36it/s]
producer: 100%|██████████████████████████████| 1000/1000 [00:11<00:00, 87.07it/s]
consumer: 100%|██████████████████████████████| 1000/1000 [00:11<00:00, 87.11it/s]
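With 10 workers capped at 10 QPS each, aggregate throughput is at most 100 calls per second, so 1000 calls need at least 10 seconds; the ~11 s run shown above is consistent with that bound.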

Best Practice

Initialize resources that cannot be pickled lazily inside the worker process

resource = None

async def func(n):
    global resource
    # lazily create the resource inside the worker process
    # (guard the first-time init with a lock if needed)
    if resource is None:
        resource = {}
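A more concrete sketch of the same pattern, assuming a sqlite3 connection as the unpicklable resource (the lock and database name here are illustrative, not part of qps-limit):

import multiprocessing
import sqlite3

lock = multiprocessing.Lock()  # created before fork, inherited by every worker
conn = None                    # each worker process builds its own connection

async def func(n: int):
    global conn
    if conn is None:
        # double-checked init so the connection is opened once per process
        with lock:
            if conn is None:
                conn = sqlite3.connect("cache.db")
    return conn.execute("SELECT ?", (n,)).fetchone()[0]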

Debug your code on only part of the data by capping the number of processed items

Limiter(
    ...,
    max_steps=100
)

Early termination if specific conditions are met

i, max_i = 0, 100
for _, res in f():
    if ...:  # your condition here
        if i > max_i:
            f.stop()
            break
        i += 1

Safely write files with multiple processes

import fcntl

writer = open('...', 'w+')

def callback(line):
    global writer
    # take an exclusive advisory lock so concurrent writes don't interleave
    fcntl.flock(writer, fcntl.LOCK_EX)
    writer.write('{}\n'.format(line))
    writer.flush()
    fcntl.flock(writer, fcntl.LOCK_UN)

f = Limiter(
    ...,
    callback=callback
)
