Skip to content

Commit

Permalink
dh: Add tokenBucket retry handler
Browse files Browse the repository at this point in the history
  • Loading branch information
rmandvikar committed Mar 3, 2024
1 parent 1a82a40 commit 720da57
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private bool CanRetry(
&& !string.IsNullOrWhiteSpace(retryAfterValue);

#if DEBUG
Console.WriteLine($"retryAfterValue: {retryAfterValue}");
//Console.WriteLine($"retryAfterValue: {retryAfterValue}");
#endif

// retry on 503, 429 only on valid retry-after value
Expand Down Expand Up @@ -167,8 +167,8 @@ private bool CanRetry(
}

#if DEBUG
Console.WriteLine($"sleepDurationWithJitter: {sleepDurationWithJitter}");
Console.WriteLine($"retry: {retry}");
//Console.WriteLine($"sleepDurationWithJitter: {sleepDurationWithJitter}");
//Console.WriteLine($"retry: {retry}");
#endif

return retry;
Expand Down
82 changes: 82 additions & 0 deletions src/rm.DelegatingHandlers/TokenBucketRetryHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace rm.DelegatingHandlers;

/// <summary>
/// TODO
/// </summary>
public class TokenBucketRetryHandler : DelegatingHandler
{
private readonly ITokenBucketRetryHandlerSettings tokenBucketRetryHandlerSettings;

private long callsCount;
private long retryCallsCount;

/// <inheritdoc cref="TokenBucketRetryHandler" />
public TokenBucketRetryHandler(
ITokenBucketRetryHandlerSettings tokenBucketRetryHandlerSettings)
{
this.tokenBucketRetryHandlerSettings = tokenBucketRetryHandlerSettings
?? throw new ArgumentNullException(nameof(tokenBucketRetryHandlerSettings));
}

protected override async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request,
CancellationToken cancellationToken)
{
var calls = Interlocked.Increment(ref callsCount);
var retryAttempt = (int)request.Properties[RequestProperties.PollyRetryAttempt];
double percentage = 0;
if (retryAttempt >= 1)
{
var retryCalls = Interlocked.Increment(ref retryCallsCount);
if (calls > 0
//&& calls > tokenBucketRetryHandlerSettings.MinimumVolume
&& (percentage = retryCalls / (double)calls) > tokenBucketRetryHandlerSettings.Percentage)
{
throw new TokenBucketRetryException(
$"percentage (threshold): {tokenBucketRetryHandlerSettings.Percentage}, but was percentage: {percentage}");
}
}
#if DEBUG
Console.WriteLine($"percentage (threshold): {tokenBucketRetryHandlerSettings.Percentage}, but was percentage: {percentage}");
#endif

var response = await base.SendAsync(request, cancellationToken)
.ConfigureAwait(false);

if (retryAttempt >= 1)
{
Interlocked.Decrement(ref retryCallsCount);
}
Interlocked.Decrement(ref callsCount);

return response;
}
}

public interface ITokenBucketRetryHandlerSettings
{
double Percentage { get; }
double MinimumVolume { get; }
}

public record class TokenBucketRetryHandlerSettings : ITokenBucketRetryHandlerSettings
{
public double Percentage { get; init; }
public double MinimumVolume { get; init; }
}

[Serializable]
public class TokenBucketRetryException : Exception
{
public TokenBucketRetryException() { }
public TokenBucketRetryException(string message) : base(message) { }
public TokenBucketRetryException(string message, Exception inner) : base(message, inner) { }
protected TokenBucketRetryException(
System.Runtime.Serialization.SerializationInfo info,
System.Runtime.Serialization.StreamingContext context) : base(info, context) { }
}
180 changes: 180 additions & 0 deletions tests/rm.DelegatingHandlersTest/TokenBucketRetryHandlerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
using System.Net;
using System.Net.Http;
using AutoFixture;
using AutoFixture.AutoMoq;
using Moq;
using NUnit.Framework;
using rm.Clock;
using rm.DelegatingHandlers;
using rm.Random2;

namespace rm.DelegatingHandlersTest;

