Redis Streams is a data type introduced in Redis 5.0 that stores data as an append-only, log-structured sequence of entries. It is well suited to building message queues, event logs, and other applications that need persistence and efficient processing of time-series data.
1 Basic Features
- Persistence: Unlike traditional publish/subscribe, messages in a stream are stored, so earlier messages can still be read after a client disconnects and reconnects.
- Multi-consumer support: A stream supports multiple consumer groups, and each group consumes the messages in the stream independently. Different groups can each process the same message, but within a group each message is delivered to only one consumer.
- Message ID and range query: Each message has a unique ID consisting of a millisecond timestamp and a sequence number. You can fetch the messages from a specific time period by specifying a range of message IDs.
- Blocking read: Supports blocking reads (the BLOCK option of the XREAD and XREADGROUP commands), allowing the client to wait for a while when there is no new message.
- Automatic deletion: A maximum length can be set (the MAXLEN option) to limit the size of the stream; once the stream exceeds that length, the oldest messages are deleted automatically.
- Flexible message format: Each message can contain multiple field-value pairs, similar to a hash table, which allows messages to carry rich information.
2 Main operation commands
2.1 XADD key ID field value [field value ...]
Adds a new message to the specified stream. The ID can be * (Redis generates the ID automatically) or an explicit timestamp and sequence number.
127.0.0.1:6379> xadd mystream * sensor_id 123 temmperature 22.5
"1729306027171-0"
The returned ID can be divided into two parts:
- Timestamp: 1729306027171. Indicates the time when the entry was added, in milliseconds. You can convert this timestamp to a readable date and time format.
- Sequence number: 0. Indicates that this is the first entry within that millisecond. If multiple entries are added within the same millisecond, the sequence number is incremented, e.g. 1729306027171-1, 1729306027171-2, and so on.
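To make the two-part ID concrete, here is a minimal Python sketch that splits an ID into its timestamp and sequence number and converts the timestamp to a UTC date. The helper names parse_stream_id and id_to_datetime are made up for this illustration; they are not part of Redis or of any client library.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    """Split '<milliseconds>-<sequence>' into two integers."""
    ms_text, seq_text = stream_id.split("-")
    return int(ms_text), int(seq_text)


def id_to_datetime(stream_id: str) -> datetime:
    """Convert the millisecond part of an ID to an aware UTC datetime."""
    ms, _ = parse_stream_id(stream_id)
    # Integer arithmetic avoids float rounding on the millisecond part.
    seconds, millis = divmod(ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


ids = ["1729306027171-2", "1729306027171-0", "1729306027171-1"]
# Comparing (timestamp, sequence) tuples reproduces the order of the stream.
print(sorted(ids, key=parse_stream_id))
print(id_to_datetime("1729306027171-0").isoformat())
```

Sorting on the parsed tuple rather than on the raw string matters because string comparison would place a sequence number of 10 before 2.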
2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- Reads data from one or more streams.
- COUNT: Specifies the maximum number of entries returned.
- BLOCK: Specifies the time (in milliseconds) to block when there is no new message.
- STREAMS: Specifies the streams to read and, for each stream, the ID after which reading starts.
127.0.0.1:6379> xread count 2 streams mystream 0-0
1) 1) "mystream"
   2) 1) 1) "1729306027171-0"
         2) 1) "sensor_id"
            2) "123"
            3) "temmperature"
            4) "22.5"
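XREAD returns only entries whose ID is greater than the ID you pass, so a reader usually remembers the last ID it saw for each stream and passes it on the next call. The sketch below shows that bookkeeping on the reply from the example above, written by hand as nested Python lists. The nested-list shape and the helper names entry_to_dict and next_read_ids are assumptions for illustration; real client libraries may return bytes or different container types.

```python
from typing import Dict, List, Tuple


def entry_to_dict(entry: list) -> Tuple[str, Dict[str, str]]:
    """Turn [id, [field, value, field, value, ...]] into (id, {field: value})."""
    entry_id, flat = entry
    return entry_id, dict(zip(flat[0::2], flat[1::2]))


def next_read_ids(reply: List[list], last_ids: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of last_ids advanced to the newest entry seen per stream."""
    updated = dict(last_ids)
    for stream_name, entries in reply:
        if entries:
            updated[stream_name] = entries[-1][0]
    return updated


# The reply from the example session, transcribed as nested lists.
reply = [
    ["mystream", [["1729306027171-0", ["sensor_id", "123", "temmperature", "22.5"]]]],
]

for stream_name, entries in reply:
    for entry in entries:
        print(stream_name, entry_to_dict(entry))

# Start from 0-0, then continue after the last entry that was returned.
print(next_read_ids(reply, {"mystream": "0-0"}))
```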
2.3 XRANGE key start end [COUNT count]
- Returns the entries within the specified ID range.
- start and end are IDs; - can be used to indicate the minimum ID and + to indicate the maximum ID.
127.0.0.1:6379> xrange mystream - +
1) 1) "1729306027171-0"
   2) 1) "sensor_id"
      2) "123"
      3) "temmperature"
      4) "22.5"
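Because the first part of an ID is a millisecond timestamp, a time window can be turned into an XRANGE query. XRANGE also accepts IDs with only the millisecond part, in which case the start covers the first entry of that millisecond and the end covers the last. The sketch below only builds the argument list and does not talk to a server; the helper names to_millis and xrange_args are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_millis(moment: datetime) -> int:
    """Milliseconds since the Unix epoch; `moment` must be timezone-aware."""
    # Dividing two timedeltas with // yields an exact integer.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def xrange_args(key: str, start: datetime, end: datetime) -> List[str]:
    """Build the arguments of an XRANGE call covering [start, end]."""
    return ["XRANGE", key, str(to_millis(start)), str(to_millis(end))]


window_start = datetime(2024, 10, 19, 2, 47, 7, tzinfo=timezone.utc)
window_end = window_start + timedelta(seconds=1)
print(" ".join(xrange_args("mystream", window_start, window_end)))
```

The one-second window in the usage lines contains the entry 1729306027171-0 from the example session.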
2.4 XREVRANGE key end start [COUNT count]
Returns entries within the specified ID range, but in reverse order.
127.0.0.1:6379> xadd mystream * sensor_id 234 temmperature 23.5
"1729329067777-0"
127.0.0.1:6379> xadd mystream * sensor_id 345
"1729329079135-0"
127.0.0.1:6379> xrevrange mystream + - count 2
1) 1) "1729329079135-0"
   2) 1) "sensor_id"
      2) "345"
2) 1) "1729329067777-0"
   2) 1) "sensor_id"
      2) "234"
      3) "temmperature"
      4) "23.5"
2.5 XGROUP CREATE key groupname id-or-$ [MKSTREAM]
- Creates a new consumer group.
- id-or-$: The starting position, which can be a specific ID, or $ to indicate that only new entries are consumed.
- MKSTREAM: Creates the stream if it does not exist.
127.0.0.1:6379> xgroup create mystream mygroup 0
OK
2.6 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- Reads data from a stream through a consumer group.
- GROUP: Specifies the name of the consumer group.
- consumer: Specifies the name of the consumer.
- COUNT count: Optional parameter, specifying the maximum number of messages to read at a time.
- BLOCK milliseconds: Optional parameter. If no message is currently available, the command blocks for up to the specified time (in milliseconds) and waits for a new message to arrive.
- NOACK: Optional parameter. Messages are not added to the pending list, as if they were acknowledged at the moment they are read. It is usually used for fast consumption where occasional message loss is acceptable.
- STREAMS: Specifies the streams to read and the corresponding ID for each.
- ID is usually the special value >, which means that only messages never delivered to any consumer in the group are read. It can also be a specific ID, in which case the command returns this consumer's own pending (delivered but unacknowledged) messages with IDs greater than that ID.
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1729306027171-0"
         2) 1) "sensor_id"
            2) "123"
            3) "temmperature"
            4) "22.5"
      2) 1) "1729329067777-0"
         2) 1) "sensor_id"
            2) "234"
            3) "temmperature"
            4) "23.5"
2.7 XACK key group ID [ID ...]
Acknowledges processed messages. The XACK command confirms that a message in a consumer group has been processed successfully. When you call XACK, Redis removes the specified message from the consumer's pending list, so the message is no longer considered pending.
127.0.0.1:6379> xack mystream mygroup 1729329067777-0
(integer) 1
127.0.0.1:6379> xack mystream mygroup 1729329079135-0
(integer) 0
XACK returns the number of messages that were actually acknowledged. In the example, the first call returns 1: the message 1729329067777-0 had been delivered to consumer1, so acknowledging it removes it from the pending list of consumer1. The second call returns 0 because the message 1729329079135-0 had not yet been delivered to any consumer in the group, so there was nothing pending to acknowledge.
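The bookkeeping behind reading with > and acknowledging can be pictured with a small in-memory model. This is a toy sketch written for this article, not how Redis implements consumer groups; the class ToyGroup and its methods are invented names. It replays the session above: two messages are delivered to consumer1, one is acknowledged, and an acknowledgement for a message that was never delivered counts as 0.

```python
from typing import Dict, List


class ToyGroup:
    """Toy in-memory model of one consumer group (illustration only)."""

    def __init__(self, entry_ids: List[str]) -> None:
        self.entry_ids = list(entry_ids)   # IDs in stream order
        self.next_index = 0                # position just after the last delivered entry
        self.pending: Dict[str, str] = {}  # entry ID -> consumer that owns it

    def read_new(self, consumer: str, count: int) -> List[str]:
        """Like reading with '>': deliver up to `count` never-delivered entries."""
        batch = self.entry_ids[self.next_index:self.next_index + count]
        self.next_index += len(batch)
        for entry_id in batch:
            self.pending[entry_id] = consumer
        return batch

    def ack(self, *entry_ids: str) -> int:
        """Remove entries from the pending list; return how many were pending."""
        acked = 0
        for entry_id in entry_ids:
            if self.pending.pop(entry_id, None) is not None:
                acked += 1
        return acked


group = ToyGroup(["1729306027171-0", "1729329067777-0", "1729329079135-0"])
print(group.read_new("consumer1", 2))  # the first two entries are delivered
print(group.ack("1729329067777-0"))    # pending, so it counts
print(group.ack("1729329079135-0"))    # never delivered, so it does not count
print(group.pending)                   # one entry is still owned by consumer1
```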
2.8 XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
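Views pending messages, that is, messages that have been delivered to a consumer in the group but not yet acknowledged. Without a range, the command returns a summary: the number of pending messages, the smallest and largest pending IDs, and the pending count for each consumer.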
127.0.0.1:6379> xpending mystream mygroup
1) (integer) 1
2) "1729306027171-0"
3) "1729306027171-0"
4) 1) 1) "consumer1"
      2) "1"
127.0.0.1:6379> xack mystream mygroup 1729306027171-0
(integer) 1
127.0.0.1:6379> xpending mystream mygroup
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
2.9 XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE]
Transfers ownership of one or more pending messages from one consumer to another. This command is usually used to handle message timeouts or to reassign messages. XCLAIM lets you manually move messages from one consumer's pending list to another consumer's pending list.
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1729329079135-0"
         2) 1) "sensor_id"
            2) "345"
127.0.0.1:6379> xclaim mystream mygroup consumer2 10000 1729329079135-0
1) 1) "1729329079135-0"
   2) 1) "sensor_id"
      2) "345"
- mystream: The name of the stream.
- mygroup: The name of the consumer group.
- consumer2: The name of the target consumer, that is, the message will be transferred to this consumer.
- 10000: The minimum idle time of the message (in milliseconds). Only messages that have been idle for at least this long are transferred.
- 1729329079135-0: The ID of the message to be transferred.
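The role of min-idle-time can be sketched with a small in-memory model. This is an illustration under simplifying assumptions, not Redis code: PendingEntry and claim are invented names, time is passed in explicitly as now_ms, and options such as FORCE are not modelled. A message changes owner only if it is pending and has been idle for at least the requested time.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class PendingEntry:
    consumer: str          # current owner of the message
    last_delivery_ms: int  # when it was last delivered or claimed
    deliveries: int = 1    # how many times it has been delivered


def claim(pending: Dict[str, PendingEntry], new_consumer: str,
          min_idle_ms: int, entry_ids: List[str], now_ms: int) -> List[str]:
    """Move sufficiently idle pending entries to `new_consumer`."""
    claimed = []
    for entry_id in entry_ids:
        entry = pending.get(entry_id)
        if entry is None:
            continue  # not pending, so there is nothing to claim in this model
        if now_ms - entry.last_delivery_ms < min_idle_ms:
            continue  # not idle long enough; the current owner keeps it
        entry.consumer = new_consumer
        entry.last_delivery_ms = now_ms  # the idle clock restarts
        entry.deliveries += 1
        claimed.append(entry_id)
    return claimed


demo = {"1729329079135-0": PendingEntry("consumer1", last_delivery_ms=0)}
print(claim(demo, "consumer2", 10000, ["1729329079135-0"], now_ms=5000))   # too early
print(claim(demo, "consumer2", 10000, ["1729329079135-0"], now_ms=12000))  # claimed
print(demo["1729329079135-0"])
```

The idle check is what makes it reasonably safe for several workers to attempt the same recovery: after the first successful claim the idle time restarts, so a second attempt made shortly afterwards is skipped.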
2.10 XINFO
Gets information about a stream, its consumer groups, or the consumers in a group.
127.0.0.1:6379> xinfo stream mystream
 1) "length"
 2) (integer) 3
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1729329079135-0"
11) "first-entry"
12) 1) "1729306027171-0"
    2) 1) "sensor_id"
       2) "123"
       3) "temmperature"
       4) "22.5"
13) "last-entry"
14) 1) "1729329079135-0"
    2) 1) "sensor_id"
       2) "345"
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 1
   7) "last-delivered-id"
   8) "1729329079135-0"
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"
   2) "consumer1"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 255317
2) 1) "name"
   2) "consumer2"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 191940
XINFO STREAM mystream
- length: Total number of messages in the stream: 3.
- radix-tree-keys: Number of keys in the radix tree used to store stream data: 1.
- radix-tree-nodes: Number of nodes in the radix tree used to store stream data: 2.
- groups: Number of consumer groups associated with this stream: 1.
- last-generated-id: The last message ID generated in the stream: 1729329079135-0.
- first-entry: The first message in the stream. Message ID: 1729306027171-0. Message content: sensor_id: 123, temmperature: 22.5.
- last-entry: The last message in the stream. Message ID: 1729329079135-0. Message content: sensor_id: 345.
XINFO GROUPS mystream
- name: Name of the consumer group: mygroup.
- consumers: Number of consumers in this group: 2.
- pending: Number of messages pending in this group: 1.
- last-delivered-id: ID of the last message delivered to the group: 1729329079135-0.
XINFO CONSUMERS mystream mygroup
The first consumer:
- name: consumer1
- pending: Number of messages pending: 0
- idle: Idle time: 255,317 milliseconds (approximately 4 minutes 15 seconds)
The second consumer:
- name: consumer2
- pending: Number of messages pending: 1
- idle: Idle time: 191,940 milliseconds (approximately 3 minutes 12 seconds)
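The XINFO replies shown above are flat lists that alternate between a field name and its value, so they pair up naturally into dictionaries. The sketch below does this for the XINFO CONSUMERS reply, transcribed by hand as Python lists, and formats the idle time for reading. The helper names pairs_to_dict and format_idle are made up for this illustration, and a real client library may already return dictionaries.

```python
from typing import Any, Dict, List


def pairs_to_dict(flat: List[Any]) -> Dict[str, Any]:
    """Pair up a flat [name, value, name, value, ...] list."""
    return dict(zip(flat[0::2], flat[1::2]))


def format_idle(idle_ms: int) -> str:
    """Render milliseconds as minutes and seconds, rounded to the nearest second."""
    minutes, seconds = divmod(round(idle_ms / 1000), 60)
    return f"{minutes} min {seconds} s"


# The XINFO CONSUMERS reply from the example session.
consumers_reply = [
    ["name", "consumer1", "pending", 0, "idle", 255317],
    ["name", "consumer2", "pending", 1, "idle", 191940],
]

consumers = [pairs_to_dict(item) for item in consumers_reply]
for consumer in consumers:
    print(consumer["name"], consumer["pending"], format_idle(consumer["idle"]))
```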
2.11 XDEL key ID [ID ...]
Removes one or more entries from a stream.
127.0.0.1:6379> xdel mystream 1729306027171-0
(integer) 1
127.0.0.1:6379> xrange mystream - +
1) 1) "1729329067777-0"
   2) 1) "sensor_id"
      2) "234"
      3) "temmperature"
      4) "23.5"
2) 1) "1729329079135-0"
   2) 1) "sensor_id"
      2) "345"
2.12 XTRIM key MAXLEN [~] len
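Trims the stream so that it keeps at most len entries, removing the oldest entries first. The optional ~ requests approximate trimming, which allows Redis to keep slightly more than len entries in exchange for better efficiency. The return value is the number of entries removed.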
127.0.0.1:6379> xrange mystream - +
1) 1) "1729329067777-0"
   2) 1) "sensor_id"
      2) "234"
      3) "temmperature"
      4) "23.5"
2) 1) "1729329079135-0"
   2) 1) "sensor_id"
      2) "345"
127.0.0.1:6379> xtrim mystream maxlen 1
(integer) 1
127.0.0.1:6379> xrange mystream - +
1) 1) "1729329079135-0"
   2) 1) "sensor_id"
      2) "345"
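The effect of exact MAXLEN trimming can be mimicked on a plain Python list. This is a toy model of the behaviour in the session above, not Redis code, and the function name trim_maxlen is invented for the illustration; approximate trimming with ~ is not modelled.

```python
from typing import List


def trim_maxlen(entry_ids: List[str], maxlen: int) -> int:
    """Drop the oldest entries in place so that at most `maxlen` remain.

    Returns the number of entries removed.
    """
    excess = max(0, len(entry_ids) - maxlen)
    del entry_ids[:excess]  # entries are kept in insertion order, oldest first
    return excess


stream = ["1729329067777-0", "1729329079135-0"]
print(trim_maxlen(stream, 1))  # one entry is removed
print(stream)                  # only the newest entry is left
```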
3 Use scenarios
- Logging: Store the system's log information for later analysis and processing.
- Event streams: Handle real-time events such as sensor data, user behavior, etc.
- Message queue: Implement a reliable messaging system with support for multiple consumer groups.
- Task queue: Manage background tasks and make sure that each task is processed.
For more commands, please refer to the Redis documentation: Commands | Docs