In this exercise, you will implement Streams in an Azure Cache for Redis instance.
If your application requires stronger delivery guarantees, you may want to learn about Redis Streams. Messages in streams are persisted, and support both at-most-once as well as at-least-once delivery semantics. Streams also support consumer groups, which allow multiple consumers to read from the same stream. For more information, see Redis Streams.
Entries are added to a new or existing stream using the XADD command. The stream is automatically created if it does not already exist.
Sign in to the Azure portal. Navigate to your Azure Cache for Redis instance.
In the Overview pane, select Console. This will open a Redis Console, which enables you to enter low-level Redis commands.
In the console, use the XADD command to add two entries to the org.logs.clientapp stream:
XADD org.logs.clientapp 1324092248593-0 device-id mobile error unknown-crash
XADD org.logs.clientapp 1481945061467-0 worker-process 1788 status success
The first argument to the XADD command is the name of the stream. The second argument is the key of the entry. The key is a combination of a Unix time in milliseconds and a sequence number, separated by a hyphen. The remaining arguments are the fields and values of the entry.
Observe the output from the two invocations of the XADD command. Each invocation returns the key of the newly added entry.
"1324092248593-0"
"1481945061467-0"
Use the XADD command to add another new entry with an automatically generated identifier:
XADD org.logs.clientapp * application-status started
The * argument to the XADD command indicates that the key should be automatically generated.
Observe the output of the XADD command. The output includes a newly generated key based on the current Unix time in milliseconds and a sequence number. For example, if the key is “1638502526759-0”, then the output would be:
"1638502526759-0"
The XLEN command counts the number of entries in a stream. Once you are ready to query the entries, you can use the XRANGE command to get entries within the stream.
Use the XLEN command to count the number of entries in the org.logs.clientapp stream:
XLEN org.logs.clientapp
Observe the output from the XLEN command. The output will be an integer with a value of 3 for the entries created earlier in this exercise.
(integer) 3
Use the XRANGE command with the special IDs - and +, which stand for the smallest and greatest possible keys, to get all of the data in the org.logs.clientapp stream:
XRANGE org.logs.clientapp - +
Observe the output from invoking the XRANGE command. This output will include all three entries in the stream.
1) 1) "1324092248593-0"
   2) 1) "device-id"
      2) "mobile"
      3) "error"
      4) "unknown-crash"
2) 1) "1481945061467-0"
   2) 1) "worker-process"
      2) "1788"
      3) "status"
      4) "success"
3) 1) "1638502526759-0"
   2) 1) "application-status"
      2) "started"
Your last key will not exactly match the identifier used in this example.
The XRANGE command takes a start and an end, and the special IDs - and + stand for the smallest and greatest possible keys. Either one can be replaced with an actual key to query a subset of the data in a stream based on a time range.
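Because the first part of a key is a Unix time in milliseconds, a time window can be turned into a pair of range bounds. The following is a minimal C# sketch (C# is the language used later in this exercise) that only builds and prints the command text. It relies on XRANGE accepting a key given as just its millisecond part; the helper name ToKeyTimestamp and the chosen window are assumptions made for illustration.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for this sketch: the millisecond part of a key for a given instant.
static string ToKeyTimestamp(DateTimeOffset instant) =>
    instant.ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture);

// Assumed window: 17 December 2016 (UTC), which contains the second entry (1481945061467-0).
var windowStart = new DateTimeOffset(2016, 12, 17, 0, 0, 0, TimeSpan.Zero);
var windowEnd = windowStart.AddDays(1);

// Build the command; both bounds are inclusive when passed to XRANGE.
var command = $"XRANGE org.logs.clientapp {ToKeyTimestamp(windowStart)} {ToKeyTimestamp(windowEnd)}";
Console.WriteLine(command);
```

The printed command can be pasted into the Redis Console. Since both bounds are inclusive, an entry stamped exactly at the end of the window would also be returned.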
Invoke the XRANGE command using the special ID - as the start and the key of the second entry (1481945061467-0) as the end:
XRANGE org.logs.clientapp - 1481945061467-0
Observe the output of the XRANGE command. The output includes all entries from the start of the stream, chronologically, up to and including the second entry (1481945061467-0).
1) 1) "1324092248593-0"
   2) 1) "device-id"
      2) "mobile"
      3) "error"
      4) "unknown-crash"
2) 1) "1481945061467-0"
   2) 1) "worker-process"
      2) "1788"
      3) "status"
      4) "success"
Invoke the XRANGE command using the key of the second entry (1481945061467-0) as the start and the special ID + as the end:
XRANGE org.logs.clientapp 1481945061467-0 +
Observe the output of the XRANGE command. The output includes the second entry, and then all entries up to the end of the stream, chronologically.
1) 1) "1481945061467-0"
   2) 1) "worker-process"
      2) "1788"
      3) "status"
      4) "success"
2) 1) "1638502526759-0"
   2) 1) "application-status"
      2) "started"
The last key will not exactly match the one used in this example.
Create a new .NET Core console application and open the project in Visual Studio Code.
dotnet new console --name redis-streams
cd redis-streams
code .
Add the StackExchange.Redis NuGet package using the terminal:
dotnet add package StackExchange.Redis
Update Program.cs to create a ConnectionMultiplexer:
using StackExchange.Redis;
var connectionString = "[cache-name].redis.cache.windows.net:6380,password=[password-here],ssl=True,abortConnect=False";
var redisConnection = ConnectionMultiplexer.Connect(connectionString);
You can obtain the connectionString from the Access keys section of the Azure Cache for Redis instance in the Azure portal.
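Since the connection string contains an access key, one option is to keep it out of Program.cs and read it at startup. The following is a minimal sketch of that idea, not a required step of this exercise. The environment variable name REDIS_CONNECTION_STRING is a hypothetical name chosen for illustration.

```csharp
using System;
using StackExchange.Redis;

// REDIS_CONNECTION_STRING is a hypothetical variable name used only in this sketch.
// Its value is the connection string copied from the Access keys section.
var connectionString = Environment.GetEnvironmentVariable("REDIS_CONNECTION_STRING")
    ?? throw new InvalidOperationException("Set REDIS_CONNECTION_STRING before running.");

// Connect exactly as in the exercise, but without a key stored in source code.
var redisConnection = ConnectionMultiplexer.Connect(connectionString);
Console.WriteLine($"Connected: {redisConnection.IsConnected}");
```

If you use this variant, it replaces the connectionString and redisConnection declarations shown above rather than being added alongside them.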
The Redis database is represented by the IDatabase type. You can retrieve one using the GetDatabase() method:
IDatabase db = redisConnection.GetDatabase();
Use the following to add a simple message with a single name/value pair to a stream:
var messageId = db.StreamAdd("events_stream", "foo_name", "bar_value");
Console.WriteLine($"messageId = {messageId}");
// messageId = 1518951480106-0
Each message or entry in the stream is represented by the StreamEntry type. Each stream entry contains a unique ID and an array of name/value pairs. The name/value pairs are represented by the NameValueEntry type.
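As described earlier in this exercise, an ID combines a Unix time in milliseconds with a sequence number. The following sketch shows one way to split an ID back into those two parts. ParseEntryId is a hypothetical helper written for this example and is not part of StackExchange.Redis. The ID is passed as a plain string so that the snippet runs without a cache.

```csharp
using System;

// Hypothetical helper (not a library API): splits "<milliseconds>-<sequence>".
static (DateTimeOffset Timestamp, ulong Sequence) ParseEntryId(string id)
{
    var parts = id.Split('-');
    if (parts.Length != 2)
    {
        throw new FormatException($"Expected '<milliseconds>-<sequence>', got '{id}'.");
    }

    var ms = long.Parse(parts[0]);
    var seq = ulong.Parse(parts[1]);
    return (DateTimeOffset.FromUnixTimeMilliseconds(ms), seq);
}

// Uses the ID of the first entry added in the Redis Console earlier in this exercise.
var (timestamp, sequence) = ParseEntryId("1324092248593-0");
Console.WriteLine($"timestamp = {timestamp:O}, sequence = {sequence}");
```

With a StreamEntry in hand, the same helper could be called with the string form of its Id property.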
Multiple name/value pairs can be written to a stream using the following:
var values = new NameValueEntry[]
{
    new NameValueEntry("sensor_id", "1234"),
    new NameValueEntry("temp", "19.8")
};
var sensorMessageId = db.StreamAdd("sensor_stream", values);
Console.WriteLine($"sensorMessageId = {sensorMessageId}");
// sensorMessageId = 1681829523719-0
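To confirm that an entry was written, the stream length can be checked before and after the call, which is the C# counterpart of the XLEN command used earlier. The following is a sketch under stated assumptions: the REDIS_CONNECTION_STRING environment variable is a hypothetical name, and the sensor values are made up.

```csharp
using System;
using StackExchange.Redis;

// Assumption: REDIS_CONNECTION_STRING (hypothetical name) holds the cache connection string.
var redisConnection = ConnectionMultiplexer.Connect(
    Environment.GetEnvironmentVariable("REDIS_CONNECTION_STRING")
    ?? throw new InvalidOperationException("Set REDIS_CONNECTION_STRING first."));
IDatabase db = redisConnection.GetDatabase();

// StreamLength issues XLEN; a stream that does not exist yet reports 0 entries.
long before = db.StreamLength("sensor_stream");

// Illustrative values only.
db.StreamAdd("sensor_stream", new NameValueEntry[]
{
    new NameValueEntry("sensor_id", "1234"),
    new NameValueEntry("temp", "20.1")
});

long after = db.StreamLength("sensor_stream");
Console.WriteLine($"sensor_stream grew from {before} to {after} entries");
```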
Reading from a stream is done by using either the StreamRead or StreamRange methods. The following example reads all messages from the ID “0-0” to the end of the stream and writes each one to the console:
var messages = db.StreamRead("events_stream", "0-0");
var writeMessage = (string stream, StreamEntry message) => {
    Console.WriteLine($"stream = {stream}");
    Console.WriteLine($"messageId = {message.Id}");
    foreach (var entry in message.Values)
    {
        Console.WriteLine($"entry = {entry.Name}:{entry.Value}");
    }
};
foreach (var message in messages)
{
    writeMessage("events_stream", message);
}
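StreamRead returns only entries whose ID is greater than the position passed in, so the ID of the last entry received can be used as the position for the next call. The following sketch reads a stream in small batches this way. The batch size of 2 and the REDIS_CONNECTION_STRING environment variable name are assumptions made for illustration.

```csharp
using System;
using StackExchange.Redis;

// Assumption: REDIS_CONNECTION_STRING (hypothetical name) holds the cache connection string.
var redisConnection = ConnectionMultiplexer.Connect(
    Environment.GetEnvironmentVariable("REDIS_CONNECTION_STRING")
    ?? throw new InvalidOperationException("Set REDIS_CONNECTION_STRING first."));
IDatabase db = redisConnection.GetDatabase();

// "0-0" sorts before every real ID, so the first call starts at the beginning.
RedisValue lastSeenId = "0-0";

while (true)
{
    // Read at most two entries with an ID greater than lastSeenId.
    StreamEntry[] batch = db.StreamRead("events_stream", lastSeenId, count: 2);
    if (batch.Length == 0)
    {
        break; // Nothing newer than lastSeenId.
    }

    foreach (var entry in batch)
    {
        Console.WriteLine($"messageId = {entry.Id}");
    }

    // Continue after the last entry of this batch.
    lastSeenId = batch[^1].Id;
}
```

The loop stops once a call returns no entries. A long-running consumer would typically wait and call StreamRead again with the same position instead of exiting.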
The StreamRead method also allows you to read from multiple streams at once:
var streams = db.StreamRead(new StreamPosition[]
{
    new StreamPosition("events_stream", "0-0"),
    new StreamPosition("sensor_stream", "0-0")
});
Console.WriteLine($"Stream = {streams.First().Key}");
Console.WriteLine($"Length = {streams.First().Entries.Length}");
foreach (var stream in streams)
{
    foreach (var message in stream.Entries)
    {
        writeMessage(stream.Key.ToString(), message);
    }
}
The StreamRange method allows you to return a range of entries within a stream.
db.StreamRange("events_stream", minId: "-", maxId: "+");
The “-” and “+” special characters indicate the smallest and greatest IDs possible. These are the default values used if no value is passed for the respective parameter.
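StreamRange also takes optional count and messageOrder parameters. The following sketch combines them to fetch only the most recent entry of a stream. It assumes the events_stream stream from earlier in this exercise and a hypothetical REDIS_CONNECTION_STRING environment variable.

```csharp
using System;
using StackExchange.Redis;

// Assumption: REDIS_CONNECTION_STRING (hypothetical name) holds the cache connection string.
var redisConnection = ConnectionMultiplexer.Connect(
    Environment.GetEnvironmentVariable("REDIS_CONNECTION_STRING")
    ?? throw new InvalidOperationException("Set REDIS_CONNECTION_STRING first."));
IDatabase db = redisConnection.GetDatabase();

// Ask for at most one entry, walking the stream from the newest ID to the oldest.
StreamEntry[] newest = db.StreamRange(
    "events_stream",
    minId: "-",
    maxId: "+",
    count: 1,
    messageOrder: Order.Descending);

if (newest.Length == 0)
{
    Console.WriteLine("events_stream is empty");
}
else
{
    Console.WriteLine($"newest messageId = {newest[0].Id}");
}
```

Replacing minId or maxId with an actual ID narrows the range in the same way as the XRANGE examples in the Redis Console.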