diff --git a/.github/actions/run-tests/action.yml b/.github/actions/run-tests/action.yml
index e52cd7a2f..dfb80ffa2 100644
--- a/.github/actions/run-tests/action.yml
+++ b/.github/actions/run-tests/action.yml
@@ -66,6 +66,7 @@ runs:
run: >-
dotnet test tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj
-c Release
+ -f net10.0
--logger trx
--logger "GitHubActions;summary-include-passed=false;summary-include-skipped=false"
--results-directory ./test-results/
diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml
index 9089f65c0..3a56437b0 100644
--- a/.github/workflows/CI.yml
+++ b/.github/workflows/CI.yml
@@ -119,7 +119,15 @@ jobs:
- name: .NET Build
run: dotnet build Build.csproj -c Release /p:CI=true
- name: StackExchange.Redis.Tests
- run: dotnet test tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj -c Release --logger trx --logger GitHubActions --results-directory ./test-results/ /p:CI=true
+ run: |
+ $exitCode = 0
+ foreach ($tfm in @("net10.0", "net481")) {
+ dotnet test tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj -c Release -f $tfm --logger trx --logger GitHubActions --results-directory ./test-results/ /p:CI=true
+ if ($LASTEXITCODE -ne 0) {
+ $exitCode = $LASTEXITCODE
+ }
+ }
+ exit $exitCode
- uses: dorny/test-reporter@v3
continue-on-error: true
if: success() || failure()
@@ -127,10 +135,10 @@ jobs:
name: Tests Results - Windows Server 2022
path: 'test-results/*.trx'
reporter: dotnet-trx
- # Package and upload to MyGet only on pushes to main, not on PRs
+ # Package and upload to MyGet only on pushes to main/v3, not on PRs
- name: .NET Pack
- if: (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && github.ref == 'refs/heads/main'
+ if: (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/v3')
run: dotnet pack Build.csproj --no-build -c Release /p:PackageOutputPath=${env:GITHUB_WORKSPACE}\.nupkgs /p:CI=true
- name: Upload to MyGet
- if: (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && github.ref == 'refs/heads/main'
+ if: (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/v3')
run: dotnet nuget push ${env:GITHUB_WORKSPACE}\.nupkgs\*.nupkg -s https://www.myget.org/F/stackoverflow/api/v2/package -k ${{ secrets.MYGET_API_KEY }}
diff --git a/Directory.Build.props b/Directory.Build.props
index 4e62d5e0f..7d2e9c6c5 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -26,6 +26,7 @@
true
false
true
+ true
00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 71230ccbf..759993786 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -1,44 +1,40 @@
-
-
-
+
+
-
+
-
-
+
+
+
-
-
+
-
-
+
-
-
+
+
-
-
-
-
-
+
+
+
+
+
-
+
-
-
-
+
+
-
-
-
-
-
+
+
+
+
\ No newline at end of file
diff --git a/StackExchange.Redis.sln.DotSettings b/StackExchange.Redis.sln.DotSettings
index 8dd9095d9..339a73c59 100644
--- a/StackExchange.Redis.sln.DotSettings
+++ b/StackExchange.Redis.sln.DotSettings
@@ -26,4 +26,5 @@
True
True
True
- True
\ No newline at end of file
+ True
+ True
\ No newline at end of file
diff --git a/StackExchange.Redis.slnx b/StackExchange.Redis.slnx
index 02f110acd..f981b8ef0 100644
--- a/StackExchange.Redis.slnx
+++ b/StackExchange.Redis.slnx
@@ -24,6 +24,7 @@
+
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 43e48cfc7..ce3ab93f6 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -72,11 +72,11 @@ var conn = ConnectionMultiplexer.Connect("contoso5.redis.cache.windows.net,ssl=t
The `ConfigurationOptions` object has a wide range of properties, all of which are fully documented in intellisense. Some of the more common options to use include:
| Configuration string | `ConfigurationOptions` | Default | Meaning |
-| ---------------------- | ---------------------- | ---------------------------- | --------------------------------------------------------------------------------------------------------- |
+| ---------------------- | ---------------------- |------------------------------| --------------------------------------------------------------------------------------------------------- |
| abortConnect={bool} | `AbortOnConnectFail` | `true` (`false` on Azure) | If true, `Connect` will not create a connection while no servers are available |
| allowAdmin={bool} | `AllowAdmin` | `false` | Enables a range of commands that are considered risky |
| channelPrefix={string} | `ChannelPrefix` | `null` | Optional channel prefix for all pub/sub operations |
-| checkCertificateRevocation={bool} | `CheckCertificateRevocation` | `true` | A Boolean value that specifies whether the certificate revocation list is checked during authentication. |
+| checkCertificateRevocation={bool} | `CheckCertificateRevocation` | `true` | A Boolean value that specifies whether the certificate revocation list is checked during authentication. |
| connectRetry={int} | `ConnectRetry` | `3` | The number of times to repeat connect attempts during initial `Connect` |
| connectTimeout={int} | `ConnectTimeout` | `5000` | Timeout (ms) for connect operations |
| configChannel={string} | `ConfigurationChannel` | `__Booksleeve_MasterChanged` | Broadcast channel name for communicating configuration changes |
@@ -96,7 +96,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a
| syncTimeout={int} | `SyncTimeout` | `5000` | Time (ms) to allow for synchronous operations |
| asyncTimeout={int} | `AsyncTimeout` | `SyncTimeout` | Time (ms) to allow for asynchronous operations |
| tiebreaker={string} | `TieBreaker` | `__Booksleeve_TieBreak` | Key to use for selecting a server in an ambiguous primary scenario |
-| version={string} | `DefaultVersion` | (`4.0` in Azure, else `2.0`) | Redis version level (useful when the server does not make this available) |
+| version={string} | `DefaultVersion` | (`7.4` in AMR, else `6.0`) | Redis version level (useful when the server does not make this available) |
| tunnel={string} | `Tunnel` | `null` | Tunnel for connections (use `http:{proxy url}` for "connect"-based proxy server) |
| setlib={bool} | `SetClientLibrary` | `true` | Whether to attempt to use `CLIENT SETINFO` to set the library name/version on the connection |
| protocol={string} | `Protocol` | `null` | Redis protocol to use; see section below |
diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md
index e23d492b2..5a752e7bb 100644
--- a/docs/ReleaseNotes.md
+++ b/docs/ReleaseNotes.md
@@ -6,7 +6,14 @@ Current package versions:
| ------------ | ----------------- | ----- |
| [](https://www.nuget.org/packages/StackExchange.Redis/) | [](https://www.nuget.org/packages/StackExchange.Redis/) | [](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) |
-## Unreleased
+## 3.0
+
+From 3.0, [release notes will be maintained in GitHub only](https://github.com/StackExchange/StackExchange.Redis/releases) to avoid duplication.
+
+---
+
+
+## 2.12.14
- (none)
diff --git a/docs/exp/SER004.md b/docs/exp/SER004.md
new file mode 100644
index 000000000..91f5d87c4
--- /dev/null
+++ b/docs/exp/SER004.md
@@ -0,0 +1,15 @@
+# RESPite
+
+RESPite is an experimental library that provides high-performance low-level RESP (Redis, etc) parsing and serialization.
+It is used as the IO core for StackExchange.Redis v3+. You should not (yet) use it directly unless you have a very
+good reason to do so.
+
+```xml
+$(NoWarn);SER004
+```
+
+or more granularly / locally in C#:
+
+``` c#
+#pragma warning disable SER004
+```
diff --git a/docs/exp/SER005.md b/docs/exp/SER005.md
new file mode 100644
index 000000000..03e7b7bb4
--- /dev/null
+++ b/docs/exp/SER005.md
@@ -0,0 +1,21 @@
+# Unit Testing
+
+Unit testing is great! Yay, do more of that!
+
+This type is provided for external unit testing, in particular by people using modules or server features
+not directly implemented by SE.Redis - for example to verify messsage parsing or formatting without
+talking to a RESP server.
+
+These types are considered slightly more... *mercurial*. We encourage you to use them, but *occasionally*
+(not just for fun) you might need to update your test code if we tweak something. This should not impact
+"real" library usage.
+
+```xml
+$(NoWarn);SER005
+```
+
+or more granularly / locally in C#:
+
+``` c#
+#pragma warning disable SER005
+```
diff --git a/src/RESPite.Benchmark/BenchmarkBase.cs b/src/RESPite.Benchmark/BenchmarkBase.cs
new file mode 100644
index 000000000..50994b23f
--- /dev/null
+++ b/src/RESPite.Benchmark/BenchmarkBase.cs
@@ -0,0 +1,690 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.ComponentModel;
+using System.Diagnostics;
+using System.Linq;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+
+// influenced by redis-benchmark, see .md file
+namespace RESPite.Benchmark;
+
+public abstract class BenchmarkBase : IDisposable
+{
+ protected const string
+ GetSetKey = "key:__rand_int__",
+ CounterKey = "counter:__rand_int__",
+ ListKey = "mylist",
+ SetKey = "myset",
+ HashKey = "myhash",
+ SortedSetKey = "myzset",
+ StreamKey = "mystream";
+
+ public PipelineStrategy PipelineMode { get; } =
+ PipelineStrategy.Batch; // the default, for parity with how redis-benchmark works
+
+ public enum PipelineStrategy
+ {
+ ///
+ /// Build a batch of operations, send them all at once.
+ ///
+ Batch,
+
+ ///
+ /// Use a queue to pipeline operations - when we hit the pipeline depth, we pop one, push one, await the popped.
+ ///
+ Queue,
+ }
+
+ private readonly HashSet _tests = new(StringComparer.OrdinalIgnoreCase);
+ protected bool RunTest(string name) => _tests.Count == 0 || _tests.Contains(name);
+ public virtual void Dispose() => GC.SuppressFinalize(this);
+ public int Port { get; } = 6379;
+ public string HostName { get; } = "127.0.0.1";
+ public int PipelineDepth { get; } = 1;
+ public bool Multiplexed { get; }
+ public bool SupportCancel { get; }
+ public int? WriteMode { get; }
+ public bool Loop { get; }
+ public bool Quiet { get; }
+ public int ClientCount { get; } = 50;
+ private int _operationsPerClient;
+ public int OperationsPerClient(int divisor = 1) => _operationsPerClient / divisor;
+
+ public int TotalOperations(int divisor = 1) => OperationsPerClient(divisor) * ClientCount;
+
+ protected readonly byte[] Payload;
+
+ protected BenchmarkBase(string[] args)
+ {
+ int operations = 100_000;
+
+ string tests = "";
+ for (int i = 0; i < args.Length; i++)
+ {
+ switch (args[i])
+ {
+ case "-h" when i != args.Length - 1:
+ HostName = args[++i];
+ break;
+ case "-p" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp) && tmp > 0:
+ Port = tmp;
+ break;
+ case "-c" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp) && tmp > 0:
+ ClientCount = tmp;
+ break;
+ case "-n" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp) && tmp > 0:
+ operations = tmp;
+ break;
+ case "-P" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp) && tmp > 0:
+ PipelineDepth = tmp;
+ break;
+ case "-w" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp):
+ WriteMode = tmp;
+ break;
+ case "+m":
+ Multiplexed = true;
+ break;
+ case "-m":
+ Multiplexed = false;
+ break;
+ case "+x":
+ SupportCancel = true;
+ break;
+ case "-x":
+ SupportCancel = false;
+ break;
+ case "-l":
+ Loop = true;
+ break;
+ case "-q":
+ Quiet = true;
+ break;
+ case "-t" when i != args.Length - 1:
+ tests = args[++i];
+ break;
+ case "--batch":
+ PipelineMode = PipelineStrategy.Batch;
+ break;
+ case "--queue":
+ PipelineMode = PipelineStrategy.Queue;
+ break;
+ }
+ }
+
+ if (!string.IsNullOrWhiteSpace(tests))
+ {
+ foreach (var test in tests.Split(','))
+ {
+ var t = test.Trim();
+ if (!string.IsNullOrWhiteSpace(t)) _tests.Add(t);
+ }
+ }
+
+ _operationsPerClient = operations / ClientCount;
+
+ Payload = "abc"u8.ToArray();
+ }
+
+ public abstract Task RunAll();
+
+ public async Task RunBasicLoopAsync()
+ {
+ await DeleteAsync(CounterKey).ConfigureAwait(false);
+
+ if (ClientCount <= 1)
+ {
+ await RunBasicLoopAsync(0);
+ }
+ else
+ {
+ Task[] tasks = new Task[ClientCount];
+ for (int i = 0; i < ClientCount; i++)
+ {
+ var loopSnapshot = i;
+ tasks[i] = Task.Run(() => RunBasicLoopAsync(loopSnapshot));
+ }
+
+ await Task.WhenAll(tasks);
+ }
+ }
+
+ protected abstract Task RunBasicLoopAsync(int clientId);
+ protected abstract Task DeleteAsync(string key);
+}
+
+public abstract class BenchmarkBase(string[] args) : BenchmarkBase(args)
+{
+ protected override Task DeleteAsync(string key) => DeleteAsync(GetClient(0), key);
+
+ protected virtual Task OnCleanupAsync(TClient client) => Task.CompletedTask;
+
+ protected virtual Task InitAsync(TClient client) => Task.CompletedTask;
+
+ public async Task CleanupAsync()
+ {
+ try
+ {
+ var client = GetClient(0);
+ await DeleteAsync(client, GetSetKey).ConfigureAwait(false);
+ await DeleteAsync(client, CounterKey).ConfigureAwait(false);
+ await DeleteAsync(client, ListKey).ConfigureAwait(false);
+ await DeleteAsync(client, SetKey).ConfigureAwait(false);
+ await DeleteAsync(client, HashKey).ConfigureAwait(false);
+ await DeleteAsync(client, SortedSetKey).ConfigureAwait(false);
+ await DeleteAsync(client, StreamKey).ConfigureAwait(false);
+ await OnCleanupAsync(client).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ await Console.Error.WriteLineAsync($"Cleanup: {ex.Message}");
+ }
+ }
+
+ protected virtual ValueTask Flush(TClient client) => default;
+ protected virtual void PrepareBatch(TClient client, int count) { }
+
+ private async Task PipelineUntyped(
+ TClient client,
+ Func operation,
+ int divisor)
+ {
+ var opsPerClient = OperationsPerClient(divisor);
+ int i = 0;
+ try
+ {
+ if (PipelineDepth <= 1)
+ {
+ for (; i < opsPerClient; i++)
+ {
+ await operation(client).ConfigureAwait(false);
+ }
+ }
+ else if (PipelineMode == PipelineStrategy.Queue)
+ {
+ var queue = new Queue(opsPerClient);
+ for (; i < opsPerClient; i++)
+ {
+ if (queue.Count == opsPerClient)
+ {
+ await queue.Dequeue().ConfigureAwait(false);
+ }
+
+ queue.Enqueue(operation(client));
+ }
+
+ while (queue.Count > 0)
+ {
+ await queue.Dequeue().ConfigureAwait(false);
+ }
+ }
+ else if (PipelineMode == PipelineStrategy.Batch)
+ {
+ int count = 0;
+ var oversized = ArrayPool.Shared.Rent(PipelineDepth);
+ PrepareBatch(client, Math.Min(opsPerClient, PipelineDepth));
+ for (; i < opsPerClient; i++)
+ {
+ oversized[count++] = operation(client);
+ if (count == PipelineDepth)
+ {
+ await Flush(client).ConfigureAwait(false);
+ PrepareBatch(client, Math.Min(opsPerClient - i, PipelineDepth));
+ for (int j = 0; j < count; j++)
+ {
+ await oversized[j].ConfigureAwait(false);
+ }
+
+ count = 0;
+ }
+ }
+
+ await Flush(client).ConfigureAwait(false);
+ for (int j = 0; j < count; j++)
+ {
+ await oversized[j].ConfigureAwait(false);
+ }
+
+ ArrayPool.Shared.Return(oversized);
+ }
+ else
+ {
+ throw new InvalidOperationException($"Unexpected pipeline mode: {PipelineMode}");
+ }
+ }
+ catch (Exception ex)
+ {
+ await Console.Error.WriteLineAsync($"{operation.Method.Name} failed after {i} operations");
+ Program.WriteException(ex);
+ }
+
+ return DBNull.Value;
+ }
+
+ private async Task PipelineTyped(TClient client, Func> operation, int divisor)
+ {
+ var opsPerClient = OperationsPerClient(divisor);
+ int i = 0;
+ T result = default!;
+ try
+ {
+ if (PipelineDepth == 1)
+ {
+ for (; i < opsPerClient; i++)
+ {
+ result = await operation(client).ConfigureAwait(false);
+ }
+ }
+ else if (PipelineMode == PipelineStrategy.Queue)
+ {
+ var queue = new Queue>(opsPerClient);
+ for (; i < opsPerClient; i++)
+ {
+ if (queue.Count == opsPerClient)
+ {
+ _ = await queue.Dequeue().ConfigureAwait(false);
+ }
+
+ queue.Enqueue(operation(client));
+ }
+
+ while (queue.Count > 0)
+ {
+ result = await queue.Dequeue().ConfigureAwait(false);
+ }
+ }
+ else if (PipelineMode == PipelineStrategy.Batch)
+ {
+ int count = 0;
+ var oversized = ArrayPool>.Shared.Rent(PipelineDepth);
+ PrepareBatch(client, Math.Min(opsPerClient, PipelineDepth));
+ for (; i < opsPerClient; i++)
+ {
+ oversized[count++] = operation(client);
+ if (count == PipelineDepth)
+ {
+ await Flush(client).ConfigureAwait(false);
+ PrepareBatch(client, Math.Min(opsPerClient - (i + 1), PipelineDepth));
+ for (int j = 0; j < count; j++)
+ {
+ result = await oversized[j].ConfigureAwait(false);
+ }
+
+ count = 0;
+ }
+ }
+
+ await Flush(client).ConfigureAwait(false);
+ for (int j = 0; j < count; j++)
+ {
+ result = await oversized[j].ConfigureAwait(false);
+ }
+
+ ArrayPool>.Shared.Return(oversized);
+ }
+ else
+ {
+ throw new InvalidOperationException($"Unexpected pipeline mode: {PipelineMode}");
+ }
+ }
+ catch (Exception ex)
+ {
+ await Console.Error.WriteLineAsync($"{operation.Method.Name} failed after {i} operations");
+ Program.WriteException(ex);
+ }
+
+ return result;
+ }
+
+ public async Task InitAsync()
+ {
+ for (int i = 0; i < ClientCount; i++)
+ {
+ await InitAsync(GetClient(i)).ConfigureAwait(false);
+ }
+ }
+
+ protected abstract TClient GetClient(int index);
+ protected virtual TClient WithCancellation(TClient client, CancellationToken cancellationToken) => client;
+ protected abstract Task DeleteAsync(TClient client, string key);
+
+ protected abstract TClient CreateBatch(TClient client);
+
+ protected Task RunAsync(
+ string? key,
+ Func> action,
+ bool deleteKey,
+ int divisor = 1)
+ => RunAsyncCore(
+ key,
+ GetNameCore(action, out var desc),
+ desc,
+ client => action(client).AsUntypedValueTask(),
+ client => PipelineTyped(client, action, divisor),
+ [],
+ deleteKey,
+ divisor);
+
+ protected Task RunAsync(
+ string? key,
+ Func> action,
+ params string[] consumers)
+ => RunAsyncCore(
+ key,
+ GetNameCore(action, out var desc),
+ desc,
+ client => action(client).AsUntypedValueTask(),
+ client => PipelineTyped(client, action, 1),
+ consumers,
+ consumers.Length != 0,
+ 1);
+
+ protected Task RunAsync(
+ string? key,
+ Func action,
+ bool deleteKey,
+ int divisor = 1)
+ => RunAsyncCore(
+ key,
+ GetNameCore(action, out var desc),
+ desc,
+ action,
+ client => PipelineUntyped(client, action, divisor),
+ [],
+ deleteKey,
+ divisor);
+
+ protected Task RunAsync(
+ string? key,
+ Func action,
+ params string[] consumers)
+ => RunAsyncCore(
+ key,
+ GetNameCore(action, out var desc),
+ desc,
+ action,
+ client => PipelineUntyped(client, action, 1),
+ consumers,
+ consumers.Length != 0,
+ 1);
+
+ private static string GetNameCore(Delegate underlyingAction, out string description)
+ {
+ string name = underlyingAction.Method.Name;
+
+ if (underlyingAction.Method.GetCustomAttribute(typeof(DisplayNameAttribute)) is DisplayNameAttribute
+ {
+ DisplayName: { Length: > 0 }
+ } dna)
+ {
+ name = dna.DisplayName;
+ }
+
+ description = "";
+ if (underlyingAction.Method.GetCustomAttribute(typeof(DescriptionAttribute)) is DescriptionAttribute
+ {
+ Description: { Length: > 0 }
+ } da)
+ {
+ description = da.Description;
+ }
+
+ return name;
+ }
+
+ protected static string GetName(Func> action) => GetNameCore(action, out _);
+ protected static string GetName(Func action) => GetNameCore(action, out _);
+
+ private async Task RunAsyncCore(
+ string? key,
+ string name,
+ string description,
+ Func test,
+ Func> pipeline,
+ string[] consumers,
+ bool deleteKey,
+ int divisor)
+ {
+ // skip test if not needed
+ string auxReason = "";
+ if (!RunTest(name))
+ {
+ auxReason = string.Join(", ", consumers.Where(x => RunTest(x)));
+ if (auxReason.Length == 0) return; // not needed by any consumers either
+ auxReason = $" (required for {auxReason})";
+ }
+
+ // include additional test metadata
+ if (description is { Length: > 0 })
+ {
+ description = $" ({description})";
+ }
+
+ if (Quiet)
+ {
+ Console.Write($"{name}:");
+ }
+ else
+ {
+ Console.Write(
+ $"====== {name}{description}{auxReason} ====== (clients: {ClientCount:#,##0}, ops: {TotalOperations(divisor):#,##0}");
+ if (Multiplexed)
+ {
+ Console.Write(", mux");
+ }
+
+ if (SupportCancel)
+ {
+ Console.Write(", cancel");
+ }
+
+ Console.Write(PipelineDepth > 1 ? $", {PipelineMode}: {PipelineDepth:#,##0}" : ", sequential");
+
+ Console.WriteLine(")");
+ }
+
+ bool didNotRun = false;
+ try
+ {
+ if (key is not null && deleteKey)
+ {
+ await DeleteAsync(GetClient(0), key).ConfigureAwait(false);
+ }
+
+ try
+ {
+ await test(GetClient(0)).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ await Console.Error.WriteLineAsync($"\t{ex.Message}");
+ didNotRun = true;
+ return;
+ }
+
+ var pending = new Task[ClientCount];
+ int index = 0;
+#if DEBUG && NEWCORE
+ Internal.DebugCounters.Flush();
+#endif
+ // optionally support cancellation, applied per-test
+ CancellationToken cancellationToken = CancellationToken.None;
+ using var cts = SupportCancel ? new CancellationTokenSource(TimeSpan.FromSeconds(20)) : null;
+ if (SupportCancel) cancellationToken = cts!.Token;
+
+ var watch = Stopwatch.StartNew();
+ for (int i = 0; i < ClientCount; i++)
+ {
+ var client = GetClient(i);
+ if (PipelineMode == PipelineStrategy.Batch && PipelineDepth > 1)
+ {
+ client = CreateBatch(client);
+ }
+
+ pending[index++] = Task.Run(
+ () => pipeline(WithCancellation(client, cancellationToken)),
+ cancellationToken);
+ }
+
+ await Task.WhenAll(pending).ConfigureAwait(false);
+ watch.Stop();
+
+ var seconds = watch.Elapsed.TotalSeconds;
+ // ReSharper disable once PossibleLossOfFraction
+ var rate = TotalOperations(divisor) / seconds;
+ if (Quiet)
+ {
+ Console.WriteLine($"\t{rate:###,###,##0} requests per second");
+ return;
+ }
+ else
+ {
+ Console.WriteLine(
+ $"{TotalOperations(divisor):###,###,##0} requests completed in {seconds:0.00} seconds, {rate:###,###,##0} ops/sec");
+ }
+
+ if (!Quiet & typeof(T) != typeof(DBNull))
+ {
+ const string format = "Typical result: {0}";
+
+ T result = await pending[^1];
+ Console.WriteLine(format, result);
+ }
+ }
+ catch (Exception ex)
+ {
+ if (Quiet) Console.WriteLine();
+ Program.WriteException(ex, name);
+ }
+ finally
+ {
+ _ = didNotRun;
+#if DEBUG && NEWCORE
+ var counters = Internal.DebugCounters.Flush(); // flush even if not showing
+ if (!Quiet & !didNotRun)
+ {
+ if (counters.WriteBytes != 0)
+ {
+ Console.Write($"Write: {FormatBytes(counters.WriteBytes)}");
+ if (counters.SyncWriteCount != 0) Console.Write($"; {counters.SyncWriteCount:#,##0} sync");
+ if (counters.AsyncWriteInlineCount != 0)
+ Console.Write($"; {counters.AsyncWriteInlineCount:#,##0} async-inline");
+ if (counters.AsyncWriteCount != 0) Console.Write($"; {counters.AsyncWriteCount:#,##0} full-async");
+ Console.WriteLine();
+ }
+
+ if (counters.ReadBytes != 0)
+ {
+ Console.Write($"Read: {FormatBytes(counters.ReadBytes)}");
+ if (counters.ReadCount != 0) Console.Write($"; {counters.ReadCount:#,##0} sync");
+ if (counters.AsyncReadInlineCount != 0)
+ Console.Write($"; {counters.AsyncReadInlineCount:#,##0} async-inline");
+ if (counters.AsyncReadCount != 0) Console.Write($"; {counters.AsyncReadCount:#,##0} full-async");
+ Console.WriteLine();
+ }
+
+ if (counters.DiscardFullCount + counters.DiscardPartialCount != 0)
+ {
+ Console.Write($"Discard average: {FormatBytes(counters.DiscardAverage)}");
+ if (counters.DiscardFullCount != 0) Console.Write($"; {counters.DiscardFullCount} full");
+ if (counters.DiscardPartialCount != 0) Console.Write($"; {counters.DiscardPartialCount} partial");
+ Console.WriteLine();
+ }
+
+ if (counters.CopyOutCount != 0)
+ {
+ Console.WriteLine(
+ $"Copy out: {FormatBytes(counters.CopyOutBytes)}; {counters.CopyOutCount:#,##0} times");
+ }
+
+ if (counters.PipelineFullAsyncCount != 0
+ | counters.PipelineSendAsyncCount != 0
+ | counters.PipelineFullSyncCount != 0)
+ {
+ Console.Write("Pipelining");
+ if (counters.PipelineFullSyncCount != 0)
+ Console.Write($"; full sync: {counters.PipelineFullSyncCount:#,##0}");
+ if (counters.PipelineSendAsyncCount != 0)
+ Console.Write($"; send async: {counters.PipelineSendAsyncCount:#,##0}");
+ if (counters.PipelineFullAsyncCount != 0)
+ Console.Write($"; full async: {counters.PipelineFullAsyncCount:#,##0}");
+ Console.WriteLine();
+ }
+
+ if (counters.BatchWriteCount != 0)
+ {
+ Console.Write($"Batching; {counters.BatchWriteCount:#,##0} batches");
+ if (counters.BatchWriteFullPageCount != 0)
+ Console.Write($"; {counters.BatchWriteFullPageCount:#,###,##0} full pages");
+ if (counters.BatchWritePartialPageCount != 0)
+ Console.Write($"; {counters.BatchWritePartialPageCount:#,###,##0} partial pages");
+ if (counters.BatchWriteMessageCount != 0)
+ Console.Write($"; {counters.BatchWriteMessageCount:#,###,##0} messages");
+ Console.WriteLine();
+ }
+
+ if (counters.BatchGrowCount != 0)
+ {
+ Console.WriteLine(
+ $"Batch growth; {counters.BatchGrowCount:#,##0} events, {counters.BatchGrowCopyCount:#,###,##0} elements copied");
+ }
+
+ if (counters.BatchBufferLeaseCount != 0 | counters.BatchMultiRootMessageCount != 0)
+ {
+ Console.Write(
+ $"Multi-message batching: {counters.BatchMultiRootMessageCount:#,###,##0} batches, {counters.BatchMultiChildMessageCount:#,###,##0} sub-messages");
+ if (counters.BatchBufferLeaseCount != 0)
+ {
+ Console.Write(
+ $"; {counters.BatchBufferLeaseCount:#,###,##0} blocks leased, {counters.BatchBufferReturnCount:#,###,##0} blocks returned, {counters.BatchBufferElementsOutstanding:#,###,##0} elements outstanding");
+ }
+ Console.WriteLine();
+ }
+
+ if (counters.BufferCreatedCount != 0 ||
+ counters.BufferRecycledCount != 0 | counters.BufferMessageCount != 0)
+ {
+ Console.Write("Buffers");
+ if (counters.BufferCreatedCount != 0)
+ {
+ Console.Write(
+ $"; created: {counters.BufferCreatedCount:#,###,##0}, {FormatBytes(counters.BufferTotalBytes)}");
+ // always write recycled count - it being zero is important
+ Console.Write(
+ $"; recycled: {counters.BufferRecycledCount:#,###,##0}, {FormatBytes(counters.BufferRecycledBytes)}");
+ }
+
+ if (counters.BufferMessageCount != 0)
+ {
+ Console.Write(
+ $"; {counters.BufferMessageCount:#,###,##0} messages, {FormatBytes(counters.BufferMessageBytes)}");
+ }
+
+ Console.Write(
+ $"; max working {FormatBytes(counters.BufferMaxOutstandingBytes)}; {counters.BufferPinCount:#,###,##0} pins; {counters.BufferLeakCount:#,###,##0} leaks");
+ Console.WriteLine();
+ }
+
+ static string FormatBytes(long bytes)
+ {
+ // ReSharper disable InconsistentNaming
+ const long k = 1024, M = k * k, G = M * k, T = G * k;
+
+ // ReSharper restore InconsistentNaming
+ return bytes switch
+ {
+ < k => $"{bytes:#,##0} B",
+ < M => $"{bytes / (double)k:#,##0.00} KiB",
+ < G => $"{bytes / (double)M:#,##0.00} MiB",
+ < T => $"{bytes / (double)G:#,##0.00} GiB",
+ _ => $"{bytes / (double)T:#,##0.00} TiB",
+ };
+ }
+ }
+#endif
+ if (!Quiet) Console.WriteLine();
+ }
+ }
+}
diff --git a/src/RESPite.Benchmark/BridgeBenchmark.cs b/src/RESPite.Benchmark/BridgeBenchmark.cs
new file mode 100644
index 000000000..0e530050f
--- /dev/null
+++ b/src/RESPite.Benchmark/BridgeBenchmark.cs
@@ -0,0 +1,17 @@
+#if NEWCORE
+using RESPite.StackExchange.Redis;
+using StackExchange.Redis;
+
+namespace RESPite.Benchmark;
+
+public sealed class BridgeBenchmark(string[] args) : OldCoreBenchmarkBase(args)
+{
+ public override string ToString() => "bridge SE.Redis";
+ protected override IConnectionMultiplexer Create(int port)
+ {
+ var obj = new RespMultiplexer();
+ obj.Connect("{HostName}:{Port}");
+ return obj;
+ }
+}
+#endif
diff --git a/src/RESPite.Benchmark/NewCoreBenchmark.cs b/src/RESPite.Benchmark/NewCoreBenchmark.cs
new file mode 100644
index 000000000..649b0b984
--- /dev/null
+++ b/src/RESPite.Benchmark/NewCoreBenchmark.cs
@@ -0,0 +1,435 @@
+#if NEWCORE
+using System;
+using System.ComponentModel;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using RESPite.Connections;
+using RESPite.Messages;
+
+namespace RESPite.Benchmark;
+
+public sealed class NewCoreBenchmark : BenchmarkBase
+{
+ public override string ToString() => "new IO core";
+
+ private readonly RespConnectionPool _connectionPool;
+
+ private readonly RespContext[] _clients;
+ private readonly (string Key, byte[] Value)[] _pairs;
+
+ protected override RespContext GetClient(int index) => _clients[index];
+
+ protected override Task DeleteAsync(RespContext client, string key) => client.DelAsync(key).AsTask();
+
+ protected override RespContext WithCancellation(RespContext client, CancellationToken cancellationToken)
+ => client.WithCancellationToken(cancellationToken);
+
+ protected override Task InitAsync(RespContext client) => client.PingAsync().AsTask();
+
+ public NewCoreBenchmark(string[] args) : base(args)
+ {
+ _clients = new RespContext[ClientCount];
+
+ _connectionPool = new(count: Multiplexed ? 1 : ClientCount);
+ _connectionPool.ConnectionError += (_, e) => Program.WriteException(e.Exception, e.Operation);
+ _pairs = new (string, byte[])[10];
+
+ for (var i = 0; i < 10; i++)
+ {
+ _pairs[i] = ($"key:__rand_int__{i}", Payload);
+ }
+
+ if (Multiplexed)
+ {
+ var conn = _connectionPool.GetConnection().Synchronized();
+ var ctx = conn.Context;
+ for (int i = 0; i < ClientCount; i++) // init all
+ {
+ _clients[i] = ctx;
+ }
+ }
+ else
+ {
+ for (int i = 0; i < ClientCount; i++) // init all
+ {
+ var conn = _connectionPool.GetConnection();
+ if (PipelineDepth > 1)
+ {
+ conn = conn.Synchronized();
+ }
+
+ _clients[i] = conn.Context;
+ }
+ }
+ }
+
+ public override void Dispose()
+ {
+ _connectionPool.Dispose();
+ foreach (var client in _clients)
+ {
+ client.Connection.Dispose();
+ }
+ }
+
+ protected override async Task OnCleanupAsync(RespContext client)
+ {
+ foreach (var pair in _pairs)
+ {
+ await client.DelAsync(pair.Key).ConfigureAwait(false);
+ }
+ }
+
+ public override async Task RunAll()
+ {
+ await InitAsync().ConfigureAwait(false);
+ // await RunAsync(PingInline).ConfigureAwait(false);
+ await RunAsync(null, PingBulk).ConfigureAwait(false);
+
+ await RunAsync(GetSetKey, Set, GetName(Get)).ConfigureAwait(false);
+ await RunAsync(GetSetKey, Get).ConfigureAwait(false);
+
+ await RunAsync(CounterKey, Incr, true).ConfigureAwait(false);
+
+ await RunAsync(ListKey, LPush, GetName(LPop)).ConfigureAwait(false);
+ await RunAsync(ListKey, LPop).ConfigureAwait(false);
+
+ await RunAsync(ListKey, RPush, GetName(RPop)).ConfigureAwait(false);
+ await RunAsync(ListKey, RPop).ConfigureAwait(false);
+
+ await RunAsync(SetKey, SAdd, GetName(SPop)).ConfigureAwait(false);
+ await RunAsync(SetKey, SPop).ConfigureAwait(false);
+
+ await RunAsync(HashKey, HSet).ConfigureAwait(false);
+
+ await RunAsync(SortedSetKey, ZAdd, GetName(ZPopMin)).ConfigureAwait(false);
+ await RunAsync(SortedSetKey, ZPopMin).ConfigureAwait(false);
+
+ await RunAsync(null, MSet).ConfigureAwait(false);
+ await RunAsync(StreamKey, XAdd).ConfigureAwait(false);
+
+ // leave until last, they're slower
+ if (RunTest(GetName(LRange100)) ||
+ RunTest(GetName(LRange300)) ||
+ RunTest(GetName(LRange500)) ||
+ RunTest(GetName(LRange600)))
+ {
+ await LRangeInit650(GetClient(0)).ConfigureAwait(false);
+ await RunAsync(ListKey, LRange100, false, 10).ConfigureAwait(false);
+ await RunAsync(ListKey, LRange300, false, 10).ConfigureAwait(false);
+ await RunAsync(ListKey, LRange500, false, 10).ConfigureAwait(false);
+ await RunAsync(ListKey, LRange600, false, 10).ConfigureAwait(false);
+ }
+
+ await CleanupAsync().ConfigureAwait(false);
+ }
+
+ protected override RespContext CreateBatch(RespContext client) => client.CreateBatch(PipelineDepth).Context;
+
+ protected override ValueTask Flush(RespContext client)
+ {
+ if (client.Connection is RespBatch batch)
+ {
+ return new(batch.FlushAsync());
+ }
+
+ return default;
+ }
+
+ protected override void PrepareBatch(RespContext client, int count)
+ {
+ if (client.Connection is RespBatch batch)
+ {
+ batch.EnsureCapacity(count);
+ }
+ }
+
+ [DisplayName("PING_INLINE")]
+ // ReSharper disable once UnusedMember.Local
+ private ValueTask PingInline(RespContext ctx) => ctx.PingInlineAsync(Payload);
+
+ [DisplayName("PING_BULK")]
+ private ValueTask PingBulk(RespContext ctx) => ctx.PingAsync(Payload);
+
+ [DisplayName("INCR")]
+ private ValueTask Incr(RespContext ctx) => ctx.IncrAsync(CounterKey);
+
+ [DisplayName("GET")]
+ private ValueTask Get(RespContext ctx) => ctx.GetAsync(GetSetKey);
+
+ [DisplayName("SET")]
+ private ValueTask Set(RespContext ctx) => ctx.SetAsync(GetSetKey, Payload);
+
+ [DisplayName("LPUSH")]
+ private ValueTask LPush(RespContext ctx) => ctx.LPushAsync(ListKey, Payload);
+
+ [DisplayName("RPUSH")]
+ private ValueTask RPush(RespContext ctx) => ctx.RPushAsync(ListKey, Payload);
+
+ [DisplayName("LRANGE_100")]
+ private ValueTask LRange100(RespContext ctx) => ctx.LRangeAsync(ListKey, 0, 99);
+
+ [DisplayName("LRANGE_300")]
+ private ValueTask LRange300(RespContext ctx) => ctx.LRangeAsync(ListKey, 0, 299);
+
+ [DisplayName("LRANGE_500")]
+ private ValueTask LRange500(RespContext ctx) => ctx.LRangeAsync(ListKey, 0, 499);
+
+ [DisplayName("LRANGE_600")]
+ private ValueTask LRange600(RespContext ctx) => ctx.LRangeAsync(ListKey, 0, 599);
+
+ [DisplayName("LPOP")]
+ private ValueTask LPop(RespContext ctx) => ctx.LPopAsync(ListKey);
+
+ [DisplayName("RPOP")]
+ private ValueTask RPop(RespContext ctx) => ctx.RPopAsync(ListKey);
+
+ [DisplayName("SADD")]
+ private ValueTask SAdd(RespContext ctx) => ctx.SAddAsync(SetKey, "element:__rand_int__");
+
+ [DisplayName("HSET")]
+ private ValueTask HSet(RespContext ctx) => ctx.HSetAsync(HashKey, "element:__rand_int__", Payload);
+
+ [DisplayName("ZADD")]
+ private ValueTask ZAdd(RespContext ctx) => ctx.ZAddAsync(SortedSetKey, 0, "element:__rand_int__");
+
+ [DisplayName("ZPOPMIN")]
+ private ValueTask ZPopMin(RespContext ctx) => ctx.ZPopMinAsync(SortedSetKey);
+
+ [DisplayName("SPOP")]
+ private ValueTask SPop(RespContext ctx) => ctx.SPopAsync(SetKey);
+
+ [DisplayName("MSET"), Description("10 keys")]
+ private ValueTask MSet(RespContext ctx) => ctx.MSetAsync(_pairs);
+
+ private async ValueTask LRangeInit650(RespContext ctx)
+ {
+ await ctx.DelAsync(ListKey).ConfigureAwait(false);
+ await ctx.LPushAsync(ListKey, Payload, 650);
+ if (await ctx.LLenAsync(ListKey).ConfigureAwait(false) != 650)
+ {
+ throw new InvalidOperationException();
+ }
+ }
+
+ [DisplayName("XADD")]
+ private ValueTask XAdd(RespContext ctx) =>
+ ctx.XAddAsync(StreamKey, "*", "myfield", Payload);
+
+ protected override async Task RunBasicLoopAsync(int clientId)
+ {
+ // The purpose of this is to represent a more realistic loop using natural code
+ // rather than code that is drowning in test infrastructure.
+ var client = GetClient(clientId);
+ var depth = PipelineDepth;
+ int tickCount = 0; // this is just so we don't query DateTime.
+ long previousValue = (await client.GetInt32Async(CounterKey).ConfigureAwait(false)) ?? 0,
+ currentValue = previousValue;
+ var watch = Stopwatch.StartNew();
+ long previousMillis = watch.ElapsedMilliseconds;
+
+ bool Tick()
+ {
+ var currentMillis = watch.ElapsedMilliseconds;
+ var elapsedMillis = currentMillis - previousMillis;
+ if (elapsedMillis >= 1000)
+ {
+ if (clientId == 0) // only one client needs to update the UI
+ {
+ var qty = currentValue - previousValue;
+ var seconds = elapsedMillis / 1000.0;
+ Console.WriteLine(
+ $"{qty:#,###,##0} ops in {seconds:#0.00}s, {qty / seconds:#,###,##0}/s\ttotal: {currentValue:#,###,###,##0}");
+
+ // reset for next UI update
+ previousValue = currentValue;
+ previousMillis = currentMillis;
+ }
+
+ if (currentMillis >= 20_000)
+ {
+ if (clientId == 0)
+ {
+ Console.WriteLine();
+ Console.WriteLine(
+ $"\t Overall: {currentValue:#,###,###,##0} ops in {currentMillis / 1000:#0.00}s, {currentValue / (currentMillis / 1000.0):#,###,##0}/s");
+ Console.WriteLine();
+ }
+
+ return true; // stop after some time
+ }
+ }
+
+ tickCount = 0;
+ return false;
+ }
+
+ if (depth <= 1)
+ {
+ while (true)
+ {
+ currentValue = await client.IncrAsync(CounterKey).ConfigureAwait(false);
+
+ if (++tickCount >= 1000 && Tick()) break; // only check whether to output every N iterations
+ }
+ }
+ else
+ {
+ ValueTask[] pending = new ValueTask[depth];
+ await using var batch = client.CreateBatch(depth);
+ var ctx = batch.Context;
+ while (true)
+ {
+ for (int i = 0; i < depth; i++)
+ {
+ pending[i] = ctx.IncrAsync(CounterKey);
+ }
+
+ await batch.FlushAsync().ConfigureAwait(false);
+ batch.EnsureCapacity(depth); // batches don't assume re-use
+ for (var i = 0; i < depth; i++)
+ {
+ currentValue = await pending[i].ConfigureAwait(false);
+ }
+
+ tickCount += depth;
+ if (tickCount >= 1000 && Tick()) break; // only check whether to output every N iterations
+ }
+ }
+ }
+}
+
+internal static partial class RedisCommands
+{
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary Ping(this in RespContext ctx);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary SPop(this in RespContext ctx, string key);
+
+ [RespCommand]
+ internal static partial int SAdd(this in RespContext ctx, string key, string payload);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary Set(this in RespContext ctx, string key, byte[] payload);
+
+ [RespCommand]
+ internal static partial int LLen(this in RespContext ctx, string key);
+
+ [RespCommand]
+ internal static partial int LPush(this in RespContext ctx, string key, byte[] payload);
+
+ [RespCommand(Formatter = LPushFormatter.Name)]
+ internal static partial int LPush(this in RespContext ctx, string key, byte[] payload, int count);
+
+ private sealed class LPushFormatter : IRespFormatter<(string Key, byte[] Payload, int Count)>
+ {
+ public const string Name = $"{nameof(LPushFormatter)}.{nameof(Instance)}";
+ private LPushFormatter() { }
+ public static readonly LPushFormatter Instance = new();
+
+ public void Format(
+ scoped ReadOnlySpan command,
+ ref RespWriter writer,
+ in (string Key, byte[] Payload, int Count) request)
+ {
+ writer.WriteCommand(command, request.Count + 1);
+ writer.WriteKey(request.Key);
+ for (int i = 0; i < request.Count; i++)
+ {
+ // duplicate for lazy bulk load
+ writer.WriteBulkString(request.Payload);
+ }
+ }
+ }
+
+ [RespCommand]
+ internal static partial int RPush(this in RespContext ctx, string key, byte[] payload);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary LPop(this in RespContext ctx, string key);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary RPop(this in RespContext ctx, string key);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary
+ LRange(this in RespContext ctx, string key, int start, int stop);
+
+ [RespCommand]
+ internal static partial int HSet(this in RespContext ctx, string key, string field, byte[] payload);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary Ping(this in RespContext ctx, byte[] payload);
+
+ [RespCommand]
+ internal static partial int Incr(this in RespContext ctx, string key);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary Del(this in RespContext ctx, string key);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary ZPopMin(this in RespContext ctx, string key);
+
+ [RespCommand]
+ internal static partial int ZAdd(this in RespContext ctx, string key, double score, string payload);
+
+ [RespCommand("get")]
+ internal static partial int? GetInt32(this in RespContext ctx, string key);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary XAdd(
+ this in RespContext ctx,
+ string key,
+ string id,
+ string field,
+ byte[] value);
+
+ [RespCommand]
+ internal static partial RespParsers.ResponseSummary Get(this in RespContext ctx, string key);
+
+ [RespCommand(Formatter = PairsFormatter.Name)] // custom command formatter
+ internal static partial bool MSet(this in RespContext ctx, (string, byte[])[] pairs);
+
+ internal static RespParsers.ResponseSummary PingInline(this in RespContext ctx, byte[] payload)
+ => ctx.Command("ping"u8, payload, InlinePingFormatter.Instance).Wait(RespParsers.ResponseSummary.Parser);
+
+ internal static ValueTask PingInlineAsync(this in RespContext ctx, byte[] payload)
+ => ctx.Command("ping"u8, payload, InlinePingFormatter.Instance)
+ .Send(RespParsers.ResponseSummary.Parser);
+
+ private sealed class InlinePingFormatter : IRespFormatter
+ {
+ private InlinePingFormatter() { }
+ public static readonly InlinePingFormatter Instance = new();
+
+ public void Format(scoped ReadOnlySpan command, ref RespWriter writer, in byte[] request)
+ {
+ writer.WriteRaw(command);
+ writer.WriteRaw(" "u8);
+ writer.WriteRaw(request);
+ writer.WriteRaw("\r\n"u8);
+ }
+ }
+
+ private sealed class PairsFormatter : IRespFormatter<(string Key, byte[] Value)[]>
+ {
+ public const string Name = $"{nameof(PairsFormatter)}.{nameof(Instance)}";
+ public static readonly PairsFormatter Instance = new PairsFormatter();
+
+ public void Format(
+ scoped ReadOnlySpan command,
+ ref RespWriter writer,
+ in (string Key, byte[] Value)[] request)
+ {
+ writer.WriteCommand(command, 2 * request.Length);
+ foreach (var pair in request)
+ {
+ writer.WriteKey(pair.Key);
+ writer.WriteBulkString(pair.Value);
+ }
+ }
+ }
+}
+#endif
diff --git a/src/RESPite.Benchmark/OldCoreBenchmark.cs b/src/RESPite.Benchmark/OldCoreBenchmark.cs
new file mode 100644
index 000000000..6f1039330
--- /dev/null
+++ b/src/RESPite.Benchmark/OldCoreBenchmark.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Reflection;
+using StackExchange.Redis;
+
+namespace RESPite.Benchmark;
+
+public sealed class OldCoreBenchmark(string[] args) : OldCoreBenchmarkBase(args)
+{
+ private static readonly string withVersion = $"classic SE.Redis {GetLibVersion()}";
+ public override string ToString() => withVersion;
+
+ protected override IConnectionMultiplexer Create(int port)
+ {
+ var options = ConfigurationOptions.Parse($"{HostName}:{Port}");
+ if (WriteMode is { } wm && options.GetType().GetProperty(
+ "WriteMode",
+ BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic) is { CanWrite: true, CanRead: true } prop)
+ {
+ prop.SetValue(options, wm);
+ Console.WriteLine($"Set WriteMode to {prop.GetValue(options)}");
+ }
+ return ConnectionMultiplexer.Connect(options);
+ }
+
+ private static string? _libVersion;
+ internal static string GetLibVersion()
+ {
+ if (_libVersion == null)
+ {
+ var assembly = typeof(ConnectionMultiplexer).Assembly;
+ _libVersion = ((AssemblyFileVersionAttribute)Attribute.GetCustomAttribute(assembly, typeof(AssemblyFileVersionAttribute))!)?.Version
+ ?? assembly.GetName().Version!.ToString();
+ }
+ return _libVersion;
+ }
+}
diff --git a/src/RESPite.Benchmark/OldCoreBenchmarkBase.cs b/src/RESPite.Benchmark/OldCoreBenchmarkBase.cs
new file mode 100644
index 000000000..0bab0ea4c
--- /dev/null
+++ b/src/RESPite.Benchmark/OldCoreBenchmarkBase.cs
@@ -0,0 +1,299 @@
+using System;
+using System.Collections.Generic;
+using System.ComponentModel;
+using System.Diagnostics;
+using System.Threading.Tasks;
+using StackExchange.Redis;
+
+namespace RESPite.Benchmark;
+
+public abstract class OldCoreBenchmarkBase : BenchmarkBase
+{
+ private readonly IConnectionMultiplexer _connectionMultiplexer;
+ private readonly IDatabase _client;
+ private readonly KeyValuePair[] _pairs;
+
+ public OldCoreBenchmarkBase(string[] args) : base(args)
+ {
+ // ReSharper disable once VirtualMemberCallInConstructor
+ _connectionMultiplexer = Create(Port);
+ _client = _connectionMultiplexer.GetDatabase();
+ _pairs = new KeyValuePair[10];
+
+ for (var i = 0; i < 10; i++)
+ {
+ _pairs[i] = new($"{"key:__rand_int__"}{i}", Payload);
+ }
+ }
+
+ protected abstract IConnectionMultiplexer Create(int port);
+
+ protected override async Task OnCleanupAsync(IDatabaseAsync client)
+ {
+ foreach (var pair in _pairs)
+ {
+ await client.KeyDeleteAsync(pair.Key);
+ }
+ }
+
+ protected override Task InitAsync(IDatabaseAsync client) => client.PingAsync();
+
+ public override void Dispose()
+ {
+ _connectionMultiplexer.Dispose();
+ }
+
+ protected override IDatabaseAsync GetClient(int index) => _client;
+ protected override Task DeleteAsync(IDatabaseAsync client, string key) => client.KeyDeleteAsync(key);
+
+ public override async Task RunAll()
+ {
+ await InitAsync().ConfigureAwait(false);
+ // await RunAsync(PingInline).ConfigureAwait(false);
+ await RunAsync(null, PingBulk).ConfigureAwait(false);
+
+ await RunAsync(GetSetKey, Set, GetName(Get)).ConfigureAwait(false);
+ await RunAsync(GetSetKey, Get).ConfigureAwait(false);
+
+ await RunAsync(CounterKey, Incr, true).ConfigureAwait(false);
+
+ await RunAsync(ListKey, LPush, GetName(LPop)).ConfigureAwait(false);
+ await RunAsync(ListKey, LPop).ConfigureAwait(false);
+
+ await RunAsync(ListKey, RPush, GetName(RPop)).ConfigureAwait(false);
+ await RunAsync(ListKey, RPop).ConfigureAwait(false);
+
+ await RunAsync(SetKey, SAdd, GetName(SPop)).ConfigureAwait(false);
+ await RunAsync(SetKey, SPop).ConfigureAwait(false);
+
+ await RunAsync(HashKey, HSet).ConfigureAwait(false);
+
+ await RunAsync(SortedSetKey, ZAdd, GetName(ZPopMin)).ConfigureAwait(false);
+ await RunAsync(SortedSetKey, ZPopMin).ConfigureAwait(false);
+
+ await RunAsync(null, MSet).ConfigureAwait(false);
+ await RunAsync(StreamKey, XAdd).ConfigureAwait(false);
+
+ // leave until last, they're slower
+ if (RunTest(GetName(LRange100)) ||
+ RunTest(GetName(LRange300)) ||
+ RunTest(GetName(LRange500)) ||
+ RunTest(GetName(LRange600)))
+ {
+ await LRangeInit650(GetClient(0)).ConfigureAwait(false);
+ await RunAsync(ListKey, LRange100, false, 10).ConfigureAwait(false);
+ await RunAsync(ListKey, LRange300, false, 10).ConfigureAwait(false);
+ await RunAsync(ListKey, LRange500, false, 10).ConfigureAwait(false);
+ await RunAsync(ListKey, LRange600, false, 10).ConfigureAwait(false);
+ }
+
+ await CleanupAsync().ConfigureAwait(false);
+ }
+
+ protected override IDatabaseAsync CreateBatch(IDatabaseAsync client) => ((IDatabase)client).CreateBatch();
+
+ protected override ValueTask Flush(IDatabaseAsync client)
+ {
+ if (client is IBatch batch)
+ {
+ batch.Execute();
+ }
+
+ return default;
+ }
+
+ protected override async Task RunBasicLoopAsync(int clientId)
+ {
+ // The purpose of this is to represent a more realistic loop using natural code
+ // rather than code that is drowning in test infrastructure.
+ var client = (IDatabase)GetClient(clientId); // need IDatabase for CreateBatch
+ var depth = PipelineDepth;
+ int tickCount = 0; // this is just so we don't query DateTime.
+ var tmp = await client.StringGetAsync(CounterKey).ConfigureAwait(false);
+ long previousValue = tmp.IsNull ? 0 : (long)tmp, currentValue = previousValue;
+ var watch = Stopwatch.StartNew();
+ long previousMillis = watch.ElapsedMilliseconds;
+
+ bool Tick()
+ {
+ var currentMillis = watch.ElapsedMilliseconds;
+ var elapsedMillis = currentMillis - previousMillis;
+ if (elapsedMillis >= 1000)
+ {
+ if (clientId == 0) // only one client needs to update the UI
+ {
+ var qty = currentValue - previousValue;
+ var seconds = elapsedMillis / 1000.0;
+ Console.WriteLine(
+ $"{qty:#,###,##0} ops in {seconds:#0.00}s, {qty / seconds:#,###,##0}/s\ttotal: {currentValue:#,###,###,##0}");
+
+ // reset for next UI update
+ previousValue = currentValue;
+ previousMillis = currentMillis;
+ }
+
+ if (currentMillis >= 20_000)
+ {
+ if (clientId == 0)
+ {
+ Console.WriteLine();
+ Console.WriteLine(
+ $"\t Overall: {currentValue:#,###,###,##0} ops in {currentMillis / 1000:#0.00}s, {currentValue / (currentMillis / 1000.0):#,###,##0}/s");
+ Console.WriteLine();
+ }
+
+ return true; // stop after some time
+ }
+ }
+
+ tickCount = 0;
+ return false;
+ }
+
+ if (depth <= 1)
+ {
+ while (true)
+ {
+ currentValue = await client.StringIncrementAsync(CounterKey).ConfigureAwait(false);
+
+ if (++tickCount >= 1000 && Tick()) break; // only check whether to output every N iterations
+ }
+ }
+ else
+ {
+ Task[] pending = new Task[depth];
+ var batch = client.CreateBatch(depth);
+ while (true)
+ {
+ for (int i = 0; i < depth; i++)
+ {
+ pending[i] = batch.StringIncrementAsync(CounterKey);
+ }
+
+ batch.Execute();
+ for (int i = 0; i < depth; i++)
+ {
+ currentValue = await pending[i].ConfigureAwait(false);
+ }
+
+ tickCount += depth;
+ if (tickCount >= 1000 && Tick()) break; // only check whether to output every N iterations
+ }
+ }
+ }
+
+ [DisplayName("GET")]
+ private ValueTask Get(IDatabaseAsync client) => GetAndMeasureString(client);
+
+ private async ValueTask GetAndMeasureString(IDatabaseAsync client)
+ {
+ using var lease = await client.StringGetLeaseAsync(GetSetKey).ConfigureAwait(false);
+ return lease?.Length ?? -1;
+ }
+
+ [DisplayName("SET")]
+ private ValueTask Set(IDatabaseAsync client) => client.StringSetAsync(GetSetKey, Payload).AsValueTask();
+
+ [DisplayName("PING_BULK")]
+ private ValueTask PingBulk(IDatabaseAsync client) => client.PingAsync().AsValueTask();
+
+ [DisplayName("INCR")]
+ private ValueTask Incr(IDatabaseAsync client) => client.StringIncrementAsync(CounterKey).AsValueTask();
+
+ [DisplayName("HSET")]
+ private ValueTask HSet(IDatabaseAsync client) =>
+ client.HashSetAsync(HashKey, "element:__rand_int__", Payload).AsValueTask();
+
+ [DisplayName("SADD")]
+ private ValueTask SAdd(IDatabaseAsync client) =>
+ client.SetAddAsync(SetKey, "element:__rand_int__").AsValueTask();
+
+ [DisplayName("LPUSH")]
+ private ValueTask LPush(IDatabaseAsync client) => client.ListLeftPushAsync(ListKey, Payload).AsValueTask();
+
+ [DisplayName("RPUSH")]
+ private ValueTask RPush(IDatabaseAsync client) => client.ListRightPushAsync(ListKey, Payload).AsValueTask();
+
+ [DisplayName("LPOP")]
+ private ValueTask LPop(IDatabaseAsync client) => client.ListLeftPopAsync(ListKey).AsValueTask();
+
+ [DisplayName("RPOP")]
+ private ValueTask RPop(IDatabaseAsync client) => client.ListRightPopAsync(ListKey).AsValueTask();
+
+ [DisplayName("SPOP")]
+ private ValueTask SPop(IDatabaseAsync client) => client.SetPopAsync(SetKey).AsValueTask();
+
+ [DisplayName("ZADD")]
+ private ValueTask ZAdd(IDatabaseAsync client) =>
+ client.SortedSetAddAsync(SortedSetKey, "element:__rand_int__", 0).AsValueTask();
+
+ [DisplayName("ZPOPMIN")]
+ private ValueTask ZPopMin(IDatabaseAsync client) => HasSortedSetElement(client.SortedSetPopAsync(SortedSetKey));
+
+ private async ValueTask HasSortedSetElement(Task pending)
+ {
+ var result = await pending.ConfigureAwait(false);
+ return result.HasValue ? 1 : 0;
+ }
+
+ [DisplayName("MSET")]
+ private ValueTask MSet(IDatabaseAsync client) => client.StringSetAsync(_pairs).AsValueTask();
+
+ [DisplayName("XADD")]
+ private ValueTask XAdd(IDatabaseAsync client) =>
+ client.StreamAddAsync(StreamKey, "myfield", Payload).AsValueTask();
+
+ [DisplayName("LRANGE_100")]
+ private ValueTask LRange100(IDatabaseAsync client) => CountAsync(client.ListRangeAsync(ListKey, 0, 99));
+
+ [DisplayName("LRANGE_300")]
+ private ValueTask LRange300(IDatabaseAsync client) => CountAsync(client.ListRangeAsync(ListKey, 0, 299));
+
+ [DisplayName("LRANGE_500")]
+ private ValueTask LRange500(IDatabaseAsync client) => CountAsync(client.ListRangeAsync(ListKey, 0, 499));
+
+ [DisplayName("LRANGE_600")]
+ private ValueTask LRange600(IDatabaseAsync client) =>
+ CountAsync(client.ListRangeAsync(ListKey, 0, 599));
+
+ private static ValueTask CountAsync(Task task) => task.ContinueWith(
+ t => t.Result.Length, TaskContinuationOptions.ExecuteSynchronously).AsValueTask();
+
+ private async ValueTask LRangeInit650(IDatabaseAsync client)
+ {
+ var batch = CreateBatch(client);
+ _ = batch.KeyDeleteAsync(ListKey, flags: CommandFlags.FireAndForget);
+ for (int i = 0; i < 650; i++)
+ {
+ _ = batch.ListLeftPushAsync(ListKey, Payload, flags: CommandFlags.FireAndForget);
+ }
+
+ await Flush(batch).ConfigureAwait(false);
+ if (await client.ListLengthAsync(ListKey).ConfigureAwait(false) != 650)
+ {
+ throw new InvalidOperationException();
+ }
+ }
+}
+
+internal static class TaskExtensions
+{
+ public static ValueTask AsValueTask(this Task task) => new(task);
+
+ /*
+ public static ValueTask AsUntypedValueTask(this Task task) => new(task);
+ public static ValueTask AsValueTask(this Task task) => new(task);
+ */
+
+ public static ValueTask AsUntypedValueTask(this ValueTask task)
+ {
+ if (!task.IsCompleted) return Awaited(task);
+ task.GetAwaiter().GetResult();
+ return default;
+
+ static async ValueTask Awaited(ValueTask task)
+ {
+ await task.ConfigureAwait(false);
+ }
+ }
+}
diff --git a/src/RESPite.Benchmark/Program.cs b/src/RESPite.Benchmark/Program.cs
new file mode 100644
index 000000000..a0797b787
--- /dev/null
+++ b/src/RESPite.Benchmark/Program.cs
@@ -0,0 +1,109 @@
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+
+namespace RESPite.Benchmark;
+
+internal static class Program
+{
+ private static async Task Main(string[] args)
+ {
+ bool basic = false;
+ try
+ {
+ List benchmarks = [];
+ foreach (var arg in args)
+ {
+ switch (arg)
+ {
+ case "--old":
+ benchmarks.Add(new OldCoreBenchmark(args));
+ break;
+#if NEWCORE
+ case "--bridge":
+ benchmarks.Add(new BridgeBenchmark(args));
+ break;
+ case "--new":
+ benchmarks.Add(new NewCoreBenchmark(args));
+ break;
+#endif
+ case "--basic":
+ basic = true;
+ break;
+ }
+ }
+
+ if (benchmarks.Count == 0)
+ {
+#if NEWCORE
+ benchmarks.Add(new NewCoreBenchmark(args));
+#else
+ benchmarks.Add(new OldCoreBenchmark(args));
+#endif
+ }
+
+#if DEBUG
+ Console.WriteLine("### Debug ###");
+#endif
+ bool isFirst = true;
+ do
+ {
+ foreach (var bench in benchmarks)
+ {
+ if (benchmarks.Count > 1 || isFirst)
+ {
+ Console.WriteLine($"### {bench} ###");
+ isFirst = false;
+ }
+
+ if (basic)
+ {
+ await bench.RunBasicLoopAsync().ConfigureAwait(false);
+ }
+ else
+ {
+ await bench.RunAll().ConfigureAwait(false);
+ }
+ }
+ }
+ // ReSharper disable once LoopVariableIsNeverChangedInsideLoop
+ while (benchmarks[0].Loop);
+
+ foreach (var bench in benchmarks)
+ {
+ bench.Dispose();
+ }
+ return 0;
+ }
+ catch (Exception ex)
+ {
+ WriteException(ex);
+ return -1;
+ }
+ }
+
+ internal static void WriteException(Exception? ex, [CallerMemberName] string operation = "")
+ {
+ Console.Error.WriteLine();
+ Console.Error.WriteLine($"### EXCEPTION: {operation}");
+ while (ex is not null)
+ {
+ Console.Error.WriteLine();
+ Console.Error.WriteLine($"{ex.GetType().Name}: {ex.Message}");
+ Console.Error.WriteLine($"\t{ex.StackTrace}");
+ var data = ex.Data;
+ // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
+ if (data is not null)
+ {
+ foreach (var key in data.Keys)
+ {
+ Console.Error.WriteLine($"\t{key}: {data[key]}");
+ }
+ }
+
+ ex = ex.InnerException;
+ }
+ Console.Error.WriteLine();
+ }
+}
diff --git a/src/RESPite.Benchmark/RESPite.Benchmark.csproj b/src/RESPite.Benchmark/RESPite.Benchmark.csproj
new file mode 100644
index 000000000..b9047024f
--- /dev/null
+++ b/src/RESPite.Benchmark/RESPite.Benchmark.csproj
@@ -0,0 +1,44 @@
+
+
+
+ enable
+
+ Exe
+ net8.0;net10.0
+ resp-benchmark
+ true
+ command-line "RESP" benchmark client, comparable to redis-benchmark
+ True
+ readme.md
+ False
+ false
+ false
+ 2025 - $([System.DateTime]::Now.Year) Marc Gravell
+
+ 3
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/RESPite.Benchmark/RespBenchmark.md b/src/RESPite.Benchmark/RespBenchmark.md
new file mode 100644
index 000000000..b579d84de
--- /dev/null
+++ b/src/RESPite.Benchmark/RespBenchmark.md
@@ -0,0 +1,352 @@
+# Influenced by redis-benchmark, which has typical output (with the default config) as below.
+
+Keys used (by default):
+
+- `key:__rand_int__`
+- `counter:__rand_int__`
+- `mylist`
+
+====== PING_INLINE ======
+100000 requests completed in 2.45 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+98.22% <= 1 milliseconds
+99.88% <= 2 milliseconds
+99.93% <= 3 milliseconds
+99.99% <= 4 milliseconds
+100.00% <= 5 milliseconds
+100.00% <= 5 milliseconds
+40849.68 requests per second
+
+====== PING_BULK ======
+100000 requests completed in 2.45 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+97.27% <= 1 milliseconds
+99.86% <= 2 milliseconds
+99.92% <= 3 milliseconds
+99.94% <= 4 milliseconds
+99.95% <= 23 milliseconds
+99.96% <= 24 milliseconds
+99.98% <= 25 milliseconds
+100.00% <= 25 milliseconds
+40866.37 requests per second
+
+====== SET ======
+100000 requests completed in 2.46 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+96.99% <= 1 milliseconds
+99.47% <= 2 milliseconds
+99.71% <= 3 milliseconds
+99.86% <= 4 milliseconds
+99.87% <= 9 milliseconds
+99.88% <= 10 milliseconds
+99.92% <= 11 milliseconds
+99.93% <= 12 milliseconds
+99.94% <= 13 milliseconds
+99.96% <= 14 milliseconds
+99.97% <= 15 milliseconds
+99.97% <= 16 milliseconds
+100.00% <= 27 milliseconds
+40650.41 requests per second
+
+====== GET ======
+100000 requests completed in 3.00 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+90.56% <= 1 milliseconds
+98.90% <= 2 milliseconds
+99.46% <= 3 milliseconds
+99.61% <= 4 milliseconds
+99.70% <= 5 milliseconds
+99.73% <= 6 milliseconds
+99.75% <= 7 milliseconds
+99.75% <= 9 milliseconds
+99.77% <= 10 milliseconds
+99.79% <= 12 milliseconds
+99.80% <= 14 milliseconds
+99.80% <= 15 milliseconds
+99.83% <= 16 milliseconds
+99.90% <= 17 milliseconds
+99.93% <= 18 milliseconds
+99.96% <= 19 milliseconds
+99.98% <= 20 milliseconds
+99.98% <= 22 milliseconds
+99.98% <= 30 milliseconds
+99.99% <= 31 milliseconds
+100.00% <= 31 milliseconds
+33377.84 requests per second
+
+====== INCR ======
+100000 requests completed in 2.94 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+93.21% <= 1 milliseconds
+99.21% <= 2 milliseconds
+99.70% <= 3 milliseconds
+99.81% <= 4 milliseconds
+99.86% <= 5 milliseconds
+99.89% <= 6 milliseconds
+99.93% <= 7 milliseconds
+99.94% <= 8 milliseconds
+99.96% <= 11 milliseconds
+99.96% <= 12 milliseconds
+99.96% <= 13 milliseconds
+99.97% <= 14 milliseconds
+99.97% <= 24 milliseconds
+100.00% <= 24 milliseconds
+34048.35 requests per second
+
+====== LPUSH ======
+100000 requests completed in 2.98 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+92.58% <= 1 milliseconds
+99.21% <= 2 milliseconds
+99.57% <= 3 milliseconds
+99.71% <= 4 milliseconds
+99.82% <= 5 milliseconds
+99.85% <= 6 milliseconds
+99.85% <= 7 milliseconds
+99.88% <= 9 milliseconds
+99.93% <= 10 milliseconds
+99.93% <= 13 milliseconds
+99.93% <= 14 milliseconds
+99.95% <= 16 milliseconds
+99.95% <= 31 milliseconds
+99.99% <= 32 milliseconds
+100.00% <= 32 milliseconds
+33512.07 requests per second
+
+====== LPOP ======
+100000 requests completed in 2.91 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+92.81% <= 1 milliseconds
+99.33% <= 2 milliseconds
+99.89% <= 3 milliseconds
+99.94% <= 4 milliseconds
+99.96% <= 5 milliseconds
+99.97% <= 15 milliseconds
+99.98% <= 16 milliseconds
+100.00% <= 17 milliseconds
+34317.09 requests per second
+
+====== SADD ======
+100000 requests completed in 2.87 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+94.26% <= 1 milliseconds
+99.58% <= 2 milliseconds
+99.87% <= 3 milliseconds
+99.93% <= 4 milliseconds
+99.98% <= 17 milliseconds
+99.98% <= 18 milliseconds
+100.00% <= 19 milliseconds
+34855.35 requests per second
+
+====== SPOP ======
+100000 requests completed in 2.99 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+91.00% <= 1 milliseconds
+99.30% <= 2 milliseconds
+99.69% <= 3 milliseconds
+99.80% <= 4 milliseconds
+99.85% <= 5 milliseconds
+99.85% <= 8 milliseconds
+99.86% <= 9 milliseconds
+99.89% <= 10 milliseconds
+99.92% <= 13 milliseconds
+99.94% <= 14 milliseconds
+99.95% <= 16 milliseconds
+100.00% <= 16 milliseconds
+33456.00 requests per second
+
+====== LPUSH (needed to benchmark LRANGE) ======
+100000 requests completed in 2.92 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+93.25% <= 1 milliseconds
+99.45% <= 2 milliseconds
+99.75% <= 3 milliseconds
+99.86% <= 4 milliseconds
+99.89% <= 5 milliseconds
+99.91% <= 6 milliseconds
+99.93% <= 9 milliseconds
+99.95% <= 10 milliseconds
+99.96% <= 11 milliseconds
+99.97% <= 14 milliseconds
+99.98% <= 15 milliseconds
+99.99% <= 17 milliseconds
+100.00% <= 18 milliseconds
+100.00% <= 20 milliseconds
+100.00% <= 20 milliseconds
+34258.31 requests per second
+
+====== LRANGE_100 (first 100 elements) ======
+100000 requests completed in 4.33 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+35.50% <= 1 milliseconds
+98.90% <= 2 milliseconds
+99.61% <= 3 milliseconds
+99.76% <= 4 milliseconds
+99.83% <= 5 milliseconds
+99.83% <= 7 milliseconds
+99.84% <= 8 milliseconds
+99.88% <= 9 milliseconds
+99.88% <= 10 milliseconds
+99.91% <= 11 milliseconds
+99.91% <= 12 milliseconds
+99.91% <= 13 milliseconds
+99.96% <= 15 milliseconds
+99.96% <= 34 milliseconds
+99.97% <= 35 milliseconds
+100.00% <= 39 milliseconds
+100.00% <= 39 milliseconds
+23089.36 requests per second
+
+====== LRANGE_300 (first 300 elements) ======
+100000 requests completed in 7.12 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+0.01% <= 1 milliseconds
+84.00% <= 2 milliseconds
+98.64% <= 3 milliseconds
+99.44% <= 4 milliseconds
+99.65% <= 5 milliseconds
+99.70% <= 6 milliseconds
+99.72% <= 7 milliseconds
+99.75% <= 8 milliseconds
+99.77% <= 9 milliseconds
+99.81% <= 10 milliseconds
+99.85% <= 11 milliseconds
+99.87% <= 12 milliseconds
+99.89% <= 13 milliseconds
+99.90% <= 14 milliseconds
+99.92% <= 15 milliseconds
+99.96% <= 16 milliseconds
+99.97% <= 17 milliseconds
+99.99% <= 18 milliseconds
+99.99% <= 26 milliseconds
+99.99% <= 32 milliseconds
+100.00% <= 37 milliseconds
+100.00% <= 38 milliseconds
+100.00% <= 39 milliseconds
+14039.03 requests per second
+
+====== LRANGE_500 (first 450 elements) ======
+100000 requests completed in 8.32 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+0.71% <= 1 milliseconds
+49.73% <= 2 milliseconds
+96.81% <= 3 milliseconds
+99.35% <= 4 milliseconds
+99.79% <= 5 milliseconds
+99.83% <= 6 milliseconds
+99.84% <= 7 milliseconds
+99.85% <= 8 milliseconds
+99.91% <= 9 milliseconds
+99.91% <= 10 milliseconds
+99.91% <= 12 milliseconds
+99.91% <= 27 milliseconds
+99.91% <= 28 milliseconds
+99.92% <= 29 milliseconds
+99.93% <= 30 milliseconds
+99.96% <= 31 milliseconds
+99.96% <= 49 milliseconds
+99.96% <= 50 milliseconds
+99.98% <= 99 milliseconds
+99.98% <= 100 milliseconds
+100.00% <= 100 milliseconds
+12022.12 requests per second
+
+====== LRANGE_600 (first 600 elements) ======
+100000 requests completed in 10.27 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+0.15% <= 1 milliseconds
+28.15% <= 2 milliseconds
+72.35% <= 3 milliseconds
+96.20% <= 4 milliseconds
+98.96% <= 5 milliseconds
+99.68% <= 6 milliseconds
+99.80% <= 7 milliseconds
+99.85% <= 8 milliseconds
+99.87% <= 9 milliseconds
+99.88% <= 10 milliseconds
+99.88% <= 11 milliseconds
+99.88% <= 12 milliseconds
+99.89% <= 13 milliseconds
+99.89% <= 14 milliseconds
+99.89% <= 15 milliseconds
+99.90% <= 16 milliseconds
+99.91% <= 17 milliseconds
+99.91% <= 18 milliseconds
+99.91% <= 19 milliseconds
+99.92% <= 20 milliseconds
+99.93% <= 21 milliseconds
+99.95% <= 22 milliseconds
+99.95% <= 23 milliseconds
+99.96% <= 24 milliseconds
+99.97% <= 25 milliseconds
+99.97% <= 26 milliseconds
+99.98% <= 27 milliseconds
+100.00% <= 28 milliseconds
+100.00% <= 29 milliseconds
+100.00% <= 29 milliseconds
+9736.15 requests per second
+
+====== MSET (10 keys) ======
+100000 requests completed in 2.94 seconds
+50 parallel clients
+3 bytes payload
+keep alive: 1
+
+92.48% <= 1 milliseconds
+99.33% <= 2 milliseconds
+99.91% <= 3 milliseconds
+99.93% <= 4 milliseconds
+99.94% <= 6 milliseconds
+99.94% <= 11 milliseconds
+99.96% <= 12 milliseconds
+99.97% <= 13 milliseconds
+99.98% <= 14 milliseconds
+99.98% <= 17 milliseconds
+99.99% <= 18 milliseconds
+99.99% <= 19 milliseconds
+99.99% <= 25 milliseconds
+100.00% <= 30 milliseconds
+100.00% <= 30 milliseconds
+34059.95 requests per second
\ No newline at end of file
diff --git a/src/RESPite.Benchmark/readme.md b/src/RESPite.Benchmark/readme.md
new file mode 100644
index 000000000..ea8fe2105
--- /dev/null
+++ b/src/RESPite.Benchmark/readme.md
@@ -0,0 +1,55 @@
+# resp-benchmark
+
+The `resp-benchmark` tool is a command-line "RESP" benchmark client, comparable to `redis-benchmark`, and
+many of the arguments are the same. This is mostly for internal team usage, but is included here for
+reference.
+
+Example usage:
+
+``` bash
+> dotnet tool install -g RESPite.Benchmark
+
+# basic usage
+> resp-benchmark
+
+# 50 clients, pipeline to 100, multiplexed, 1M operations, only test incr, loop
+> resp-benchmark -c 50 -P 100 -n 1000000 +m -t incr -l
+
+```
+
+## Options supported from `redis-benchmark`
+
+Basic options, for parity:
+
+- `-h ` Server hostname (default 127.0.0.1)
+- `-p ` Server port (default 6379)
+- `-c ` Number of parallel connections (default 50).
+- `-n ` Total number of requests (default 100000)
+- `-P ` Pipeline requests. Default 1 (no pipeline).
+- `-l` Loop. Run the tests forever
+- `-q` Quiet. Just show query/sec values
+- `-t ` Only run the comma separated list of tests. The test names are the same as the ones produced as output.
+
+## Custom options
+
+Additional options specific to this tool:
+
+- `+m` / `-m`: enable or disable (default) multiplexing: when enabled clients share a connection, otherwise each client has a separate connection
+- `--batch` / `--queue` pipelining should using batching (default) or queueing strategy
+- `--basic` : perform basic typical IO operations rather than synthetic benchmarks
+
+
+## Internal options
+
+These exist mostly for Marc's benefit:
+
+- `-w ` Specify the internal write-mode
+- `+x` / `-x`: enable or disable (default) cancellation support (irrelevant until later v3 tranche)
+
+## Local example
+
+To build and run from source, `dotnet run` can be used with everything after `--` being args to the command:
+
+```
+dotnet run -p:TargetVer=3 -f net10.0 -c Release -- -q +m --batch -n 500000
+```
\ No newline at end of file
diff --git a/src/RESPite/Messages/RespReader.Utils.cs b/src/RESPite/Messages/RespReader.Utils.cs
index 9aca671fb..4a504ec1f 100644
--- a/src/RESPite/Messages/RespReader.Utils.cs
+++ b/src/RESPite/Messages/RespReader.Utils.cs
@@ -103,7 +103,21 @@ private readonly void ThrowProtocolFailure(string message)
internal static void ThrowEof() => throw new EndOfStreamException();
[MethodImpl(MethodImplOptions.NoInlining), DoesNotReturn]
- private static void ThrowFormatException() => throw new FormatException();
+ private readonly void ThrowFormatException(string type)
+ {
+ var msg = $"Invalid format parsing {Prefix} as {type}";
+#if DEBUG
+ if (IsScalar && ScalarLength() <= 64)
+ {
+ try { msg += $": '{ReadString()}'"; }
+ catch (Exception ex)
+ {
+ Debug.WriteLine(ex);
+ }
+ }
+#endif
+ throw new FormatException(msg);
+ }
private int RawTryReadByte()
{
diff --git a/src/RESPite/Messages/RespReader.cs b/src/RESPite/Messages/RespReader.cs
index 2c93185ea..86ff08f40 100644
--- a/src/RESPite/Messages/RespReader.cs
+++ b/src/RESPite/Messages/RespReader.cs
@@ -746,7 +746,6 @@ private readonly unsafe bool TryParseSlow(
/// The parsed value if successful.
/// true if parsing succeeded; otherwise, false.
#pragma warning disable RS0016, RS0027 // public API
- [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#if DEBUG
[Obsolete("Please prefer the function-pointer API for library-internal use.")]
@@ -758,6 +757,52 @@ public readonly bool TryParseScalar(ScalarParser parser, out T value
return TryGetSpan(out var span) ? parser(span, out value) : TryParseSlow(parser, out value);
}
+ private readonly ReadOnlySpan BufferChars(Span target, out char[]? lease)
+ {
+ byte[] byteLease = [];
+ var bytes = Buffer(ref byteLease, byteLease);
+
+ int len = RespConstants.UTF8.GetMaxCharCount(bytes.Length);
+ if (len <= target.Length)
+ {
+ lease = null;
+ }
+ else
+ {
+ target = lease = ArrayPool.Shared.Rent(len);
+ }
+ len = RespConstants.UTF8.GetChars(bytes, target);
+ return target.Slice(0, len);
+ }
+
+ ///
+ /// Tries to read the current scalar element using a parser callback.
+ ///
+ /// The type of data being parsed.
+ /// The parser callback.
+ /// The parsed value if successful.
+ /// true if parsing succeeded; otherwise, false.
+ public readonly bool TryParseScalar(ScalarParser parser, out T value)
+ {
+ // note: no benefit in a function-ptr overload, after we've dealt with decoding bytes etc
+ var buffer = BufferChars(stackalloc char[128], out var lease);
+ try
+ {
+ return parser(buffer, out value);
+ }
+ finally
+ {
+ if (lease is not null) ArrayPool.Shared.Return(lease);
+ }
+ }
+
+ ///
+ /// Tries to read the current scalar element using a parser callback.
+ ///
+ /// The type of data being parsed.
+ /// The parser callback.
+ /// The parsed value if successful.
+ /// true if parsing succeeded; otherwise, false.
[MethodImpl(MethodImplOptions.NoInlining)]
private readonly bool TryParseSlow(ScalarParser parser, out T value)
{
@@ -1682,7 +1727,7 @@ public readonly long ReadInt64()
&& Utf8Parser.TryParse(span, out value, out int bytes)
&& bytes == span.Length))
{
- ThrowFormatException();
+ ThrowFormatException(nameof(Int64));
value = 0;
}
@@ -1716,7 +1761,7 @@ public readonly int ReadInt32()
&& Utf8Parser.TryParse(span, out value, out int bytes)
&& bytes == span.Length))
{
- ThrowFormatException();
+ ThrowFormatException(nameof(Int32));
value = 0;
}
@@ -1764,7 +1809,7 @@ public readonly double ReadDouble()
return double.NegativeInfinity;
}
- ThrowFormatException();
+ ThrowFormatException(nameof(Double));
return 0;
}
@@ -1844,7 +1889,7 @@ public readonly decimal ReadDecimal()
&& Utf8Parser.TryParse(span, out value, out int bytes)
&& bytes == span.Length))
{
- ThrowFormatException();
+ ThrowFormatException(nameof(Decimal));
value = 0;
}
@@ -1895,7 +1940,7 @@ public readonly bool ReadBoolean()
{
if (!TryReadBoolean(out var value))
{
- ThrowFormatException();
+ ThrowFormatException(nameof(Boolean));
}
return value;
}
diff --git a/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt b/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt
index ab058de62..9acf1fc40 100644
--- a/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt
+++ b/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt
@@ -1 +1,6 @@
#nullable enable
+[SER004]RESPite.Messages.RespReader.TryParseScalar(RESPite.Messages.RespReader.ScalarParser! parser, out T value) -> bool
+[SER004]static RESPite.AsciiHash.EqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool
+[SER004]static RESPite.AsciiHash.EqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool
+[SER004]static RESPite.AsciiHash.SequenceEqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool
+[SER004]static RESPite.AsciiHash.SequenceEqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool
diff --git a/src/RESPite/PublicAPI/net6.0/PublicAPI.Shipped.txt b/src/RESPite/PublicAPI/net6.0/PublicAPI.Shipped.txt
new file mode 100644
index 000000000..ab058de62
--- /dev/null
+++ b/src/RESPite/PublicAPI/net6.0/PublicAPI.Shipped.txt
@@ -0,0 +1 @@
+#nullable enable
diff --git a/src/RESPite/PublicAPI/net6.0/PublicAPI.Unshipped.txt b/src/RESPite/PublicAPI/net6.0/PublicAPI.Unshipped.txt
new file mode 100644
index 000000000..ab058de62
--- /dev/null
+++ b/src/RESPite/PublicAPI/net6.0/PublicAPI.Unshipped.txt
@@ -0,0 +1 @@
+#nullable enable
diff --git a/src/RESPite/RESPite.csproj b/src/RESPite/RESPite.csproj
index fef03625b..6b9798b0e 100644
--- a/src/RESPite/RESPite.csproj
+++ b/src/RESPite/RESPite.csproj
@@ -46,6 +46,4 @@
-
-
diff --git a/src/RESPite/Shared/AsciiHash.Public.cs b/src/RESPite/Shared/AsciiHash.Public.cs
deleted file mode 100644
index dd31cb415..000000000
--- a/src/RESPite/Shared/AsciiHash.Public.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-namespace RESPite;
-
-// in the shared file, these are declared without accessibility modifiers
-public sealed partial class AsciiHashAttribute
-{
-}
-
-public readonly partial struct AsciiHash
-{
-}
diff --git a/src/RESPite/Shared/AsciiHash.cs b/src/RESPite/Shared/AsciiHash.cs
index 37b3c5734..f0f134872 100644
--- a/src/RESPite/Shared/AsciiHash.cs
+++ b/src/RESPite/Shared/AsciiHash.cs
@@ -1,4 +1,3 @@
-using System;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
@@ -7,8 +6,6 @@
namespace RESPite;
-#pragma warning disable SA1205 // deliberately omit accessibility - see AsciiHash.Public.cs
-
///
/// This type is intended to provide fast hashing functions for small ASCII strings, for example well-known
/// RESP literals that are usually identifiable by their length and initial bytes; it is not intended
@@ -22,7 +19,7 @@ namespace RESPite;
Inherited = false)]
[Conditional("DEBUG")] // evaporate in release
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
-sealed partial class AsciiHashAttribute(string token = "") : Attribute
+public sealed partial class AsciiHashAttribute(string token = "") : Attribute
{
///
/// The token expected when parsing data, if different from the implied value. The implied
@@ -38,7 +35,7 @@ sealed partial class AsciiHashAttribute(string token = "") : Attribute
// note: instance members are in AsciiHash.Instance.cs.
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
-readonly partial struct AsciiHash
+public readonly partial struct AsciiHash
{
///
/// In-place ASCII upper-case conversion.
@@ -85,6 +82,9 @@ public static bool EqualsCI(ReadOnlySpan first, ReadOnlySpan second)
return len <= MaxBytesHashed ? HashUC(first) == HashUC(second) : SequenceEqualsCI(first, second);
}
+ public static bool EqualsCI(ReadOnlySpan first, ReadOnlySpan second)
+ => EqualsCI(second, first);
+
public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpan second)
{
var len = first.Length;
@@ -120,6 +120,9 @@ public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpa
}
}
+ public static bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpan second)
+ => SequenceEqualsCI(second, first);
+
public static bool EqualsCS(ReadOnlySpan first, ReadOnlySpan second)
{
var len = first.Length;
@@ -139,6 +142,14 @@ public static bool EqualsCI(ReadOnlySpan first, ReadOnlySpan second)
return len <= MaxBytesHashed ? HashUC(first) == HashUC(second) : SequenceEqualsCI(first, second);
}
+ public static bool EqualsCI(ReadOnlySpan first, ReadOnlySpan second)
+ {
+ var len = first.Length;
+ if (len != second.Length) return false;
+ // for very short values, the UC hash performs CI equality
+ return len <= MaxBytesHashed ? HashUC(first) == HashUC(second) : SequenceEqualsCI(first, second);
+ }
+
public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpan second)
{
var len = first.Length;
@@ -174,6 +185,41 @@ public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpa
}
}
+ public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpan second)
+ {
+ var len = first.Length;
+ if (len != second.Length) return false;
+
+ // OK, don't be clever (SIMD, etc); the purpose of FashHash is to compare RESP key tokens, which are
+ // typically relatively short, think 3-20 bytes. That wouldn't even touch a SIMD vector, so:
+ // just loop (the exact thing we'd need to do *anyway* in a SIMD implementation, to mop up the non-SIMD
+ // trailing bytes).
+ fixed (char* firstPtr = &MemoryMarshal.GetReference(first))
+ {
+ fixed (byte* secondPtr = &MemoryMarshal.GetReference(second))
+ {
+ const int CS_MASK = 0b0101_1111;
+ for (int i = 0; i < len; i++)
+ {
+ int x = (byte)firstPtr[i];
+ var xCI = x & CS_MASK;
+ if (xCI >= 'A' & xCI <= 'Z')
+ {
+ // alpha mismatch
+ if (xCI != (secondPtr[i] & CS_MASK)) return false;
+ }
+ else if (x != secondPtr[i])
+ {
+ // non-alpha mismatch
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }
+ }
+
public static void Hash(scoped ReadOnlySpan value, out long cs, out long uc)
{
cs = HashCS(value);
diff --git a/src/RESPite/Shared/FrameworkShims.Encoding.cs b/src/RESPite/Shared/FrameworkShims.Encoding.cs
index 2f2c2e89d..b5937dd17 100644
--- a/src/RESPite/Shared/FrameworkShims.Encoding.cs
+++ b/src/RESPite/Shared/FrameworkShims.Encoding.cs
@@ -1,3 +1,5 @@
+using System.Runtime.InteropServices;
+
#if !NET
// ReSharper disable once CheckNamespace
namespace System.Text
@@ -7,7 +9,7 @@ internal static class EncodingExtensions
public static unsafe int GetBytes(this Encoding encoding, ReadOnlySpan source, Span destination)
{
if (source.IsEmpty) return 0;
- fixed (byte* bPtr = destination)
+ fixed (byte* bPtr = &MemoryMarshal.GetReference(destination))
{
fixed (char* cPtr = source)
{
@@ -19,9 +21,9 @@ public static unsafe int GetBytes(this Encoding encoding, ReadOnlySpan sou
public static unsafe int GetChars(this Encoding encoding, ReadOnlySpan source, Span destination)
{
if (source.IsEmpty) return 0;
- fixed (byte* bPtr = source)
+ fixed (byte* bPtr = &MemoryMarshal.GetReference(source))
{
- fixed (char* cPtr = destination)
+ fixed (char* cPtr = &MemoryMarshal.GetReference(destination))
{
return encoding.GetChars(bPtr, source.Length, cPtr, destination.Length);
}
@@ -31,7 +33,7 @@ public static unsafe int GetChars(this Encoding encoding, ReadOnlySpan sou
public static unsafe int GetCharCount(this Encoding encoding, ReadOnlySpan source)
{
if (source.IsEmpty) return 0;
- fixed (byte* bPtr = source)
+ fixed (byte* bPtr = &MemoryMarshal.GetReference(source))
{
return encoding.GetCharCount(bPtr, source.Length);
}
@@ -40,7 +42,7 @@ public static unsafe int GetCharCount(this Encoding encoding, ReadOnlySpan
public static unsafe string GetString(this Encoding encoding, ReadOnlySpan source)
{
if (source.IsEmpty) return "";
- fixed (byte* bPtr = source)
+ fixed (byte* bPtr = &MemoryMarshal.GetReference(source))
{
return encoding.GetString(bPtr, source.Length);
}
diff --git a/src/RESPite/Shared/FrameworkShims.Sockets.cs b/src/RESPite/Shared/FrameworkShims.Sockets.cs
new file mode 100644
index 000000000..25eccd0a2
--- /dev/null
+++ b/src/RESPite/Shared/FrameworkShims.Sockets.cs
@@ -0,0 +1,163 @@
+#if !NET
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// ReSharper disable once CheckNamespace
+namespace System.Net.Sockets;
+
+internal static class SocketExtensions
+{
+ internal static async ValueTask ConnectAsync(this Socket socket, EndPoint remoteEP, CancellationToken cancellationToken = default)
+ {
+ // this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
+ using var args = new SocketAwaitableEventArgs(SocketFlags.None, cancellationToken);
+ args.RemoteEndPoint = remoteEP;
+ if (!socket.ConnectAsync(args))
+ {
+ args.Complete();
+ }
+ await args; // .ConfigureAwait(false) does not apply here
+ }
+
+ internal static async ValueTask SendAsync(this Socket socket, ReadOnlyMemory buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
+ {
+ // this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
+ using var args = new SocketAwaitableEventArgs(socketFlags, cancellationToken);
+ args.SetBuffer(buffer);
+ if (!socket.SendAsync(args))
+ {
+ args.Complete();
+ }
+
+ return await args; // .ConfigureAwait(false) does not apply here
+ }
+
+ internal static async ValueTask ReceiveAsync(this Socket socket, Memory buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
+ {
+ // this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
+ using var args = new SocketAwaitableEventArgs(socketFlags, cancellationToken);
+ args.SetBuffer(buffer);
+ if (!socket.ReceiveAsync(args))
+ {
+ args.Complete();
+ }
+
+ return await args; // .ConfigureAwait(false) does not apply here
+ }
+
+ ///
+ /// Awaitable SocketAsyncEventArgs, where awaiting the args yields either the BytesTransferred or throws the relevant socket exception,
+ /// plus support for cancellation via .
+ ///
+ private sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion, IDisposable
+ {
+ public new void Dispose()
+ {
+ cancelRegistration.Dispose();
+ base.Dispose();
+ }
+
+ private CancellationTokenRegistration cancelRegistration;
+ public SocketAwaitableEventArgs(SocketFlags socketFlags, CancellationToken cancellationToken)
+ {
+ SocketFlags = socketFlags;
+ if (cancellationToken.CanBeCanceled)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ cancelRegistration = cancellationToken.Register(Timeout);
+ }
+ }
+
+ public void SetBuffer(ReadOnlyMemory buffer)
+ {
+ if (!MemoryMarshal.TryGetArray(buffer, out var segment)) ThrowNotSupported();
+ SetBuffer(segment.Array ?? [], segment.Offset, segment.Count);
+
+ [DoesNotReturn]
+ static void ThrowNotSupported() => throw new NotSupportedException("Only array-backed buffers are supported");
+ }
+
+ public void Timeout() => Abort(SocketError.TimedOut);
+
+ public void Abort(SocketError error)
+ {
+ _forcedError = error;
+ OnCompleted(this);
+ }
+
+ private volatile SocketError _forcedError; // Success = 0, no field init required
+
+ // ReSharper disable once InconsistentNaming
+ private static readonly Action _callbackCompleted = () => { };
+
+ private Action? _callback;
+
+ public SocketAwaitableEventArgs GetAwaiter() => this;
+
+ ///
+ /// Indicates whether the current operation is complete; used as part of "await".
+ ///
+ public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
+
+ ///
+ /// Gets the result of the async operation is complete; used as part of "await".
+ ///
+ public int GetResult()
+ {
+ Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
+
+ _callback = null;
+
+ var error = _forcedError;
+ if (error is SocketError.Success) error = SocketError;
+ if (error is not SocketError.Success) ThrowSocketException(error);
+
+ return BytesTransferred;
+
+ static void ThrowSocketException(SocketError e) => throw new SocketException((int)e);
+ }
+
+ ///
+ /// Schedules a continuation for this operation; used as part of "await".
+ ///
+ public void OnCompleted(Action continuation)
+ {
+ if (ReferenceEquals(Volatile.Read(ref _callback), _callbackCompleted)
+ || ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
+ {
+ // this is the rare "kinda already complete" case; push to worker to prevent possible stack dive,
+ // but prefer the custom scheduler when possible
+ RunOnThreadPool(continuation);
+ }
+ }
+
+ ///
+ /// Schedules a continuation for this operation; used as part of "await".
+ ///
+ public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
+
+ ///
+ /// Marks the operation as complete - this should be invoked whenever a SocketAsyncEventArgs operation returns false.
+ ///
+ public void Complete() => OnCompleted(this);
+
+ private static void RunOnThreadPool(Action action)
+ => ThreadPool.QueueUserWorkItem(static state => ((Action)state).Invoke(), action);
+
+ ///
+ /// Invoked automatically when an operation completes asynchronously.
+ ///
+ protected override void OnCompleted(SocketAsyncEventArgs e)
+ {
+ var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
+ if (continuation is not null)
+ {
+ // continue on the thread-pool
+ RunOnThreadPool(continuation);
+ }
+ }
+ }
+}
+#endif
diff --git a/src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs b/src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs
index 003708e6a..0daa00377 100644
--- a/src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs
+++ b/src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs
@@ -1,4 +1,5 @@
using System;
+using RESPite.Messages;
namespace StackExchange.Redis;
@@ -11,18 +12,14 @@ public readonly struct LatencyHistoryEntry
private sealed class Processor : ArrayResultProcessor
{
- protected override bool TryParse(in RawResult raw, out LatencyHistoryEntry parsed)
+ protected override bool TryParse(ref RespReader reader, out LatencyHistoryEntry parsed)
{
- if (raw.Resp2TypeArray == ResultType.Array)
+ if (reader.IsAggregate
+ && reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var timestamp)
+ && reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var duration))
{
- var items = raw.GetItems();
- if (items.Length >= 2
- && items[0].TryGetInt64(out var timestamp)
- && items[1].TryGetInt64(out var duration))
- {
- parsed = new LatencyHistoryEntry(timestamp, duration);
- return true;
- }
+ parsed = new LatencyHistoryEntry(timestamp, duration);
+ return true;
}
parsed = default;
return false;
diff --git a/src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs b/src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs
index d1bc70e42..c1f012495 100644
--- a/src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs
+++ b/src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs
@@ -1,4 +1,5 @@
using System;
+using RESPite.Messages;
namespace StackExchange.Redis;
@@ -11,17 +12,17 @@ public readonly struct LatencyLatestEntry
private sealed class Processor : ArrayResultProcessor
{
- protected override bool TryParse(in RawResult raw, out LatencyLatestEntry parsed)
+ protected override bool TryParse(ref RespReader reader, out LatencyLatestEntry parsed)
{
- if (raw.Resp2TypeArray == ResultType.Array)
+ if (reader.IsAggregate && reader.TryMoveNext() && reader.IsScalar)
{
- var items = raw.GetItems();
- if (items.Length >= 4
- && items[1].TryGetInt64(out var timestamp)
- && items[2].TryGetInt64(out var duration)
- && items[3].TryGetInt64(out var maxDuration))
+ var eventName = reader.ReadString()!;
+
+ if (reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var timestamp)
+ && reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var duration)
+ && reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var maxDuration))
{
- parsed = new LatencyLatestEntry(items[0].GetString()!, timestamp, duration, maxDuration);
+ parsed = new LatencyLatestEntry(eventName, timestamp, duration, maxDuration);
return true;
}
}
diff --git a/src/StackExchange.Redis/ArrayGrepRequest.cs b/src/StackExchange.Redis/ArrayGrepRequest.cs
index 642f6a060..5fd7b0dd2 100644
--- a/src/StackExchange.Redis/ArrayGrepRequest.cs
+++ b/src/StackExchange.Redis/ArrayGrepRequest.cs
@@ -253,7 +253,7 @@ internal Message CreateMessage(int db, RedisKey key, CommandFlags flags)
public abstract class Predicate
{
internal virtual int ArgCount => 2;
- internal abstract void WriteTo(PhysicalConnection physical);
+ internal abstract void WriteTo(in MessageWriter writer);
private protected Predicate() { }
///
@@ -288,10 +288,10 @@ private sealed class ExactPredicate(RedisValue value) : Predicate
{
public override string ToString() => $"EXACT '{value}'";
- internal override void WriteTo(PhysicalConnection physical)
+ internal override void WriteTo(in MessageWriter writer)
{
- physical.WriteRaw("$5\r\nEXACT\r\n"u8);
- physical.WriteBulkString(value);
+ writer.WriteRaw("$5\r\nEXACT\r\n"u8);
+ writer.WriteBulkString(value);
}
}
@@ -299,10 +299,10 @@ private sealed class MatchPredicate(string pattern) : Predicate
{
public override string ToString() => $"MATCH '{pattern}'";
- internal override void WriteTo(PhysicalConnection physical)
+ internal override void WriteTo(in MessageWriter writer)
{
- physical.WriteRaw("$5\r\nMATCH\r\n"u8);
- physical.WriteBulkString(pattern);
+ writer.WriteRaw("$5\r\nMATCH\r\n"u8);
+ writer.WriteBulkString(pattern);
}
}
@@ -310,10 +310,10 @@ private sealed class GlobPredicate(string pattern) : Predicate
{
public override string ToString() => $"GLOB '{pattern}'";
- internal override void WriteTo(PhysicalConnection physical)
+ internal override void WriteTo(in MessageWriter writer)
{
- physical.WriteRaw("$4\r\nGLOB\r\n"u8);
- physical.WriteBulkString(pattern);
+ writer.WriteRaw("$4\r\nGLOB\r\n"u8);
+ writer.WriteBulkString(pattern);
}
}
@@ -321,10 +321,10 @@ private sealed class RegexPredicate(string re) : Predicate
{
public override string ToString() => $"RE '{re}'";
- internal override void WriteTo(PhysicalConnection physical)
+ internal override void WriteTo(in MessageWriter writer)
{
- physical.WriteRaw("$2\r\nRE\r\n"u8);
- physical.WriteBulkString(re);
+ writer.WriteRaw("$2\r\nRE\r\n"u8);
+ writer.WriteBulkString(re);
}
}
}
@@ -352,45 +352,46 @@ public override int ArgCount
}
}
- private static void AddIndex(PhysicalConnection physical, RedisArrayIndex? index, ReadOnlySpan fallback)
+ private static void AddIndex(in MessageWriter writer, RedisArrayIndex? index, ReadOnlySpan fallback)
{
if (index.HasValue)
{
- physical.WriteBulkString(index.GetValueOrDefault().Value);
+ writer.WriteBulkString(index.GetValueOrDefault().Value);
}
else
{
- physical.WriteRaw(fallback);
+ writer.WriteRaw(fallback);
}
}
- protected override void WriteImpl(PhysicalConnection physical)
+
+ protected override void WriteImpl(in MessageWriter writer)
{
- physical.WriteHeader(Command, ArgCount);
- physical.WriteBulkString(key);
+ writer.WriteHeader(Command, ArgCount);
+ writer.WriteBulkString(key);
if (request.IsReversed)
{
- AddIndex(physical, request.End, "$1\r\n+\r\n"u8);
- AddIndex(physical, request.Start, "$1\r\n-\r\n"u8);
+ AddIndex(writer, request.End, "$1\r\n+\r\n"u8);
+ AddIndex(writer, request.Start, "$1\r\n-\r\n"u8);
}
else
{
- AddIndex(physical, request.Start, "$1\r\n-\r\n"u8);
- AddIndex(physical, request.End, "$1\r\n+\r\n"u8);
+ AddIndex(writer, request.Start, "$1\r\n-\r\n"u8);
+ AddIndex(writer, request.End, "$1\r\n+\r\n"u8);
}
var pCount = request.Count;
for (int i = 0; i < pCount; i++)
{
- request[i].WriteTo(physical);
+ request[i].WriteTo(in writer);
}
- if (request.IsIntersection) physical.WriteRaw("$3\r\nAND\r\n"u8);
- if (request.IsCaseInsensitive) physical.WriteRaw("$6\r\nNOCASE\r\n"u8);
- if (request.IncludeValues) physical.WriteRaw("$10\r\nWITHVALUES\r\n"u8);
+ if (request.IsIntersection) writer.WriteRaw("$3\r\nAND\r\n"u8);
+ if (request.IsCaseInsensitive) writer.WriteRaw("$6\r\nNOCASE\r\n"u8);
+ if (request.IncludeValues) writer.WriteRaw("$10\r\nWITHVALUES\r\n"u8);
var limit = request.Limit;
if (limit.HasValue)
{
- physical.WriteRaw("$5\r\nLIMIT\r\n"u8);
- physical.WriteBulkString(limit.GetValueOrDefault());
+ writer.WriteRaw("$5\r\nLIMIT\r\n"u8);
+ writer.WriteBulkString(limit.GetValueOrDefault());
}
}
}
diff --git a/src/StackExchange.Redis/AwaitableMutex.cs b/src/StackExchange.Redis/AwaitableMutex.cs
new file mode 100644
index 000000000..9bc054804
--- /dev/null
+++ b/src/StackExchange.Redis/AwaitableMutex.cs
@@ -0,0 +1,22 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+// abstract over the concept of awaiable mutex between platforms
+internal readonly partial struct AwaitableMutex : IDisposable
+{
+ // ReSharper disable once ConvertToAutoProperty
+ public partial int TimeoutMilliseconds { get; }
+ public static AwaitableMutex Create(int timeoutMilliseconds) => new(timeoutMilliseconds);
+
+ // define the API first here (think .h file)
+ private partial AwaitableMutex(int timeoutMilliseconds);
+ public partial void Dispose();
+ public partial bool IsAvailable { get; }
+ public partial bool TryTakeInstant();
+ public partial ValueTask TryTakeAsync(CancellationToken cancellationToken = default);
+ public partial bool TryTakeSync();
+ public partial void Release();
+}
diff --git a/src/StackExchange.Redis/AwaitableMutex.net.cs b/src/StackExchange.Redis/AwaitableMutex.net.cs
new file mode 100644
index 000000000..501b6af36
--- /dev/null
+++ b/src/StackExchange.Redis/AwaitableMutex.net.cs
@@ -0,0 +1,36 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+#if NET
+namespace StackExchange.Redis;
+
+internal partial struct AwaitableMutex
+{
+ private readonly int _timeoutMilliseconds;
+
+ // note: this does not guarantee "fairness", but that's OK for our use-case - we mostly just want
+ // a sync+async awaitable mutex, which this does; the .NET Framework version has a hand-written
+ // implementation (see .netfx.cx for reasons), which *is* fair, but we'd rather not pay that overhead
+ // here. Good-enough-is.
+ private readonly SemaphoreSlim _mutex;
+
+ private partial AwaitableMutex(int timeoutMilliseconds)
+ {
+ _timeoutMilliseconds = timeoutMilliseconds;
+ _mutex = new(1, 1);
+ }
+
+ public partial void Dispose() => _mutex?.Dispose();
+ public partial bool IsAvailable => _mutex.CurrentCount != 0;
+ public partial int TimeoutMilliseconds => _timeoutMilliseconds;
+
+ public partial bool TryTakeInstant() => _mutex.Wait(0);
+
+ public partial ValueTask TryTakeAsync(CancellationToken cancellationToken)
+ => new(_mutex.WaitAsync(_timeoutMilliseconds, cancellationToken));
+
+ public partial bool TryTakeSync() => _mutex.Wait(_timeoutMilliseconds);
+
+ public partial void Release() => _mutex.Release();
+}
+#endif
diff --git a/src/StackExchange.Redis/AwaitableMutex.netfx.cs b/src/StackExchange.Redis/AwaitableMutex.netfx.cs
new file mode 100644
index 000000000..7267f8043
--- /dev/null
+++ b/src/StackExchange.Redis/AwaitableMutex.netfx.cs
@@ -0,0 +1,389 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+#if !NET
+namespace StackExchange.Redis;
+
+/*
+Compensating for the fact that netfx SemaphoreSlim is kinda janky (https://blog.marcgravell.com/2019/02/fun-with-spiral-of-death.html).
+
+This uses a simple queue of sync/async callers, and assumes a reasonable caller (the original MutexSlim is more defensive, as
+a general purpose public API).
+*/
+
+internal partial struct AwaitableMutex
+{
+ private readonly State _state;
+
+ private partial AwaitableMutex(int timeoutMilliseconds)
+ {
+ _state = new(timeoutMilliseconds);
+ }
+
+ public partial void Dispose() => _state?.Dispose();
+
+ public partial bool IsAvailable => _state.IsAvailable;
+ public partial int TimeoutMilliseconds => _state.TimeoutMilliseconds;
+
+ public partial bool TryTakeInstant() => _state.TryTakeInstant();
+
+ public partial ValueTask TryTakeAsync(CancellationToken cancellationToken)
+ => _state.TryTakeAsync(cancellationToken);
+
+ public partial bool TryTakeSync() => _state.TryTakeSync();
+
+ public partial void Release() => _state.Release();
+
+ private sealed class State : IDisposable
+ {
+ private readonly Queue _queue = new();
+ private bool _isHeld;
+ private bool _isDisposed;
+
+ public State(int timeoutMilliseconds)
+ {
+ if (timeoutMilliseconds < Timeout.Infinite) ThrowOutOfRangeException();
+ TimeoutMilliseconds = timeoutMilliseconds;
+
+ static void ThrowOutOfRangeException() => throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
+ }
+
+ public int TimeoutMilliseconds { get; }
+
+ public bool IsAvailable
+ {
+ get
+ {
+ lock (_queue)
+ {
+ return !_isDisposed && !_isHeld && _queue.Count == 0;
+ }
+ }
+ }
+
+ public bool TryTakeInstant()
+ {
+ bool lockTaken = false;
+ try
+ {
+ Monitor.TryEnter(_queue, 0, ref lockTaken);
+ if (!lockTaken) return false;
+
+ return TryTakeInsideLock();
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_queue);
+ }
+ }
+
+ public bool TryTakeSync()
+ {
+ bool lockTaken = false;
+ try
+ {
+ // try to acquire uncontested lock - that way we can avoid checking the time
+ Monitor.TryEnter(_queue, 0, ref lockTaken);
+ if (lockTaken && TryTakeInsideLock()) return true;
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_queue);
+ }
+
+ return TryTakeSyncSlow();
+ }
+
+ public ValueTask TryTakeAsync(CancellationToken cancellationToken)
+ {
+ bool lockTaken = false;
+ try
+ {
+ // try to acquire uncontested lock - that way we can avoid allocating the pending caller
+ Monitor.TryEnter(_queue, 0, ref lockTaken);
+ if (lockTaken)
+ {
+ if (_isDisposed) return DisposedAsync();
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return CanceledAsync(cancellationToken);
+ }
+
+ if (TryTakeInsideLockCore()) return new ValueTask(true);
+ }
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_queue);
+ }
+
+ return TryTakeAsyncSlow(cancellationToken);
+ }
+
+ private ValueTask TryTakeAsyncSlow(CancellationToken cancellationToken)
+ {
+ lock (_queue)
+ {
+ if (_isDisposed) return DisposedAsync();
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return CanceledAsync(cancellationToken);
+ }
+
+ if (TryTakeInsideLockCore()) return new ValueTask(true);
+ if (TimeoutMilliseconds == 0) return new ValueTask(false);
+ if (cancellationToken.IsCancellationRequested) return CanceledAsync(cancellationToken);
+
+ var pending = new AsyncPendingCaller(TimeoutMilliseconds, cancellationToken);
+ _queue.Enqueue(pending);
+ return new ValueTask(pending.Task);
+ }
+ }
+
+ public void Release()
+ {
+ lock (_queue)
+ {
+ ThrowIfDisposed();
+ if (!_isHeld) ThrowNotHeld();
+
+ while (_queue.Count != 0)
+ {
+ if (_queue.Dequeue().TryGrant()) return;
+ }
+
+ _isHeld = false;
+ }
+
+ static void ThrowNotHeld() => throw new SemaphoreFullException();
+ }
+
+ private bool TryTakeInsideLock()
+ {
+ ThrowIfDisposed();
+ return TryTakeInsideLockCore();
+ }
+
+ private bool TryTakeInsideLockCore()
+ {
+ if (_isHeld || _queue.Count != 0) return false;
+ _isHeld = true;
+ return true;
+ }
+
+ private bool TryTakeSyncSlow()
+ {
+ if (TimeoutMilliseconds == 0) return false;
+
+ var start = GetTime();
+ SyncPendingCaller? pending = null;
+ bool lockTaken = false;
+ try
+ {
+ Monitor.TryEnter(_queue, TimeoutMilliseconds, ref lockTaken);
+ if (!lockTaken) return false;
+ if (TryTakeInsideLock()) return true;
+
+ var remaining = GetRemainingTimeout(start, TimeoutMilliseconds);
+ if (remaining == 0) return false;
+
+ pending = new SyncPendingCaller(start, TimeoutMilliseconds);
+ _queue.Enqueue(pending);
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_queue);
+ }
+
+ return pending!.Wait();
+ }
+
+ public void Dispose()
+ {
+ if (_isDisposed) return;
+ _isDisposed = true;
+
+ lock (_queue)
+ {
+ _isHeld = false;
+ while (_queue.Count != 0)
+ {
+ _queue.Dequeue().Abort();
+ }
+ }
+ }
+
+ private void ThrowIfDisposed()
+ {
+ if (_isDisposed) ThrowDisposed();
+ }
+
+ private static void ThrowDisposed() => throw new ObjectDisposedException(nameof(AwaitableMutex));
+
+ private static ValueTask DisposedAsync()
+ => new(Task.FromException(new ObjectDisposedException(nameof(AwaitableMutex))));
+
+ private static ValueTask CanceledAsync(CancellationToken cancellationToken)
+ => new(Task.FromCanceled(cancellationToken));
+
+ private static uint GetTime() => (uint)Environment.TickCount;
+
+ private static int GetRemainingTimeout(uint startTime, int originalTimeoutMilliseconds)
+ {
+ if (originalTimeoutMilliseconds == Timeout.Infinite) return Timeout.Infinite;
+
+ var elapsedMilliseconds = GetTime() - startTime;
+ if (elapsedMilliseconds > int.MaxValue) return 0;
+
+ var remaining = originalTimeoutMilliseconds - (int)elapsedMilliseconds;
+ return remaining <= 0 ? 0 : remaining;
+ }
+
+ private interface IPendingCaller
+ {
+ bool TryGrant();
+ void Abort();
+ }
+
+ private sealed class SyncPendingCaller : IPendingCaller
+ {
+ private readonly uint _start;
+ private readonly int _timeoutMilliseconds;
+ private bool _isComplete;
+ private bool _wasGranted;
+ private bool _wasAborted;
+
+ public SyncPendingCaller(uint start, int timeoutMilliseconds)
+ {
+ _start = start;
+ _timeoutMilliseconds = timeoutMilliseconds;
+ }
+
+ public bool Wait()
+ {
+ lock (this)
+ {
+ while (!_isComplete)
+ {
+ var remaining = GetRemainingTimeout(_start, _timeoutMilliseconds);
+ if (remaining == 0)
+ {
+ _isComplete = true;
+ return false;
+ }
+
+ if (remaining == Timeout.Infinite)
+ {
+ Monitor.Wait(this);
+ }
+ else
+ {
+ Monitor.Wait(this, remaining);
+ }
+ }
+
+ if (_wasAborted) ThrowDisposed();
+ return _wasGranted;
+ }
+ }
+
+ public bool TryGrant()
+ {
+ lock (this)
+ {
+ if (_isComplete) return false;
+ _wasGranted = true;
+ _isComplete = true;
+ Monitor.Pulse(this);
+ return true;
+ }
+ }
+
+ public void Abort()
+ {
+ lock (this)
+ {
+ if (_isComplete) return;
+ _wasAborted = true;
+ _isComplete = true;
+ Monitor.Pulse(this);
+ }
+ }
+ }
+
+ private sealed class AsyncPendingCaller : TaskCompletionSource, IPendingCaller
+ {
+ private static readonly TimerCallback s_onTimeout = state => ((AsyncPendingCaller)state!).TryComplete(CompletionState.TimedOut);
+ private static readonly Action