kavinbj/aioRedisMQ

aio-redis-mq

Lightweight message queue and broker based on Redis Streams and async Python

Suitable Application Environment

Modern software applications have moved from being a single monolithic unit to loosely coupled collections of services. While this new architecture brings many benefits, those services still need to interact with each other, creating the need for robust and efficient messaging solutions.

Message queuing is a good fit for the following problems:

  • Asynchronous processing
  • Flow control
  • Service decoupling
  • Connecting stream-computing pipelines
  • As a publish / subscribe system

Installation

pip install aio-redis-mq

Quick Start

import asyncio
import time
from aio_redis_mq import MQProducer, MQConsumer

_redis_url = 'redis://root:xxxxx@localhost/1'


async def producer_task(producer):
    for i in range(10):
        await asyncio.sleep(1)
        send_msg_id = await producer.send_message({'msg': f'msg_{i}', 'content': time.strftime("%Y-%m-%d %H:%M:%S")})
        print(f'producer_task time at {time.strftime("%Y-%m-%d %H:%M:%S")}', f'message id={send_msg_id}')


async def consumer_task(consumer: MQConsumer, consumer_index: int):
    for _ in range(0, 10):
        msg = await consumer.block_read_messages(block=1500)
        print(f'consumer_{consumer_index} block read message', msg)


async def main():
    # one producer
    producer = MQProducer('pub_stream', redis_name='_redis_local', redis_url=_redis_url)

    # three consumers
    consumer1 = MQConsumer('pub_stream', redis_name='_redis_local', redis_url=_redis_url)
    consumer2 = MQConsumer('pub_stream', redis_name='_redis_local', redis_url=_redis_url)
    consumer3 = MQConsumer('pub_stream', redis_name='_redis_local', redis_url=_redis_url)

    await asyncio.gather(
        producer_task(producer),
        consumer_task(consumer1, 1),
        consumer_task(consumer2, 2),
        consumer_task(consumer3, 3)
    )

if __name__ == '__main__':
    asyncio.run(main())

Group Consumer

import asyncio
import time
from aio_redis_mq import MQProducer, GroupManager, Group, GroupConsumer

_redis_url = 'redis://root:xxxxx@localhost/1'


async def producer_task(producer):
    for i in range(10):
        await asyncio.sleep(1)
        print(f'-------------------------------------{i}-------------------------------------')
        send_msg_id = await producer.send_message({'msg': f'msg_{i}', 'content': time.strftime("%Y-%m-%d %H:%M:%S")})
        print(f'group_producer send_message time at {time.strftime("%Y-%m-%d %H:%M:%S")}', f'message id={send_msg_id}')


async def consumer_task(consumer: GroupConsumer):
    for _ in range(0, 10):
        # Here we use a low-level read message API and do not detect pending messages or handle idle messages
        msg = await consumer.block_read_messages(count=1, block=1500)
        await asyncio.sleep(0.05)
        print(f'group_consumer {consumer.consumer_id} group={consumer.group_name} block read message', msg)
        if len(msg) > 0 and len(msg[0][1]) > 0:
            msg_id = msg[0][1][0][0]
            ack_result = await consumer.ack_message(msg_id)
            print(f'group_consumer {consumer.consumer_id} group={consumer.group_name} ack message id='
                  f'{msg_id} {"successful" if ack_result else "failed"}.')


# show info
async def show_groups_infor(group: Group):
    print(f'-----------------------------{group.group_name}---------- groups info ------------------------------------')
    group_info = await group.get_groups_info()
    print(f'group name: {group.group_name} groups info : {group_info}')
    print(f'-----------------------------{group.group_name}--------- consumer info -----------------------------------')
    consumer_info = await group.get_consumers_info()
    print(f'group name: {group.group_name} consumer info : {consumer_info}')
    print(f'-----------------------------{group.group_name}-------- pending info -------------------------------------')
    pending_info = await group.get_pending_info()
    print(f'group name: {group.group_name} pending info : {pending_info}')


async def main():
    # create one producer
    producer = MQProducer('group_stream1', redis_name='_group_redis_', redis_url=_redis_url)

    # create a group manager with the same stream key and the same redis_name
    group_manager = GroupManager('group_stream1', redis_name='_group_redis_', redis_url=_redis_url)

    # create first group
    group1 = await group_manager.create_group('group1')
    # create two consumers in the same group
    consumer1 = await group1.create_consumer('consumer1')
    consumer2 = await group1.create_consumer('consumer2')

    # create second group
    group2 = await group_manager.create_group('group2')
    # create three consumers in the same group
    consumer3 = await group2.create_consumer('consumer3')
    consumer4 = await group2.create_consumer('consumer4')
    consumer5 = await group2.create_consumer('consumer5')

    await asyncio.gather(
        producer_task(producer),
        consumer_task(consumer1),
        consumer_task(consumer2),
        consumer_task(consumer3),
        consumer_task(consumer4),
        consumer_task(consumer5)
    )

    print('------------------------------------- show total infor -------------------------------------')
    stream_info = await group_manager.get_stream_info(group_manager.stream_key)
    print(f'stream_key: {group_manager.stream_key} stream info : {stream_info}')

    await show_groups_infor(group1)
    await show_groups_infor(group2)


if __name__ == '__main__':
    asyncio.run(main())
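
The index chain `msg[0][1][0][0]` in `consumer_task` is easier to follow once the reply is unpacked. The sketch below assumes the reply has the shape that indexing relies on: a list of `[stream_key, [(msg_id, fields), ...]]` pairs. `iter_messages` is a hypothetical helper for illustration, not part of aio-redis-mq.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple

# Assumed reply shape (inferred from the msg[0][1][0][0] indexing above):
#   [[stream_key, [(msg_id, {field: value, ...}), ...]], ...]
Entry = Tuple[str, Dict[str, Any]]
Reply = Iterable[Tuple[str, List[Entry]]]


def iter_messages(reply: Reply) -> Iterator[Tuple[str, str, Dict[str, Any]]]:
    """Yield (stream_key, msg_id, fields) for every entry in a read reply."""
    for stream_key, entries in reply or []:
        for msg_id, fields in entries:
            yield stream_key, msg_id, fields


sample_reply = [
    ["group_stream1", [("1700000000000-0", {"msg": "msg_0"}),
                       ("1700000000001-0", {"msg": "msg_1"})]],
]
flat = list(iter_messages(sample_reply))
```

With such a helper, the acknowledgement step could be written as a loop over `iter_messages(msg)` that calls `consumer.ack_message(msg_id)` for each entry, which also handles replies with more than one message.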

More Examples

For more examples, see the example folder.

About Redis streams

Redis Streams is a data type introduced in Redis 5.0 that models a log data structure in a more abstract way. It doubles as a communication channel for building streaming architectures and as a log-like data structure for persisting data, which makes it a good fit for event sourcing.

The stream type can also be used to implement a typical message queue. It covers most of the requirements of a message queue, including but not limited to:

  • Sequential generation of message IDs
  • Message traversal
  • Blocking and non-blocking reading of messages
  • Group consumption of messages
  • Handling of unacknowledged (pending) messages
  • Message queue monitoring
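
On the first point: Redis generates stream IDs of the form `<milliseconds>-<sequence>`, and IDs are ordered by comparing both parts numerically, so a plain string comparison is not safe. A minimal sketch of parsing and ordering IDs (the helper name `parse_stream_id` is hypothetical):

```python
from typing import Tuple


def parse_stream_id(msg_id: str) -> Tuple[int, int]:
    """Split '<milliseconds>-<sequence>' into a tuple of two integers."""
    ms, _, seq = msg_id.partition("-")
    # An ID given without a sequence part is treated as sequence 0 here.
    return int(ms), int(seq or 0)


ids = ["1700000000000-10", "1700000000000-9", "999-0"]
ordered = sorted(ids, key=parse_stream_id)
```

Sorting the same list as plain strings would place `1700000000000-10` before `1700000000000-9`, which is why the numeric key matters.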

Comparison of basic concepts

Common distributed messaging systems include RabbitMQ, RocketMQ, Kafka, Pulsar, and Redis Streams.

Redis streams vs Kafka

| Kafka | Redis Streams | Description |
| --- | --- | --- |
| Record | Message | Objects to be processed in the message engine |
| Producer | Producer | Clients that publish new messages to topics |
| Consumer | Consumer | Clients that subscribe to new messages from topics |
| Consumer Group | Consumer Group | A group of multiple consumer instances that can consume the same topic at the same time to achieve high throughput |
| Broker | Cluster Node | Servers that form the storage layer (leader-follower replication) |
| Topic | Stream data type | Topics are logical containers that carry messages |
| Partitions | Different Redis keys | Redis Streams: differences with Kafka (TM) partitions |
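
The partitions row can be made concrete: where Kafka splits a topic into partitions, a Redis deployment can spread messages over several stream keys. The sketch below is one possible client-side scheme, not something aio-redis-mq is documented to provide; the key pattern `<base>:<index>` and the helper `stream_key_for` are assumptions for illustration.

```python
import zlib


def stream_key_for(base: str, routing_key: str, partitions: int) -> str:
    """Map a routing key to one of `partitions` stream keys, deterministically."""
    if partitions < 1:
        raise ValueError("partitions must be >= 1")
    # crc32 is stable across processes, unlike Python's built-in hash() for str
    index = zlib.crc32(routing_key.encode("utf-8")) % partitions
    return f"{base}:{index}"


key_a = stream_key_for("orders", "customer-42", 4)
key_b = stream_key_for("orders", "customer-42", 4)
```

Each resulting key could then serve as the `stream_key` of its own `MQProducer` and consumer group, so that messages sharing a routing key land in the same stream and keep their relative order.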

Performance

You can use the following tools for performance testing.

OpenMessaging Benchmark Framework

API Reference

MQClient

Client for the message system; it can manage and query messages.

  • __init__(redis_name: Optional[str] = None, redis_url: Optional[str] = None, redis_pool: aioredis.client.Redis = None, **kwargs)

    create MQ Client instance

    • redis_name: name for cache redis client
    • redis_url: redis server url
    • redis_pool: aioredis.client.Redis instance, defaults to None
  • get_stream_length(stream_key: KeyT)

    Returns the number of elements in a given stream.

    • stream_key: key of the stream.
  • query_messages(stream_key: KeyT, min_id: StreamIdT = "-", max_id: StreamIdT = "+", count: Optional[int] = None)

    query message value from min_id to max_id with count limit in a given stream.

    • stream_key: key of the stream.
    • min_id: first stream ID. defaults to '-', meaning the earliest available.
    • max_id: last stream ID. defaults to '+', meaning the latest available.
    • count: if set, only return this many items, beginning with the earliest available.
  • reverse_query_messages(stream_key: KeyT, min_id: StreamIdT = "-", max_id: StreamIdT = "+", count: Optional[int] = None)

    query message value in reverse order from min_id to max_id with count limit in a given stream.

  • get_stream_info(stream_key: KeyT)

    Returns general information about the stream.

  • delete_message(stream_key: KeyT, *ids: StreamIdT)

    Deletes one or more messages from a stream.

    • stream_key: key of the stream.
    • *ids: message ids to delete.
  • trim_stream(stream_key: KeyT, maxlen: int, approximate: bool = True)

    Trims the stream by removing the oldest messages so that its length does not exceed maxlen.

    • stream_key: key of the stream.
    • maxlen: truncate old stream messages beyond this size
    • approximate: actual stream length may be slightly more than maxlen
    client = MQClient(redis_name='my_redis', redis_url='redis://root:xxxxx@localhost/0')
    
    # get stream length
    stream_length = await client.get_stream_length('_test_stream1')
    
    # get stream info
    stream_info = await client.get_stream_info('_test_stream1')
    
    assert stream_info.get('length') == stream_length
    
    # get first_message_info
    first_message_info = await client.query_messages('_test_stream1', count=1)
    # get last_message_info
    last_message_info = await client.reverse_query_messages('_test_stream1', count=1)
    
    assert first_message_info[0] == stream_info.get('first-entry')
    assert last_message_info[0] == stream_info.get('last-entry')
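
To make the range and trimming semantics concrete without a Redis server, here is a toy in-memory model. It only mirrors the behaviour documented above (inclusive `min_id`/`max_id` bounds, a `count` limit, `maxlen` keeping the newest entries) and uses integer IDs for brevity. `ToyStream` is purely illustrative and is not part of the library.

```python
from typing import Dict, List, Optional, Tuple

Entry = Tuple[int, Dict[str, str]]


class ToyStream:
    """In-memory stand-in for a stream; IDs are plain increasing integers."""

    def __init__(self) -> None:
        self.entries: List[Entry] = []
        self._next_id = 1

    def add(self, fields: Dict[str, str]) -> int:
        msg_id = self._next_id
        self._next_id += 1
        self.entries.append((msg_id, fields))
        return msg_id

    def query(self, min_id: Optional[int] = None, max_id: Optional[int] = None,
              count: Optional[int] = None, reverse: bool = False) -> List[Entry]:
        # None plays the role of '-' (earliest) and '+' (latest)
        hits = [e for e in self.entries
                if (min_id is None or e[0] >= min_id)
                and (max_id is None or e[0] <= max_id)]
        if reverse:
            hits.reverse()
        return hits if count is None else hits[:count]

    def trim(self, maxlen: int) -> int:
        """Drop the oldest entries so that at most `maxlen` remain."""
        removed = max(0, len(self.entries) - maxlen)
        self.entries = self.entries[removed:]
        return removed


stream = ToyStream()
for n in range(5):
    stream.add({"msg": f"msg_{n}"})

first = stream.query(count=1)                 # like query_messages(..., count=1)
last = stream.query(count=1, reverse=True)    # like reverse_query_messages(..., count=1)
removed = stream.trim(maxlen=3)               # exact trimming
```

The model trims exactly; with `approximate=True` the real stream may keep slightly more than `maxlen` entries, as noted in the parameter list.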

MQProducer <- MQClient

message producer, MQClient with a specific stream key

  • __init__(stream_key: KeyT, redis_name: str = None, redis_pool: aioredis.client.Redis = None, **kwargs)

    message producer in message system based on a specific stream key.

    • stream_key: key of stream
    • redis_name: name for cache redis client
    • redis_url: redis server url
    • redis_pool: aioredis.client.Redis instance, defaults to None
  • send_message(message: Dict[FieldT, EncodableT], msg_id: StreamIdT = "*", maxlen: int = None, approximate: bool = True)

    Coroutine. Sends a message to the stream and returns the ID of the new message.

    • message: dict of field/value pairs to insert into the stream
    • msg_id: Location to insert this record. By default it is appended.
    • maxlen: max number of messages, truncate old stream members beyond this size
    • approximate: actual stream length may be slightly more than maxlen
    producer = MQProducer('pub_stream', redis_name='my_redis', redis_url='redis://root:xxxxx@localhost/0')
    send_msg_id = await producer.send_message({'msg_key1': 'value1', 'msg_key2': 'value2'})
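
`send_message` takes a flat dict of field/value pairs. If a payload contains nested structures, one common approach is to JSON-encode those values before sending. The helper below is a hypothetical sketch, assuming that values other than `str`, `bytes`, `int`, and `float` are not accepted as-is by the underlying Redis client.

```python
import json
from typing import Any, Dict, Union

Scalar = Union[str, bytes, int, float]


def to_stream_fields(payload: Dict[str, Any]) -> Dict[str, Scalar]:
    """Return a flat dict whose values are scalars; other values become JSON text."""
    fields: Dict[str, Scalar] = {}
    for key, value in payload.items():
        # bool is a subclass of int, so check it first and encode it as JSON
        if isinstance(value, bool) or not isinstance(value, (str, bytes, int, float)):
            fields[key] = json.dumps(value)
        else:
            fields[key] = value
    return fields


fields = to_stream_fields({"msg": "msg_1", "retries": 3, "tags": ["a", "b"], "urgent": True})
```

The result can then be passed to `producer.send_message(fields)`; the consuming side would apply `json.loads` to the fields it knows were encoded.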

MQConsumer <- MQClient

message consumer, MQClient with a specific stream key

  • __init__(stream_key: KeyT, redis_name: str = None, redis_pool: aioredis.client.Redis = None, **kwargs)

    message consumer in message system based on a specific stream key.

    • stream_key: key of stream
    • redis_name: name for cache redis client
    • redis_url: redis server url
    • redis_pool: aioredis.client.Redis instance, defaults to None
  • read_messages(streams: Dict[KeyT, StreamIdT], count: Optional[int] = None)

    Coroutine. Reads messages from one or more streams.

    • streams: a dict of stream keys to stream IDs, where IDs indicate the last ID already seen.
    • count: if set, only return this many items, beginning with the earliest available.
  • block_read_messages(*stream_key: KeyT, count: Optional[int] = None, block: Optional[int] = None,)

    Coroutine. Block and monitor multiple streams for new data.

    • stream_key: key of the stream.
    • count: if set, only return this many items, beginning with the earliest available.
    • block: number of milliseconds to wait, if nothing already present.
    consumer = MQConsumer('pub_stream', redis_name='my_redis', redis_url='redis://root:xxxxx@localhost/0')
    
    # block read new message
    new_msg = await consumer.block_read_messages(block=1500)
    
    # read messages from msg_id(0 or other id)  in single stream (pub_stream)
    read_msgs = await consumer.read_messages({'pub_stream': 0}, count=10)
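
`read_messages` expects the last ID already seen for each stream, so reading a backlog in pages means feeding the last returned ID into the next call. The sketch below shows that loop against any coroutine with the same call shape. `read_backlog` and `fake_read` are hypothetical names, and the reply shape `[[stream_key, [(msg_id, fields), ...]], ...]` is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

Entry = Tuple[str, Dict[str, Any]]
ReadFn = Callable[..., Awaitable[list]]


async def read_backlog(read: ReadFn, stream_key: str,
                       start_id: str = "0", page_size: int = 2) -> List[Entry]:
    """Page through a stream, resuming each call from the last ID seen."""
    last_id = start_id
    collected: List[Entry] = []
    while True:
        reply = await read({stream_key: last_id}, count=page_size)
        entries = reply[0][1] if reply else []
        if not entries:
            return collected
        collected.extend(entries)
        last_id = entries[-1][0]  # the next call resumes strictly after this ID


# Stand-in for consumer.read_messages so the sketch runs without Redis.
_DATA: List[Entry] = [(f"{n}-0", {"msg": f"msg_{n}"}) for n in range(1, 6)]


async def fake_read(streams: Dict[str, str], count: Optional[int] = None) -> list:
    key, last_id = next(iter(streams.items()))
    after = int(str(last_id).split("-")[0])
    hits = [e for e in _DATA if int(e[0].split("-")[0]) > after][:count]
    return [[key, hits]] if hits else []


backlog = asyncio.run(read_backlog(fake_read, "pub_stream"))
```

With a real consumer, `consumer.read_messages` would be passed in place of `fake_read`.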

GroupManager

  • __init__(stream_key: KeyT, redis_name: str = None, **kwargs)

    group manager in message system based on a specific stream key.

    • stream_key: key of stream
    • redis_name: name for cache redis client
    • redis_url: redis server url
  • create_group(group_name: GroupT, msg_id: StreamIdT = "$", mkstream: bool = True)

    Create a new consumer group associated with the stream.

    • group_name: name of the consumer group
    • msg_id: ID of the last item in the stream to consider already delivered.
    • mkstream: a boolean indicating whether to create the stream if it does not exist
  • destroy_group(group_name: GroupT)

    Destroy a consumer group

    • group_name: name of the consumer group
  • get_groups_info()

    Returns general information about the consumer groups of the stream.

    group_manager = GroupManager('pub_stream', redis_name='my_redis', redis_url='redis://root:xxxxx@localhost/0')
    
    # create group
    group = await group_manager.create_group('group')

Group

  • create_consumer(consumer_id: ConsumerT)

    create a consumer instance in group

    • consumer_id: id of consumer.
  • delete_consumer(consumer_id: ConsumerT)

    Remove a specific consumer from a consumer group.

    • consumer_id: id of consumer.
  • set_msg_id(msg_id: StreamIdT)

    Set the consumer group last delivered ID to something else.

    • msg_id: ID of the last item in the stream to consider already delivered
  • get_groups_info()

    Returns general information about the consumer groups of the stream.

  • get_consumers_info()

    Returns general information about the consumers in the group. Only consumers that have read at least one message are returned.

  • get_pending_info()

    Returns information about pending messages of a group.

  • query_pending_messages(min_msg_id: Optional[StreamIdT], max_msg_id: Optional[StreamIdT], count: Optional[int], consumer_id: Optional[ConsumerT] = None)

    Returns information about pending messages, in a range.

    • min_msg_id: minimum message ID
    • max_msg_id: maximum message ID
    • count: number of messages to return
    • consumer_id: id of a consumer to filter by (optional)
  • ack_message(*msg_id: StreamIdT)

    Acknowledges the successful processing of one or more messages.

    • msg_id: message ids to acknowledge.
  • claim_message(consumer_id: ConsumerT, min_idle_time: int, msg_ids: Union[List[StreamIdT], Tuple[StreamIdT]], idle: Optional[int] = None, time: Optional[int] = None, retrycount: Optional[int] = None, force: bool = False, justid: bool = False)

    Changes the ownership of a pending message. In the context of a stream consumer group, this command changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.

    • consumer_id: name of a consumer that claims the message.
    • min_idle_time: only claim messages that have been idle for at least this many milliseconds
    • msg_ids: non-empty list or tuple of message IDs to claim
    • idle: Set the idle time (last time it was delivered) of the message in ms
    • time: optional integer. This is the same as idle but instead of a relative amount of milliseconds, it sets the idle time to a specific Unix time (in milliseconds).
    • retrycount: optional integer. set the retry counter to the specified value. This counter is incremented every time a message is delivered again.
    • force: optional boolean, false by default. Creates the pending message entry in the PEL even if certain specified IDs are not already in the PEL assigned to a different client.
    • justid: optional boolean, false by default. Return just an array of IDs of messages successfully claimed, without returning the actual message
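
A typical use of `claim_message` is to take over messages that another consumer read but never acknowledged. The sketch below covers only the selection step. It assumes `query_pending_messages` returns one dict per pending entry with the keys `message_id`, `consumer`, `time_since_delivered` (in ms), and `times_delivered`, which is how redis-py parses `XPENDING` range replies; check the actual return value before relying on this. `select_stale` is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, List


def select_stale(pending: Iterable[Dict[str, Any]], min_idle_ms: int,
                 max_deliveries: int = 5) -> List[str]:
    """Return IDs idle for at least `min_idle_ms` and delivered fewer than `max_deliveries` times."""
    return [
        entry["message_id"]
        for entry in pending
        if entry["time_since_delivered"] >= min_idle_ms
        and entry["times_delivered"] < max_deliveries
    ]


# Sample data in the assumed shape; not real output.
pending_sample = [
    {"message_id": "1-0", "consumer": "consumer1", "time_since_delivered": 60_000, "times_delivered": 1},
    {"message_id": "2-0", "consumer": "consumer1", "time_since_delivered": 200, "times_delivered": 1},
    {"message_id": "3-0", "consumer": "consumer2", "time_since_delivered": 90_000, "times_delivered": 7},
]
stale_ids = select_stale(pending_sample, min_idle_ms=30_000)
```

If the list is non-empty, it could be passed as `msg_ids` to `claim_message` together with the same idle threshold. Messages that exceed `max_deliveries` are left out on purpose, since retrying them indefinitely rarely helps.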

GroupConsumer

  • read_messages(streams: Dict[KeyT, StreamIdT], count: Optional[int] = None, noack: bool = False)

    Read from a stream via a consumer group.

    • streams: a dict of stream names to stream IDs, where IDs indicate the last ID already seen.
    • count: if set, only return this many items, beginning with the earliest available
    • noack: do not add messages to the PEL (Pending Entries List)
  • block_read_messages(*stream_key: KeyT, block: Optional[int] = None, count: Optional[int] = None, noack: bool = False)

    Block read from a stream via a consumer group.

    • stream_key: one or more stream keys
    • block: number of milliseconds to wait, if nothing already present.
    • count: if set, only return this many items, beginning with the earliest available
    • noack: do not add messages to the PEL (Pending Entries List)
  • query_pending_messages(min_msg_id: Optional[StreamIdT], max_msg_id: Optional[StreamIdT], count: Optional[int])

    Returns information about pending messages, in a range.

    • min_msg_id: minimum message ID
    • max_msg_id: maximum message ID
    • count: number of messages to return
  • ack_message(*msg_id: StreamIdT)

    Acknowledges the successful processing of one or more messages.

    • msg_id: message ids to acknowledge.
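
The relationship between group reads, the PEL, and `ack_message` can be modelled in a few lines. This toy model illustrates the concept only (no Redis involved, integer IDs, hypothetical class `ToyGroup`): each message is delivered to exactly one consumer in the group and stays pending until that delivery is acknowledged.

```python
from typing import Dict, List, Optional, Tuple


class ToyGroup:
    """Toy consumer group over a list of messages; not the library's Group class."""

    def __init__(self, messages: List[str]) -> None:
        self.messages = messages
        self.next_index = 0                 # position after the last delivered message
        self.pending: Dict[int, str] = {}   # PEL: message id -> owning consumer

    def read(self, consumer_id: str, noack: bool = False) -> Optional[Tuple[int, str]]:
        """Deliver the next undelivered message to `consumer_id`, if any."""
        if self.next_index >= len(self.messages):
            return None
        msg_id = self.next_index
        self.next_index += 1
        if not noack:
            self.pending[msg_id] = consumer_id
        return msg_id, self.messages[msg_id]

    def ack(self, *msg_ids: int) -> int:
        """Remove IDs from the PEL and return how many were actually pending."""
        return sum(1 for m in msg_ids if self.pending.pop(m, None) is not None)


group = ToyGroup(["msg_0", "msg_1", "msg_2"])
a = group.read("consumer1")
b = group.read("consumer2")
acked = group.ack(a[0])
```

After these calls, only the message read by `consumer2` remains pending, which is the state `get_pending_info` and `query_pending_messages` are meant to expose on a real group.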

Test

cd tests
pytest --u root --p password --url localhost -vs --cov --cov-report=html

Developer

kavinbj