Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions Tharga.Cache.Redis.Tests/RedisResiliencePolicyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using FluentAssertions;
using Polly;
using Polly.CircuitBreaker;
using Xunit;

namespace Tharga.Cache.Redis.Tests;

/// <summary>
/// Exercises the policy returned by <c>RedisResiliencePolicy.Create</c>: circuit opening,
/// plain pass-through of successful calls, and retrying of transient failures.
/// </summary>
public class RedisResiliencePolicyTests
{
    [Fact]
    public async Task Circuit_opens_after_threshold_and_then_fails_fast_without_invoking_backend()
    {
        // Arrange: with retries disabled every call contributes exactly one failure to the breaker.
        var settings = new RedisCacheOptions
        {
            RetryCount = 0,
            CircuitBreakerFailureThreshold = 2,
            CircuitBreakerDuration = TimeSpan.FromMinutes(1)
        };
        var sut = RedisResiliencePolicy.Create(settings, null);

        var backendCalls = 0;
        Func<Task> brokenBackend = async () =>
        {
            backendCalls++;
            await Task.Yield();
            throw new TimeoutException("simulated outage");
        };
        Func<Task> callThroughPolicy = () => sut.ExecuteAsync(brokenBackend);

        // Act: fail as many times as the configured threshold so the breaker trips.
        var failuresLeft = settings.CircuitBreakerFailureThreshold;
        while (failuresLeft-- > 0)
        {
            await callThroughPolicy.Should().ThrowAsync<TimeoutException>();
        }

        var callsWhenTripped = backendCalls;

        // Assert: the next call is rejected by the breaker and the backend counter does not move.
        await callThroughPolicy.Should().ThrowAsync<BrokenCircuitException>();
        backendCalls.Should().Be(callsWhenTripped, "an open circuit must short-circuit instead of invoking the backend");
    }

    [Fact]
    public async Task Successful_call_passes_through()
    {
        // Arrange
        var settings = new RedisCacheOptions { RetryCount = 0, CircuitBreakerFailureThreshold = 5 };
        var sut = RedisResiliencePolicy.Create(settings, null);

        // Act
        var actual = await sut.ExecuteAsync(() => Task.FromResult(42));

        // Assert
        actual.Should().Be(42);
    }

    [Fact]
    public async Task Retries_transient_failures_then_succeeds()
    {
        // Arrange: two retries allowed, breaker threshold high enough not to interfere.
        var settings = new RedisCacheOptions { RetryCount = 2, CircuitBreakerFailureThreshold = 5 };
        var sut = RedisResiliencePolicy.Create(settings, null);
        var callCount = 0;

        // Act: the delegate fails on its first two invocations and succeeds on the third.
        var actual = await sut.ExecuteAsync(async () =>
        {
            callCount++;
            await Task.Yield();
            if (callCount < 3)
            {
                throw new TimeoutException("transient");
            }

            return "ok";
        });

        // Assert
        actual.Should().Be("ok");
        callCount.Should().Be(3, "the call should be retried twice before succeeding on the third attempt");
    }
}
22 changes: 22 additions & 0 deletions Tharga.Cache.Redis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,28 @@ Any type registered with `IRedis` is persisted to Redis. Unregistered types defa
- **Survives restarts** — cached data is not lost on deploy
- **High throughput** with low latency

## Resilience (fail-open)

If Redis becomes unreachable, the cache **fails open**: a backend read error is treated as a miss so the
call falls through to your source loader, and a backend write error is swallowed. A cache outage therefore
never faults the caller as long as the source of truth is healthy. This is on by default and can be turned
off with `CacheOptions.FailOpenOnBackendError = false` (which restores the previous throwing behavior).

A Polly **circuit breaker** sits in front of the Redis connection so that, during a sustained outage, calls
short-circuit immediately instead of paying retry latency every time (which is what prevents thread-pool
starvation). After `CircuitBreakerDuration` the breaker lets a single trial call through and closes again
automatically once Redis is healthy.

