Skip to content

Commit

Permalink
- add: XAutoClaim
Browse files Browse the repository at this point in the history
  • Loading branch information
2881099 committed Jul 4, 2023
1 parent c4d0910 commit d6455d1
Showing 1 changed file with 33 additions and 1 deletion.
34 changes: 33 additions & 1 deletion src/FreeRedis/RedisClient/Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public Task<string> XAddAsync<T>(string key, long maxlen, string id, Dictionary<
.Input(string.IsNullOrEmpty(id) ? "*" : id)
.InputKv(fieldValues, false, SerializeRedisValue), rt => rt.ThrowOrValue<string>());

public Task<StreamsXAutoClaimResult> XAutoClaimAsync(string key, string group, string consumer, long minIdleTime, string start, long count = 0) => CallAsync("XAUTOCLAIM"
.InputKey(key, group, consumer)
.Input(minIdleTime, start)
.InputIf(count > 0, "COUNT", count), rt => rt.ThrowOrValueToXAutoClaim());

public Task<StreamsEntry[]> XClaimAsync(string key, string group, string consumer, long minIdleTime, params string[] id) => CallAsync("XCLAIM"
.InputKey(key, group, consumer)
.Input(minIdleTime, id), rt => rt.ThrowOrValueToStreamsEntryArray());
Expand Down Expand Up @@ -161,6 +166,11 @@ public string XAdd<T>(string key, long maxlen, string id, Dictionary<string, T>
.Input(string.IsNullOrEmpty(id) ? "*" : id)
.InputKv(fieldValues, false, SerializeRedisValue), rt => rt.ThrowOrValue<string>());

public StreamsXAutoClaimResult XAutoClaim(string key, string group, string consumer, long minIdleTime, string start, long count = 0) => Call("XAUTOCLAIM"
.InputKey(key, group, consumer)
.Input(minIdleTime, start)
.InputIf(count > 0, "COUNT", count), rt => rt.ThrowOrValueToXAutoClaim());

public StreamsEntry[] XClaim(string key, string group, string consumer, long minIdleTime, params string[] id) => Call("XCLAIM"
.InputKey(key, group, consumer)
.Input(minIdleTime, id), rt => rt.ThrowOrValueToStreamsEntryArray());
Expand Down Expand Up @@ -276,6 +286,20 @@ public long XTrim(string key, long count) => Call("XTRIM"
#region Model
static partial class RedisResultThrowOrValueExtensions
{
public static StreamsXAutoClaimResult ThrowOrValueToXAutoClaim(this RedisResult rt) =>
rt.ThrowOrValue((a, _) =>
{
if (a == null) return null;
var a1 = a[1] as object[];
var entries = new StreamsEntry[a.Length];
for (var z = 0; z < a1.Length; z++)
{
var objs1 = a1[z] as object[];
if (objs1 == null) continue;
entries[z] = new StreamsEntry { id = objs1[0].ConvertTo<string>(), fieldValues = objs1[1] as object[] };
}
return new StreamsXAutoClaimResult { start = a[0].ConvertTo<string>(), entries = entries, delIds = (a[2] as object[])?.Select(z => z.ConvertTo<string>()).ToArray() };
});

public static StreamsEntry[] ThrowOrValueToStreamsEntryArray(this RedisResult rt) =>
rt.ThrowOrValue((a, _) =>
Expand Down Expand Up @@ -414,6 +438,14 @@ public static StreamsEntryResult[] ThrowOrValueToXRead(this RedisResult rt) =>
});
}

public class StreamsXAutoClaimResult
{
public string start;
public StreamsEntry[] entries;
public string[] delIds;

public override string ToString() => $"{start}, [{string.Join("], [", entries.Select(a => a?.ToString()))}], [{string.Join(", ", delIds)}]";
}
public class StreamsXPendingResult
{
public long count;
Expand Down Expand Up @@ -517,7 +549,7 @@ public class StreamsEntryResult
public string key;
public StreamsEntry[] entries;

public override string ToString() => $"{key}, {string.Join("], [", entries.Select(a => a?.ToString()))}";
public override string ToString() => $"{key}, [{string.Join("], [", entries.Select(a => a?.ToString()))}]";
}
public class StreamsEntry
{
Expand Down

0 comments on commit d6455d1

Please sign in to comment.