[TestFixture]
public class TokenBucketRetryHandlerTests
{
private static readonly Random rng = RandomFactory.GetThreadStaticRandom();

[Test]
public void Throws_TokenBucketRetryException()
{
var fixture = new Fixture().Customize(new AutoMoqCustomization());

var content = fixture.Create<string>();
var shortCircuitingResponseHandler = new ShortCircuitingResponseHandler(
new ShortCircuitingResponseHandlerSettings
{
StatusCode = (HttpStatusCode)500,
Content = content,
});
var tokenBucketRetryHandler = new TokenBucketRetryHandler(
new TokenBucketRetryHandlerSettings
{
Percentage = 0.10d,
});
var clockMock = fixture.Freeze<Mock<ISystemClock>>();
clockMock.Setup(x => x.UtcNow).Returns(DateTimeOffsetValues.Chernobyl);
var retryHandler = new ExponentialBackoffWithJitterRetryHandler(
new RetrySettings
{
RetryCount = 2,
RetryDelayInMilliseconds = 0,
},
clockMock.Object);

using var invoker = HttpMessageInvokerFactory.Create(
retryHandler, tokenBucketRetryHandler, shortCircuitingResponseHandler);

using var requestMessage = fixture.Create<HttpRequestMessage>();
var ex = Assert.ThrowsAsync<TokenBucketRetryException>(async () =>
{
using var _ = await invoker.SendAsync(requestMessage, CancellationToken.None);
});
Console.WriteLine(ex!.Message);
}

[Test]
public async Task Does_Not_Throw_TokenBucketRetryException()
{
var fixture = new Fixture().Customize(new AutoMoqCustomization());

var content = fixture.Create<string>();
var shortCircuitingResponseHandler = new ShortCircuitingResponseHandler(
new ShortCircuitingResponseHandlerSettings
{
StatusCode = (HttpStatusCode)200,
Content = content,
});
var tokenBucketRetryHandler = new TokenBucketRetryHandler(
new TokenBucketRetryHandlerSettings
{
Percentage = 0.10d,
});
var clockMock = fixture.Freeze<Mock<ISystemClock>>();
clockMock.Setup(x => x.UtcNow).Returns(DateTimeOffsetValues.Chernobyl);
var retryHandler = new ExponentialBackoffWithJitterRetryHandler(
new RetrySettings
{
RetryCount = 2,
RetryDelayInMilliseconds = 0,
},
clockMock.Object);

using var invoker = HttpMessageInvokerFactory.Create(
retryHandler, tokenBucketRetryHandler, shortCircuitingResponseHandler);

using var requestMessage = fixture.Create<HttpRequestMessage>();
using var _ = await invoker.SendAsync(requestMessage, CancellationToken.None);
}

[Explicit]
[Test]
public async Task Does_Not_Throw_TokenBucketRetryException_Iterations()
{
var fixture = new Fixture().Customize(new AutoMoqCustomization());

var content = fixture.Create<string>();
var shortCircuitingResponseHandler = new ShortCircuitingResponseHandler(
new ShortCircuitingResponseHandlerSettings
{
StatusCode = (HttpStatusCode)200,
Content = content,
});
var tokenBucketRetryHandler = new TokenBucketRetryHandler(
new TokenBucketRetryHandlerSettings
{
Percentage = 0.05d,
});
var clockMock = fixture.Freeze<Mock<ISystemClock>>();
clockMock.Setup(x => x.UtcNow).Returns(DateTimeOffsetValues.Chernobyl);
var retryHandler = new ExponentialBackoffWithJitterRetryHandler(
new RetrySettings
{
RetryCount = 2,
RetryDelayInMilliseconds = 0,
},
clockMock.Object);

using var invoker = HttpMessageInvokerFactory.Create(
retryHandler, tokenBucketRetryHandler, shortCircuitingResponseHandler);

const int iterations = 1_000;
for (int i = 0; i < iterations; i++)
{
using var requestMessage = fixture.Create<HttpRequestMessage>();
using var _ = await invoker.SendAsync(requestMessage, CancellationToken.None);
}
}

[Explicit]
[Test]
public async Task Does_Not_Throw_TokenBucketRetryException_Probability_Iterations()
{
var fixture = new Fixture().Customize(new AutoMoqCustomization());

var shortCircuitingResponseHandler = new ShortCircuitingResponseHandler(
new ShortCircuitingResponseHandlerSettings
{
StatusCode = (HttpStatusCode)200,
Content = fixture.Create<string>(),
});
var shortCircuitingResponseWithProbabilityHandler = new ShortCircuitingResponseWithProbabilityHandler(
new ShortCircuitingResponseWithProbabilityHandlerSettings
{
ProbabilityPercentage = 0.1d,
StatusCode = (HttpStatusCode)500,
Content = fixture.Create<string>(),
},
rng);
var tokenBucketRetryHandler = new TokenBucketRetryHandler(
new TokenBucketRetryHandlerSettings
{
Percentage = 0.10d,
});
var clockMock = fixture.Freeze<Mock<ISystemClock>>();
clockMock.Setup(x => x.UtcNow).Returns(DateTimeOffsetValues.Chernobyl);
var retryHandler = new ExponentialBackoffWithJitterRetryHandler(
new RetrySettings
{
RetryCount = 2,
RetryDelayInMilliseconds = 0,
},
clockMock.Object);

using var invoker = HttpMessageInvokerFactory.Create(
retryHandler, tokenBucketRetryHandler, shortCircuitingResponseWithProbabilityHandler, shortCircuitingResponseHandler);

const int iterations = 1_000;
const int batchSize = 100;
for (int i = 0; i < iterations; i += batchSize)
{
var tasks = new List<Task>(batchSize);
for (int b = 0; b < batchSize; b++)
{
using var requestMessage = fixture.Create<HttpRequestMessage>();
tasks.Add(invoker.SendAsync(requestMessage, CancellationToken.None));
}
await Task.WhenAll(tasks);
}
}
}

0 comments on commit 720da57

Please sign in to comment.