Redis Streams client implementation for high availability usage including consumer, monitor and scaler implementation

Redis-Streams

This package builds on Redis Streams and provides an easy-to-use interface for batch collection and processing. It simplifies the management of consumer groups and consumers. Because it is designed for highly available, scalable, distributed environments, it offers monitoring and scaling capabilities in addition to the main functionality.

The main idea is that Redis Streams supports several message producers. The messages are then organized into consumer groups, where multiple consumers can each collect a batch of items, process them and acknowledge the successfully processed ones. If processing fails, the message is not acknowledged and becomes part of the next batch. If a consumer fails, the monitor component re-assigns its pending messages to a healthy consumer, so messages don't get lost. An optional scaling component monitors the incoming/processed message rate and suggests scaling the consumers if necessary.
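
The acknowledge and re-assign flow described above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not how Redis or this package implements them; the class `ToyConsumerGroup` and its method names are hypothetical and exist only for this illustration.

```python
from collections import defaultdict


class ToyConsumerGroup:
    """In-memory stand-in for a consumer group (illustration only, no Redis involved)."""

    def __init__(self):
        self.backlog = []                  # messages not yet delivered to any consumer
        self.pending = defaultdict(list)   # consumer -> delivered but unacknowledged messages

    def add(self, message):
        # A producer appends a message to the stream.
        self.backlog.append(message)

    def read(self, consumer, count):
        # Unacknowledged messages come back first, then new ones fill the batch.
        batch = list(self.pending[consumer][:count])
        fresh = self.backlog[: count - len(batch)]
        del self.backlog[: len(fresh)]
        self.pending[consumer].extend(fresh)
        return batch + fresh

    def ack(self, consumer, message):
        # A successfully processed message leaves the pending list.
        self.pending[consumer].remove(message)

    def reassign(self, failed, healthy):
        # What a monitor would do when a consumer stops responding.
        self.pending[healthy].extend(self.pending.pop(failed, []))


group = ToyConsumerGroup()
for name in ("m1", "m2", "m3"):
    group.add(name)

first_batch = group.read("c1", count=2)    # m1 and m2 are delivered to c1
group.ack("c1", "m1")                      # only m1 was processed successfully
second_batch = group.read("c1", count=2)   # m2 is delivered again, together with m3
group.reassign(failed="c1", healthy="c2")  # c1 died, c2 takes over its pending items
```

The second batch contains the unacknowledged message again, which is the behaviour the paragraph above relies on to avoid losing messages.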

Installation

Latest version:

pip3 install redis-streams

Components

Figure: Overview of the Redis Streams components. Image source: tgrall.github.io

Producer

As its name suggests, this component is responsible for publishing messages to the stream. Redis supports multiple producers.

Example code

from redis import Redis
from redis_streams.producer import Producer

STREAM = "example-stream"  # hypothetical stream name, pick your own

redis_conn = Redis()
producer = Producer(redis_conn=redis_conn, stream=STREAM)

# publish a single message
msg_id = producer.add({"message": "stuff goes here"})
print(f"Published {msg_id}")

# optional: cap the stream length with approximate trimming
producer_with_trim = Producer(redis_conn=redis_conn, stream=STREAM, maxlen=10000)
producer_with_trim.add({"message": "older entries will be trimmed"})
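
Approximate trimming corresponds to the Redis command form `XADD ... MAXLEN ~ n`. The source does not show how `Producer` implements it, so the following is only a sketch that assumes a redis-py style client with an `xadd(name, fields, maxlen=..., approximate=...)` method; the helper name `publish` is hypothetical.

```python
def publish(client, stream, fields, maxlen=None):
    """Append one entry to the stream and return its message id.

    `client` is assumed to behave like a redis-py `Redis` instance.
    With `maxlen` set, Redis is asked to trim the stream to roughly that
    many entries; approximate trimming lets Redis drop whole internal
    nodes, which is cheaper than trimming to an exact length.
    """
    return client.xadd(stream, fields, maxlen=maxlen, approximate=True)
```

With a real connection this would be called as `publish(Redis(), STREAM, {"message": "stuff goes here"}, maxlen=10000)`.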

Consumer

The consumer registers in the consumer group and starts fetching available messages. Once the preconfigured batch size is reached, it returns the list of items to the caller, which can then acknowledge each message and thereby remove it from the stream. The consumer also returns after the preconfigured maximum wait time, even if the batch is not full, so items don't wait long in the stream.

Example code

from redis import Redis
from redis_streams.consumer import Consumer  # module path assumed, mirroring redis_streams.producer

# It is crucial to enable the "decode_responses" option of the Redis client
redis_conn = Redis(decode_responses=True)
consumer = Consumer(
    redis_conn=redis_conn,
    stream=STREAM,
    consumer_group=GROUP,
    batch_size=10,
    max_wait_time_ms=30000,
)
while True:
    messages = consumer.get_items()
    total_no_of_messages = len(messages)
    for i, item in enumerate(messages, start=1):
        print(f"Processing {i}/{total_no_of_messages} message: {item}")
        process_message(item=item)
        # acknowledge the message once it has been processed successfully
        consumer.remove_item_from_consumer_group(item_id=item.msgid)
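
The "full batch or timeout, whichever comes first" rule can be sketched independently of Redis. The example below uses a standard-library `queue.Queue` as a stand-in for the stream; the function name `collect_batch` is hypothetical and this is not the package's actual implementation.

```python
import queue
import time


def collect_batch(source, batch_size, max_wait_time_ms):
    """Return up to batch_size items, waiting at most max_wait_time_ms in total."""
    deadline = time.monotonic() + max_wait_time_ms / 1000
    batch = []
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # time is up: return a partial batch
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # nothing arrived before the deadline
    return batch


items = queue.Queue()
for n in range(3):
    items.put(n)

full_batch = collect_batch(items, batch_size=2, max_wait_time_ms=1000)    # returns as soon as 2 items are in
partial_batch = collect_batch(items, batch_size=5, max_wait_time_ms=50)   # returns after about 50 ms
```

Using a single deadline for the whole batch, rather than a timeout per item, keeps the worst-case waiting time bounded by the configured maximum.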

Monitor

The monitor periodically checks the activity of the consumers and warns if they are idle (not fetching messages from the stream for longer than the preconfigured inactivity threshold) or have more assigned messages than the batch size. Automatic and on-demand cleanup are also supported.

Example code

monitor = Monitor(
    redis_conn=Redis(),
    stream=STREAM,
    consumer_group=GROUP,
    batch_size=10,   # batch size has to be the same as for the consumers
)
monitor.collect_monitoring_data(auto_cleanup=True)
monitor.print_monitoring_data()

Output

+-------------------------+-------------+-----------------+----------------------------------+
|             Consumer id |   Idle time |   Pending items | Status                           |
+=========================+=============+=================+==================================+
| b'29102140026848155456' |         923 |               7 | OK                               |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29104139791624517440' |      294191 |               5 | WARNING - idle for long time     |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29144140168467982144' |      361502 |               8 | WARNING - idle for long time     |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29304140033034540864' |        8658 |              11 | WARNING - too many pending items |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29312139940580673344' |       11734 |              58 | WARNING - too many pending items |
+-------------------------+-------------+-----------------+----------------------------------+
| b'29314139867734665024' |       14216 |               1 | OK                               |
+-------------------------+-------------+-----------------+----------------------------------+
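
The two warning conditions can be expressed as a small classification function. This is a sketch under assumptions: the 60000 ms idle threshold is invented for illustration (the source does not state the default), the order in which the two checks are applied is a guess, and the function name `consumer_status` is hypothetical.

```python
def consumer_status(idle_ms, pending, batch_size, idle_threshold_ms):
    """Classify one consumer from its idle time and number of pending items."""
    if idle_ms > idle_threshold_ms:
        return "WARNING - idle for long time"
    if pending > batch_size:
        return "WARNING - too many pending items"
    return "OK"


# (idle time in ms, pending items) pairs taken from the table above
rows = [(923, 7), (294191, 5), (8658, 11), (11734, 58), (14216, 1)]
statuses = [
    consumer_status(idle, pending, batch_size=10, idle_threshold_ms=60000)  # threshold is assumed
    for idle, pending in rows
]
```

With these assumed settings the function reproduces the status column for the sampled rows.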

Scaler

By checking the number of messages waiting to be assigned and the number of pending items, a utilization rate can be calculated. Once this rate crosses the lower (scale in) or upper (scale out) threshold, the code suggests scaling in or out.

Example code

scaler = Scaler(
    redis_conn=Redis(decode_responses=True),
    stream=STREAM,
    consumer_group=GROUP
)
scaler.collect_metrics()
rate, suggestion = scaler.get_scale_decision(
    scale_out_rate=60, scale_in_rate=20
)
print(
    f"Consumers should be {suggestion} as stream length "
    f"({scaler.stream_lenght}) / pending ({scaler.stream_pending}) "
    f"rate is {rate}%"
)

Output

Consumers should be IN as stream length (11) / pending (83) rate is 13.253%
Consumers should be NO_SCALE as stream length (18) / pending (79) rate is 22.7848%
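
The rate in the output is consistent with stream length divided by pending items, expressed as a percentage. A minimal sketch of that calculation and the threshold comparison follows. Several details are assumptions rather than the package's confirmed behaviour: the function name `scale_decision`, the `"OUT"` label (only `IN` and `NO_SCALE` appear in the output), the inclusive comparisons, and the handling of zero pending items.

```python
def scale_decision(stream_length, pending, scale_out_rate, scale_in_rate):
    """Return (rate in percent, suggestion) from waiting and pending message counts."""
    if pending == 0:
        # Assumption: waiting messages with nothing pending means consumers cannot keep up.
        rate = float("inf") if stream_length else 0.0
    else:
        rate = stream_length / pending * 100
    if rate >= scale_out_rate:
        return rate, "OUT"       # label assumed by symmetry with "IN"
    if rate <= scale_in_rate:
        return rate, "IN"
    return rate, "NO_SCALE"


rate_a, suggestion_a = scale_decision(11, 83, scale_out_rate=60, scale_in_rate=20)
rate_b, suggestion_b = scale_decision(18, 79, scale_out_rate=60, scale_in_rate=20)
```

For the two samples above, 11 / 83 is about 13.25 % (below the scale-in rate of 20) and 18 / 79 is about 22.78 % (between the two thresholds), matching the `IN` and `NO_SCALE` suggestions in the output.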

License

This project is licensed under the terms of the GPL-3.0 license.
