Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
<ItemGroup>
<PackageVersion Include="Azure.Identity" Version="1.21.0" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.20.1" />
<PackageVersion Include="Azure.Storage.Queues" Version="12.23.0" />
<PackageVersion Include="Confluent.Kafka" Version="2.14.0" />
<PackageVersion Include="Dapr.Client" Version="1.17.9" />
<PackageVersion Include="FluentValidation" Version="12.1.1" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="5.3.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="7.0.0" />
<PackageVersion Include="Microsoft.Data.Sqlite" Version="10.0.6" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.6" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Binder" Version="10.0.6" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.6" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.6" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.6" />
Expand Down
1 change: 1 addition & 0 deletions Pulse.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<Project Path="src/NetEvolve.Pulse.RabbitMQ/NetEvolve.Pulse.RabbitMQ.csproj" />
<Project Path="src/NetEvolve.Pulse.SourceGeneration/NetEvolve.Pulse.SourceGeneration.csproj" />
<Project Path="src/NetEvolve.Pulse.AzureServiceBus/NetEvolve.Pulse.AzureServiceBus.csproj" />
<Project Path="src/NetEvolve.Pulse.AzureQueueStorage/NetEvolve.Pulse.AzureQueueStorage.csproj" />
<Project Path="src/NetEvolve.Pulse.SQLite/NetEvolve.Pulse.SQLite.csproj" />
<Project Path="src/NetEvolve.Pulse.Kafka/NetEvolve.Pulse.Kafka.csproj" />
<Project Path="src/NetEvolve.Pulse.Redis/NetEvolve.Pulse.Redis.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
namespace NetEvolve.Pulse;

using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility;
using NetEvolve.Pulse.Extensibility.Outbox;
using NetEvolve.Pulse.Outbox;