```csharp
o.AddRedisDBOptions(r =>
{
r.ConnectionStringLoader = sp => "localhost:6379";
r.RetryCount = 3; // transient-error retries before a call fails (default 3)
r.CircuitBreakerFailureThreshold = 5; // consecutive failures before the circuit opens (default 5)
r.CircuitBreakerDuration = TimeSpan.FromSeconds(30); // how long it stays open before probing again (default 30s)
r.CommandTimeout = TimeSpan.FromSeconds(1); // optional; used as the async, sync and connect timeout for fast fail-open (default: connection-string / library defaults)
});
```

## Documentation

Full documentation, configuration options, and samples are available on the [GitHub project page](https://github.com/Tharga/Cache).
Expand Down
56 changes: 34 additions & 22 deletions Tharga.Cache.Redis/Redis.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using System.Diagnostics;
using System.Net.Sockets;
using System.Text.Json;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Polly.Retry;
using Polly.CircuitBreaker;
using StackExchange.Redis;
using Tharga.Cache.Core;

Expand All @@ -17,7 +16,7 @@ internal class Redis : IRedis
private readonly IHostEnvironment _hostEnvironment;
private readonly RedisCacheOptions _options;
private readonly ILogger<Redis> _logger;
private readonly AsyncRetryPolicy _retryPolicy;
private readonly IAsyncPolicy _resiliencePolicy;
private ConnectionMultiplexer _redisConnection;

public Redis(IServiceProvider serviceProvider, IHostEnvironment hostEnvironment, IManagedCacheMonitor cacheMonitor, IOptions<RedisCacheOptions> options, ILogger<Redis> logger)
Expand All @@ -26,14 +25,7 @@ public Redis(IServiceProvider serviceProvider, IHostEnvironment hostEnvironment,
_hostEnvironment = hostEnvironment;
_options = options.Value;
_logger = logger;
_retryPolicy = Policy
.Handle<RedisException>()
.Or<TimeoutException>()
.Or<SocketException>()
.WaitAndRetryAsync(
3,
attempt => TimeSpan.FromMilliseconds(200 * Math.Pow(2, attempt)),
(exception, timeSpan, retryCount, _) => { _logger.LogWarning($"Retry {retryCount} after {timeSpan.TotalMilliseconds}ms due to: {exception.Message}"); });
_resiliencePolicy = RedisResiliencePolicy.Create(_options, logger);

cacheMonitor.RequestEvictEvent += async (_, e) =>
{
Expand All @@ -50,7 +42,7 @@ public Redis(IServiceProvider serviceProvider, IHostEnvironment hostEnvironment,

public async Task<CacheItem<T>> GetAsync<T>(Key key)
{
return await _retryPolicy.ExecuteAsync(async () =>
return await _resiliencePolicy.ExecuteAsync(async () =>
{
var redisConnection = await GetConnection();
if (redisConnection.Multiplexer == null) return null;
Expand All @@ -74,7 +66,7 @@ public async Task<CacheItem<T>> GetAsync<T>(Key key)

public async Task SetAsync<T>(Key key, CacheItem<T> cacheItem, bool staleWhileRevalidate)
{
await _retryPolicy.ExecuteAsync(async () =>
await _resiliencePolicy.ExecuteAsync(async () =>
{
var item = JsonSerializer.Serialize(cacheItem);
if (Debugger.IsAttached)
Expand All @@ -101,17 +93,17 @@ await _retryPolicy.ExecuteAsync(async () =>

/// <summary>
/// Calls <c>SetUpdateTime</c> for <paramref name="key"/> with the current UTC time,
/// executed through the resilience policy (retry inside a circuit breaker).
/// </summary>
/// <typeparam name="T">Type of the cached item the key refers to.</typeparam>
/// <param name="key">Key of the cache entry to update.</param>
/// <returns>The result reported by <c>SetUpdateTime</c>.</returns>
public async Task<bool> BuyMoreTime<T>(Key key)
{
    return await _resiliencePolicy.ExecuteAsync(async () => await SetUpdateTime<T>(key, DateTime.UtcNow));
}

/// <summary>
/// Calls <c>SetUpdateTime</c> for <paramref name="key"/> with <see cref="DateTime.MinValue"/>,
/// executed through the resilience policy (retry inside a circuit breaker).
/// </summary>
/// <typeparam name="T">Type of the cached item the key refers to.</typeparam>
/// <param name="key">Key of the cache entry to invalidate.</param>
/// <returns>The result reported by <c>SetUpdateTime</c>.</returns>
public async Task<bool> Invalidate<T>(Key key)
{
    return await _resiliencePolicy.ExecuteAsync(async () => await SetUpdateTime<T>(key, DateTime.MinValue));
}

public async Task<bool> DropAsync<T>(Key key)
{
return await _retryPolicy.ExecuteAsync(async () =>
return await _resiliencePolicy.ExecuteAsync(async () =>
{
var redisConnection = await GetConnection();
if (redisConnection.Multiplexer == null) return false;
Expand All @@ -124,13 +116,20 @@ public async Task<bool> DropAsync<T>(Key key)

/// <summary>
/// Reports whether a Redis connection is currently available.
/// </summary>
/// <returns>
/// <c>Success</c> is the multiplexer's <c>IsConnected</c> state, or <c>false</c> when no multiplexer could be
/// obtained or the circuit breaker is open. <c>Message</c> carries the connection message, or a description
/// of the open circuit.
/// </returns>
public async Task<(bool Success, string Message)> CanConnectAsync()
{
    try
    {
        return await _resiliencePolicy.ExecuteAsync(async () =>
        {
            var redisConnection = await GetConnection();
            if (redisConnection.Multiplexer == null) return (false, redisConnection.Message);

            return (redisConnection.Multiplexer.IsConnected, redisConnection.Message);
        });
    }
    catch (BrokenCircuitException e)
    {
        // An open circuit is an expected "not connected" answer for a probe, so report it instead of throwing.
        return (false, $"Redis circuit is open: {e.Message}");
    }
}

private async Task<bool> SetUpdateTime<T>(Key key, DateTime updateTime)
Expand Down Expand Up @@ -174,7 +173,20 @@ private async Task<bool> SetUpdateTime<T>(Key key, DateTime updateTime)

try
{
_redisConnection = await ConnectionMultiplexer.ConnectAsync(connectionString);
if (_options.CommandTimeout is { } commandTimeout)
{
var config = ConfigurationOptions.Parse(connectionString);
var milliseconds = (int)commandTimeout.TotalMilliseconds;
config.AsyncTimeout = milliseconds;
config.SyncTimeout = milliseconds;
config.ConnectTimeout = milliseconds;
_redisConnection = await ConnectionMultiplexer.ConnectAsync(config);
}
else
{
_redisConnection = await ConnectionMultiplexer.ConnectAsync(connectionString);
}

return (_redisConnection, "Connected to Redis.");
}
catch (Exception e)
Expand Down
31 changes: 29 additions & 2 deletions Tharga.Cache.Redis/RedisCacheOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
namespace Tharga.Cache.Redis;
namespace Tharga.Cache.Redis;

/// <summary>
/// Settings for the Redis cache backend: where the connection string comes from, how transient errors are
/// retried, when the circuit breaker opens, and an optional command timeout.
/// </summary>
public record RedisCacheOptions
{
    /// <summary>
    /// Delegate that returns the Redis connection string, given the application's service provider.
    /// </summary>
    public Func<IServiceProvider, string> ConnectionStringLoader;

    /// <summary>
    /// Number of retry attempts on a transient Redis error before a call is considered failed. Default 3.
    /// Set to 0 to disable retries (each call is attempted once).
    /// </summary>
    public int RetryCount { get; set; } = 3;

    /// <summary>
    /// Number of consecutive failed calls before the circuit opens and subsequent calls fail fast
    /// (throwing <see cref="Polly.CircuitBreaker.BrokenCircuitException"/>) instead of paying retry latency.
    /// Combined with <see cref="Tharga.Cache.CacheOptions.FailOpenOnBackendError"/>, an open circuit means the
    /// cache falls straight through to the source loader. Default 5.
    /// </summary>
    public int CircuitBreakerFailureThreshold { get; set; } = 5;

    /// <summary>
    /// How long the circuit stays open before it transitions to half-open and probes the backend with a
    /// single trial call. Default 30 seconds.
    /// </summary>
    public TimeSpan CircuitBreakerDuration { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Optional per-command timeout applied to the StackExchange.Redis connection
    /// (Async / Sync / Connect timeout). A shorter timeout makes the fail-open path fast even before the
    /// circuit breaker opens. When null, the connection-string / library defaults apply.
    /// </summary>
    public TimeSpan? CommandTimeout { get; set; }
}
42 changes: 42 additions & 0 deletions Tharga.Cache.Redis/RedisResiliencePolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using Polly;
using StackExchange.Redis;

namespace Tharga.Cache.Redis;

/// <summary>
/// Builds the resilience pipeline used by the Redis persist backend: a retry policy wrapped by a circuit breaker.
/// The breaker is the outer policy, so once it is open calls fail fast (<see cref="Polly.CircuitBreaker.BrokenCircuitException"/>)
/// without paying any retry latency — which is what prevents a sustained outage from starving the thread pool.
/// </summary>
internal static class RedisResiliencePolicy
{
    /// <summary>
    /// Creates the combined circuit-breaker + retry policy from the supplied options.
    /// </summary>
    /// <param name="options">
    /// Source of <c>RetryCount</c>, <c>CircuitBreakerFailureThreshold</c> and <c>CircuitBreakerDuration</c>.
    /// Out-of-range values are clamped (retry count to at least 0, threshold to at least 1, duration to at
    /// least zero) instead of being rejected.
    /// </param>
    /// <param name="logger">Optional logger for retry and circuit state changes; may be null.</param>
    /// <returns>A policy in which the circuit breaker wraps the retry policy.</returns>
    internal static IAsyncPolicy Create(RedisCacheOptions options, ILogger logger)
    {
        var retryCount = Math.Max(0, options.RetryCount);
        var threshold = Math.Max(1, options.CircuitBreakerFailureThreshold);

        // Clamp like the two values above: a negative break duration would otherwise be rejected by Polly
        // with an ArgumentOutOfRangeException while the backend is being constructed.
        var breakDuration = options.CircuitBreakerDuration < TimeSpan.Zero
            ? TimeSpan.Zero
            : options.CircuitBreakerDuration;

        var retryPolicy = HandleTransientErrors()
            .WaitAndRetryAsync(
                retryCount,
                // Exponential backoff: 200ms * 2^attempt.
                attempt => TimeSpan.FromMilliseconds(200 * Math.Pow(2, attempt)),
                (exception, delay, attemptNumber, _) => logger?.LogWarning("Redis retry {RetryCount} after {Delay}ms due to: {Message}", attemptNumber, delay.TotalMilliseconds, exception.Message));

        var circuitBreakerPolicy = HandleTransientErrors()
            .CircuitBreakerAsync(
                threshold,
                breakDuration,
                onBreak: (exception, breakDelay) => logger?.LogWarning("Redis circuit opened for {Delay}ms after {Threshold} consecutive failures: {Message}", breakDelay.TotalMilliseconds, threshold, exception.Message),
                onReset: () => logger?.LogInformation("Redis circuit reset; calls are flowing to the backend again."),
                onHalfOpen: () => logger?.LogInformation("Redis circuit half-open; probing the backend with a trial call."));

        // Breaker (outer) wraps retry (inner): when the breaker is open, the retry never runs.
        return Policy.WrapAsync(circuitBreakerPolicy, retryPolicy);
    }

    /// <summary>
    /// Returns a fresh builder that handles the exception types treated as Redis failures
    /// (<see cref="RedisException"/>, <see cref="TimeoutException"/>, <see cref="SocketException"/>),
    /// so the retry and the breaker always agree on what counts as a failure.
    /// </summary>
    private static PolicyBuilder HandleTransientErrors()
    {
        return Policy
            .Handle<RedisException>()
            .Or<TimeoutException>()
            .Or<SocketException>();
    }
}
Loading
Loading