/// <summary>
/// Extension methods for registering the Azure Queue Storage transport with the Pulse mediator.
/// </summary>
public static class AzureQueueStorageExtensions
{
/// <summary>
/// Configures the outbox to deliver messages to Azure Queue Storage using a connection string.
/// </summary>
/// <param name="configurator">The mediator configurator.</param>
/// <param name="connectionString">The Azure Storage connection string.</param>
/// <param name="configureOptions">Optional action to further configure <see cref="AzureQueueStorageTransportOptions"/>.</param>
/// <returns>The configurator for chaining.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="configurator"/> is <see langword="null"/>.</exception>
/// <exception cref="ArgumentException">Thrown when <paramref name="connectionString"/> is <see langword="null"/> or whitespace.</exception>
public static IMediatorBuilder UseAzureQueueStorageTransport(
this IMediatorBuilder configurator,
string connectionString,
Action<AzureQueueStorageTransportOptions>? configureOptions = null
)
{
ArgumentNullException.ThrowIfNull(configurator);
ArgumentException.ThrowIfNullOrWhiteSpace(connectionString);

return configurator.UseAzureQueueStorageTransportCore(
options => options.ConnectionString = connectionString,
configureOptions
);
}

/// <summary>
/// Configures the outbox to deliver messages to Azure Queue Storage using a service URI and managed identity.
/// </summary>
/// <param name="configurator">The mediator configurator.</param>
/// <param name="queueServiceUri">The Azure Queue Storage service URI (e.g., <c>https://account.queue.core.windows.net</c>).</param>
/// <param name="configureOptions">Optional action to further configure <see cref="AzureQueueStorageTransportOptions"/>.</param>
/// <returns>The configurator for chaining.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="configurator"/> or <paramref name="queueServiceUri"/> is <see langword="null"/>.</exception>
public static IMediatorBuilder UseAzureQueueStorageTransport(
this IMediatorBuilder configurator,
Uri queueServiceUri,
Action<AzureQueueStorageTransportOptions>? configureOptions = null
)
{
ArgumentNullException.ThrowIfNull(configurator);
ArgumentNullException.ThrowIfNull(queueServiceUri);

return configurator.UseAzureQueueStorageTransportCore(
options => options.QueueServiceUri = queueServiceUri,
configureOptions
);
}

private static IMediatorBuilder UseAzureQueueStorageTransportCore(
this IMediatorBuilder configurator,
Action<AzureQueueStorageTransportOptions> coreOptions,
Action<AzureQueueStorageTransportOptions>? configureOptions
)
{
var services = configurator.Services;

_ = services.AddOptions<AzureQueueStorageTransportOptions>().Configure(coreOptions);

if (configureOptions is not null)
{
_ = services.Configure(configureOptions);
}

services.TryAddEnumerable(
ServiceDescriptor.Singleton<
IValidateOptions<AzureQueueStorageTransportOptions>,
AzureQueueStorageTransportOptionsValidator
>()
);

_ = services.AddOptions<AzureQueueStorageTransportOptions>().ValidateOnStart();

var existing = services.FirstOrDefault(d => d.ServiceType == typeof(IMessageTransport));
if (existing is not null)
{
_ = services.Remove(existing);
}

_ = services.AddSingleton<IMessageTransport, AzureQueueStorageMessageTransport>();

return configurator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(_ProjectTargetFrameworks)</TargetFrameworks>
<Description>Azure Queue Storage transport for the Pulse CQRS mediator outbox. Delivers outbox messages to Azure Queue Storage queues using the Azure SDK, supporting single and batched sends with managed identity or connection string authentication.</Description>
<PackageTags>$(PackageTags);azure;queuestorage;</PackageTags>
<RootNamespace>NetEvolve.Pulse</RootNamespace>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Storage.Queues" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\NetEvolve.Pulse.Extensibility\NetEvolve.Pulse.Extensibility.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
namespace NetEvolve.Pulse.Outbox;

using System.Diagnostics.CodeAnalysis;
using System.Text;
using System.Text.Json;
using Azure.Identity;
using Azure.Storage.Queues;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility.Outbox;

/// <summary>
/// Azure Queue Storage transport implementation for the outbox processor.
/// </summary>
/// <remarks>
/// The <see cref="QueueClient"/> is lazily initialized on first use. If
/// <see cref="AzureQueueStorageTransportOptions.CreateQueueIfNotExists"/> is <see langword="true"/>,
/// the queue is created automatically during initialization.
/// Messages are JSON-serialized and Base64-encoded before sending.
/// Raw message size must not exceed 48 KB (the Azure Queue Storage Base64-encoded limit of 64 KB).
/// </remarks>
public sealed class AzureQueueStorageMessageTransport : IMessageTransport, IDisposable
{
internal const int MaxMessageSizeInBytes = 48 * 1024; // Raw 48 KB limit (64 KB after Base64 encoding)

private readonly AzureQueueStorageTransportOptions _options;
private readonly QueueClient? _queueClientOverride;
private readonly SemaphoreSlim _initLock = new SemaphoreSlim(1, 1);
private QueueClient? _queueClient;

/// <summary>
/// Initializes a new instance of the <see cref="AzureQueueStorageMessageTransport"/> class.
/// </summary>
/// <param name="options">The configured transport options.</param>
internal AzureQueueStorageMessageTransport(IOptions<AzureQueueStorageTransportOptions> options)
{
ArgumentNullException.ThrowIfNull(options);
_options = options.Value;
}

/// <summary>
/// Initializes a new instance of the <see cref="AzureQueueStorageMessageTransport"/> class
/// with a pre-built queue client. Used for testing.
/// </summary>
/// <param name="options">The configured transport options.</param>
/// <param name="queueClient">A pre-built queue client to use instead of creating one from options.</param>
internal AzureQueueStorageMessageTransport(
IOptions<AzureQueueStorageTransportOptions> options,
QueueClient queueClient
)
{
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(queueClient);
_options = options.Value;
_queueClientOverride = queueClient;
}

/// <inheritdoc />
public void Dispose() => _initLock.Dispose();

/// <inheritdoc />
public async Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(message);

var json = SerializeMessage(message);
var rawBytes = Encoding.UTF8.GetBytes(json);

if (rawBytes.Length > MaxMessageSizeInBytes)
{
throw new InvalidOperationException(
$"Message size {rawBytes.Length} bytes exceeds the Azure Queue Storage limit of {MaxMessageSizeInBytes} bytes (48 KB raw / 64 KB Base64-encoded)."
);
}

var base64 = Convert.ToBase64String(rawBytes);
var queueClient = await GetQueueClientAsync(cancellationToken).ConfigureAwait(false);
_ = await queueClient
.SendMessageAsync(
base64,
visibilityTimeout: _options.MessageVisibilityTimeout,
cancellationToken: cancellationToken
)
.ConfigureAwait(false);
}

/// <inheritdoc />
public async Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(messages);

foreach (var message in messages)
{
await SendAsync(message, cancellationToken).ConfigureAwait(false);
}
}

private static string SerializeMessage(OutboxMessage message) =>
JsonSerializer.Serialize(
new
{
id = message.Id,
eventType = message.EventType.ToOutboxEventTypeName(),
payload = message.Payload,
correlationId = message.CorrelationId,
causationId = message.CausationId,
createdAt = message.CreatedAt,
}
);

[SuppressMessage(
"Maintainability",
"CA1508:Avoid dead conditional code",
Justification = "Double-checked locking: the inner null check guards against concurrent initialization after the semaphore is acquired."
)]
private async Task<QueueClient> GetQueueClientAsync(CancellationToken cancellationToken)
{
if (_queueClientOverride is not null)
{
return _queueClientOverride;
}

if (_queueClient is not null)
{
return _queueClient;
}

await _initLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Re-check after acquiring the lock (double-checked locking pattern).
if (_queueClient is not null)
{
return _queueClient;
}

QueueClient client;

if (!string.IsNullOrWhiteSpace(_options.ConnectionString))
{
client = new QueueClient(_options.ConnectionString, _options.QueueName);
}
else
{
var queueUri = new Uri(_options.QueueServiceUri!, _options.QueueName);
client = new QueueClient(queueUri, new DefaultAzureCredential());
}

if (_options.CreateQueueIfNotExists)
{
_ = await client.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

_queueClient = client;
return _queueClient;
}
finally
{
_initLock.Release();

Check failure on line 158 in src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs

View workflow job for this annotation

GitHub Actions / Build & Tests / Run Tests / Testing .NET solution

Expression value is never used (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0058)

Check failure on line 158 in src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs

View workflow job for this annotation

GitHub Actions / Build & Tests / Run Tests / Testing .NET solution

Expression value is never used (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0058)

Check failure on line 158 in src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs

View workflow job for this annotation

GitHub Actions / Build & Tests / Run Tests / Testing .NET solution

Expression value is never used (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0058)

Check failure on line 158 in src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs

View workflow job for this annotation

GitHub Actions / Build & Tests / Run Tests / Testing .NET solution

Expression value is never used (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0058)

Check failure on line 158 in src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs

View workflow job for this annotation

GitHub Actions / Build & Tests / Run Tests / Testing .NET solution

Expression value is never used (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0058)

Check failure on line 158 in src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs

View workflow job for this annotation

GitHub Actions / Build & Tests / Run Tests / Testing .NET solution

Expression value is never used (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0058)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace NetEvolve.Pulse.Outbox;

/// <summary>
/// Configuration options for <see cref="AzureQueueStorageMessageTransport"/>.
/// </summary>
public sealed class AzureQueueStorageTransportOptions
{
/// <summary>
/// Gets or sets the connection string used to authenticate against Azure Queue Storage.
/// </summary>
/// <remarks>
/// When not provided, <see cref="QueueServiceUri"/> must be specified to use managed identity
/// through <c>DefaultAzureCredential</c>.
/// </remarks>
public string? ConnectionString { get; set; }

/// <summary>
/// Gets or sets the URI of the Azure Queue Storage service endpoint (e.g., <c>https://account.queue.core.windows.net</c>).
/// </summary>
/// <remarks>Required when <see cref="ConnectionString"/> is not supplied.</remarks>
public Uri? QueueServiceUri { get; set; }

/// <summary>
/// Gets or sets the name of the queue to which outbox messages are sent.
/// </summary>
/// <remarks>Defaults to <c>pulse-outbox</c>.</remarks>
public string QueueName { get; set; } = "pulse-outbox";

/// <summary>
/// Gets or sets the visibility timeout applied to each message sent to the queue.
/// </summary>
/// <remarks>
/// When <see langword="null"/>, the queue's default visibility timeout is used.
/// </remarks>
public TimeSpan? MessageVisibilityTimeout { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the queue should be created automatically if it does not exist.
/// </summary>
/// <remarks>Defaults to <see langword="true"/>.</remarks>
public bool CreateQueueIfNotExists { get; set; } = true;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace NetEvolve.Pulse.Outbox;

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;

/// <summary>
/// Binds the <c>Pulse:Transports:AzureQueueStorage</c> configuration section
/// to <see cref="AzureQueueStorageTransportOptions"/>.
/// </summary>
internal sealed class AzureQueueStorageTransportOptionsConfiguration
: IConfigureOptions<AzureQueueStorageTransportOptions>
{
private const string ConfigurationSection = "Pulse:Transports:AzureQueueStorage";

private readonly IConfiguration _configuration;

/// <summary>
/// Initializes a new instance of the <see cref="AzureQueueStorageTransportOptionsConfiguration"/> class.
/// </summary>
/// <param name="configuration">The configuration root.</param>
public AzureQueueStorageTransportOptionsConfiguration(IConfiguration configuration)
{
ArgumentNullException.ThrowIfNull(configuration);
_configuration = configuration;
}

/// <inheritdoc />
public void Configure(AzureQueueStorageTransportOptions options) =>
_configuration.GetSection(ConfigurationSection).Bind(options);
}
Loading
Loading