diff --git a/Directory.Packages.props b/Directory.Packages.props index ffa41d21..4d3b5761 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -20,12 +20,15 @@ + + + diff --git a/Pulse.slnx b/Pulse.slnx index 107b8b61..7092cdff 100644 --- a/Pulse.slnx +++ b/Pulse.slnx @@ -37,6 +37,7 @@ + diff --git a/src/NetEvolve.Pulse.AzureQueueStorage/AzureQueueStorageExtensions.cs b/src/NetEvolve.Pulse.AzureQueueStorage/AzureQueueStorageExtensions.cs new file mode 100644 index 00000000..898e3810 --- /dev/null +++ b/src/NetEvolve.Pulse.AzureQueueStorage/AzureQueueStorageExtensions.cs @@ -0,0 +1,97 @@ +namespace NetEvolve.Pulse; + +using System.Linq; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; +using NetEvolve.Pulse.Extensibility; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Outbox; + +/// +/// Extension methods for registering the Azure Queue Storage transport with the Pulse mediator. +/// +public static class AzureQueueStorageExtensions +{ + /// + /// Configures the outbox to deliver messages to Azure Queue Storage using a connection string. + /// + /// The mediator configurator. + /// The Azure Storage connection string. + /// Optional action to further configure . + /// The configurator for chaining. + /// Thrown when is . + /// Thrown when is or whitespace. + public static IMediatorBuilder UseAzureQueueStorageTransport( + this IMediatorBuilder configurator, + string connectionString, + Action? configureOptions = null + ) + { + ArgumentNullException.ThrowIfNull(configurator); + ArgumentException.ThrowIfNullOrWhiteSpace(connectionString); + + return configurator.UseAzureQueueStorageTransportCore( + options => options.ConnectionString = connectionString, + configureOptions + ); + } + + /// + /// Configures the outbox to deliver messages to Azure Queue Storage using a service URI and managed identity. + /// + /// The mediator configurator. + /// The Azure Queue Storage service URI (e.g., https://account.queue.core.windows.net). + /// Optional action to further configure . + /// The configurator for chaining. + /// Thrown when or is . + public static IMediatorBuilder UseAzureQueueStorageTransport( + this IMediatorBuilder configurator, + Uri queueServiceUri, + Action? configureOptions = null + ) + { + ArgumentNullException.ThrowIfNull(configurator); + ArgumentNullException.ThrowIfNull(queueServiceUri); + + return configurator.UseAzureQueueStorageTransportCore( + options => options.QueueServiceUri = queueServiceUri, + configureOptions + ); + } + + private static IMediatorBuilder UseAzureQueueStorageTransportCore( + this IMediatorBuilder configurator, + Action coreOptions, + Action? configureOptions + ) + { + var services = configurator.Services; + + _ = services.AddOptions().Configure(coreOptions); + + if (configureOptions is not null) + { + _ = services.Configure(configureOptions); + } + + services.TryAddEnumerable( + ServiceDescriptor.Singleton< + IValidateOptions, + AzureQueueStorageTransportOptionsValidator + >() + ); + + _ = services.AddOptions().ValidateOnStart(); + + var existing = services.FirstOrDefault(d => d.ServiceType == typeof(IMessageTransport)); + if (existing is not null) + { + _ = services.Remove(existing); + } + + _ = services.AddSingleton(); + + return configurator; + } +} diff --git a/src/NetEvolve.Pulse.AzureQueueStorage/NetEvolve.Pulse.AzureQueueStorage.csproj b/src/NetEvolve.Pulse.AzureQueueStorage/NetEvolve.Pulse.AzureQueueStorage.csproj new file mode 100644 index 00000000..14c04f3f --- /dev/null +++ b/src/NetEvolve.Pulse.AzureQueueStorage/NetEvolve.Pulse.AzureQueueStorage.csproj @@ -0,0 +1,21 @@ + + + $(_ProjectTargetFrameworks) + Azure Queue Storage transport for the Pulse CQRS mediator outbox. Delivers outbox messages to Azure Queue Storage queues using the Azure SDK, supporting single and batched sends with managed identity or connection string authentication. + $(PackageTags);azure;queuestorage; + NetEvolve.Pulse + + + + + + + + + + + + + + + diff --git a/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs b/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs new file mode 100644 index 00000000..6bb653c7 --- /dev/null +++ b/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageMessageTransport.cs @@ -0,0 +1,161 @@ +namespace NetEvolve.Pulse.Outbox; + +using System.Diagnostics.CodeAnalysis; +using System.Text; +using System.Text.Json; +using Azure.Identity; +using Azure.Storage.Queues; +using Microsoft.Extensions.Options; +using NetEvolve.Pulse.Extensibility.Outbox; + +/// +/// Azure Queue Storage transport implementation for the outbox processor. +/// +/// +/// The is lazily initialized on first use. If +/// is , +/// the queue is created automatically during initialization. +/// Messages are JSON-serialized and Base64-encoded before sending. +/// Raw message size must not exceed 48 KB (the Azure Queue Storage Base64-encoded limit of 64 KB). +/// +public sealed class AzureQueueStorageMessageTransport : IMessageTransport, IDisposable +{ + internal const int MaxMessageSizeInBytes = 48 * 1024; // Raw 48 KB limit (64 KB after Base64 encoding) + + private readonly AzureQueueStorageTransportOptions _options; + private readonly QueueClient? _queueClientOverride; + private readonly SemaphoreSlim _initLock = new SemaphoreSlim(1, 1); + private QueueClient? _queueClient; + + /// + /// Initializes a new instance of the class. + /// + /// The configured transport options. + internal AzureQueueStorageMessageTransport(IOptions options) + { + ArgumentNullException.ThrowIfNull(options); + _options = options.Value; + } + + /// + /// Initializes a new instance of the class + /// with a pre-built queue client. Used for testing. + /// + /// The configured transport options. + /// A pre-built queue client to use instead of creating one from options. + internal AzureQueueStorageMessageTransport( + IOptions options, + QueueClient queueClient + ) + { + ArgumentNullException.ThrowIfNull(options); + ArgumentNullException.ThrowIfNull(queueClient); + _options = options.Value; + _queueClientOverride = queueClient; + } + + /// + public void Dispose() => _initLock.Dispose(); + + /// + public async Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(message); + + var json = SerializeMessage(message); + var rawBytes = Encoding.UTF8.GetBytes(json); + + if (rawBytes.Length > MaxMessageSizeInBytes) + { + throw new InvalidOperationException( + $"Message size {rawBytes.Length} bytes exceeds the Azure Queue Storage limit of {MaxMessageSizeInBytes} bytes (48 KB raw / 64 KB Base64-encoded)." + ); + } + + var base64 = Convert.ToBase64String(rawBytes); + var queueClient = await GetQueueClientAsync(cancellationToken).ConfigureAwait(false); + _ = await queueClient + .SendMessageAsync( + base64, + visibilityTimeout: _options.MessageVisibilityTimeout, + cancellationToken: cancellationToken + ) + .ConfigureAwait(false); + } + + /// + public async Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(messages); + + foreach (var message in messages) + { + await SendAsync(message, cancellationToken).ConfigureAwait(false); + } + } + + private static string SerializeMessage(OutboxMessage message) => + JsonSerializer.Serialize( + new + { + id = message.Id, + eventType = message.EventType.ToOutboxEventTypeName(), + payload = message.Payload, + correlationId = message.CorrelationId, + causationId = message.CausationId, + createdAt = message.CreatedAt, + } + ); + + [SuppressMessage( + "Maintainability", + "CA1508:Avoid dead conditional code", + Justification = "Double-checked locking: the inner null check guards against concurrent initialization after the semaphore is acquired." + )] + private async Task GetQueueClientAsync(CancellationToken cancellationToken) + { + if (_queueClientOverride is not null) + { + return _queueClientOverride; + } + + if (_queueClient is not null) + { + return _queueClient; + } + + await _initLock.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + // Re-check after acquiring the lock (double-checked locking pattern). + if (_queueClient is not null) + { + return _queueClient; + } + + QueueClient client; + + if (!string.IsNullOrWhiteSpace(_options.ConnectionString)) + { + client = new QueueClient(_options.ConnectionString, _options.QueueName); + } + else + { + var queueUri = new Uri(_options.QueueServiceUri!, _options.QueueName); + client = new QueueClient(queueUri, new DefaultAzureCredential()); + } + + if (_options.CreateQueueIfNotExists) + { + _ = await client.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + } + + _queueClient = client; + return _queueClient; + } + finally + { + _initLock.Release(); + } + } +} diff --git a/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptions.cs b/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptions.cs new file mode 100644 index 00000000..08d2045d --- /dev/null +++ b/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptions.cs @@ -0,0 +1,42 @@ +namespace NetEvolve.Pulse.Outbox; + +/// +/// Configuration options for . +/// +public sealed class AzureQueueStorageTransportOptions +{ + /// + /// Gets or sets the connection string used to authenticate against Azure Queue Storage. + /// + /// + /// When not provided, must be specified to use managed identity + /// through DefaultAzureCredential. + /// + public string? ConnectionString { get; set; } + + /// + /// Gets or sets the URI of the Azure Queue Storage service endpoint (e.g., https://account.queue.core.windows.net). + /// + /// Required when is not supplied. + public Uri? QueueServiceUri { get; set; } + + /// + /// Gets or sets the name of the queue to which outbox messages are sent. + /// + /// Defaults to pulse-outbox. + public string QueueName { get; set; } = "pulse-outbox"; + + /// + /// Gets or sets the visibility timeout applied to each message sent to the queue. + /// + /// + /// When , the queue's default visibility timeout is used. + /// + public TimeSpan? MessageVisibilityTimeout { get; set; } + + /// + /// Gets or sets a value indicating whether the queue should be created automatically if it does not exist. + /// + /// Defaults to . + public bool CreateQueueIfNotExists { get; set; } = true; +} diff --git a/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptionsConfiguration.cs b/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptionsConfiguration.cs new file mode 100644 index 00000000..247a6d7a --- /dev/null +++ b/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptionsConfiguration.cs @@ -0,0 +1,30 @@ +namespace NetEvolve.Pulse.Outbox; + +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Options; + +/// +/// Binds the Pulse:Transports:AzureQueueStorage configuration section +/// to . +/// +internal sealed class AzureQueueStorageTransportOptionsConfiguration + : IConfigureOptions +{ + private const string ConfigurationSection = "Pulse:Transports:AzureQueueStorage"; + + private readonly IConfiguration _configuration; + + /// + /// Initializes a new instance of the class. + /// + /// The configuration root. + public AzureQueueStorageTransportOptionsConfiguration(IConfiguration configuration) + { + ArgumentNullException.ThrowIfNull(configuration); + _configuration = configuration; + } + + /// + public void Configure(AzureQueueStorageTransportOptions options) => + _configuration.GetSection(ConfigurationSection).Bind(options); +} diff --git a/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptionsValidator.cs b/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptionsValidator.cs new file mode 100644 index 00000000..381b3faa --- /dev/null +++ b/src/NetEvolve.Pulse.AzureQueueStorage/Outbox/AzureQueueStorageTransportOptionsValidator.cs @@ -0,0 +1,29 @@ +namespace NetEvolve.Pulse.Outbox; + +using Microsoft.Extensions.Options; + +/// +/// Validates at application startup. +/// +internal sealed class AzureQueueStorageTransportOptionsValidator : IValidateOptions +{ + /// + public ValidateOptionsResult Validate(string? name, AzureQueueStorageTransportOptions options) + { + if (string.IsNullOrWhiteSpace(options.ConnectionString) && options.QueueServiceUri is null) + { + return ValidateOptionsResult.Fail( + $"Either {nameof(AzureQueueStorageTransportOptions.ConnectionString)} or {nameof(AzureQueueStorageTransportOptions.QueueServiceUri)} must be provided." + ); + } + + if (string.IsNullOrWhiteSpace(options.QueueName)) + { + return ValidateOptionsResult.Fail( + $"{nameof(AzureQueueStorageTransportOptions.QueueName)} must not be empty." + ); + } + + return ValidateOptionsResult.Success; + } +} diff --git a/src/NetEvolve.Pulse.AzureQueueStorage/README.md b/src/NetEvolve.Pulse.AzureQueueStorage/README.md new file mode 100644 index 00000000..b1ddafcf --- /dev/null +++ b/src/NetEvolve.Pulse.AzureQueueStorage/README.md @@ -0,0 +1,84 @@ +# NetEvolve.Pulse.AzureQueueStorage + +[![NuGet Version](https://img.shields.io/nuget/v/NetEvolve.Pulse.AzureQueueStorage.svg)](https://www.nuget.org/packages/NetEvolve.Pulse.AzureQueueStorage/) +[![NuGet Downloads](https://img.shields.io/nuget/dt/NetEvolve.Pulse.AzureQueueStorage.svg)](https://www.nuget.org/packages/NetEvolve.Pulse.AzureQueueStorage/) +[![License](https://img.shields.io/github/license/dailydevops/pulse.svg)](https://github.com/dailydevops/pulse/blob/main/LICENSE) + +Azure Queue Storage transport for the Pulse outbox pattern. A cost-effective alternative to Azure Service Bus, available in every Azure Storage account with a 48 KB raw message size limit. + +## Features + +- **Connection Flexibility**: Use a connection string or `DefaultAzureCredential` with a service URI. +- **Automatic Queue Creation**: Optionally creates the target queue on first use via `CreateQueueIfNotExists`. +- **Base64 Encoding**: Messages are JSON-serialized and Base64-encoded before sending, matching Azure Queue Storage requirements. +- **Size Guard**: Throws `InvalidOperationException` when a raw message exceeds 48 KB. +- **Sequential Batch Delivery**: `SendBatchAsync` iterates messages sequentially. +- **Dependency Injection**: Single call `UseAzureQueueStorageTransport` wires the transport. + +## Installation + +### .NET CLI + +```bash +dotnet add package NetEvolve.Pulse.AzureQueueStorage +``` + +### PackageReference + +```xml + +``` + +## Quick Start + +### Connection String + +```csharp +using Microsoft.Extensions.DependencyInjection; +using NetEvolve.Pulse; + +var services = new ServiceCollection(); + +services.AddPulse(config => config.UseAzureQueueStorageTransport( + connectionString: builder.Configuration["Storage:ConnectionString"]! +)); +``` + +### Managed Identity + +```csharp +services.AddPulse(config => config.UseAzureQueueStorageTransport( + queueServiceUri: new Uri("https://myaccount.queue.core.windows.net") +)); +``` + +## Configuration + +| Option | Description | +|---|---| +| `ConnectionString` | Azure Storage connection string. Required when `QueueServiceUri` is not set. | +| `QueueServiceUri` | Azure Queue Storage service URI used with managed identity (`DefaultAzureCredential`). Required when `ConnectionString` is not set. | +| `QueueName` | Name of the queue to send messages to. Defaults to `pulse-outbox`. | +| `MessageVisibilityTimeout` | Optional visibility timeout applied to each sent message. Defaults to the queue's default. | +| `CreateQueueIfNotExists` | Automatically creates the queue on first use. Defaults to `true`. | + +## Configuration via appsettings.json + +Register `AzureQueueStorageTransportOptionsConfiguration` to bind from the `Pulse:Transports:AzureQueueStorage` section: + +```csharp +services.AddSingleton, AzureQueueStorageTransportOptionsConfiguration>(); +``` + +```json +{ + "Pulse": { + "Transports": { + "AzureQueueStorage": { + "QueueName": "my-outbox", + "CreateQueueIfNotExists": true + } + } + } +} +``` diff --git a/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageExtensionsTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageExtensionsTests.cs new file mode 100644 index 00000000..51fe9907 --- /dev/null +++ b/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageExtensionsTests.cs @@ -0,0 +1,191 @@ +namespace NetEvolve.Pulse.Tests.Unit.AzureQueueStorage; + +using Microsoft.Extensions.DependencyInjection; +using NetEvolve.Extensions.TUnit; +using NetEvolve.Pulse; +using NetEvolve.Pulse.Extensibility; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Outbox; +using TUnit.Assertions.Extensions; +using TUnit.Core; + +[TestGroup("AzureQueueStorage")] +public sealed class AzureQueueStorageExtensionsTests +{ + private static readonly Uri FakeServiceUri = new("https://fakeaccount.queue.core.windows.net"); + private const string FakeConnectionString = + "DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;QueueEndpoint=https://127.0.0.1:10001/devstoreaccount1;"; + + [Test] + public void UseAzureQueueStorageTransport_ConnectionString_When_configurator_is_null_throws_ArgumentNullException() + { + IMediatorBuilder? configurator = null; + + _ = Assert.Throws( + "configurator", + () => configurator!.UseAzureQueueStorageTransport(FakeConnectionString) + ); + } + + [Test] + public void UseAzureQueueStorageTransport_ConnectionString_When_connectionString_is_null_throws_ArgumentException() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => + _ = Assert.Throws( + "connectionString", + () => config.UseAzureQueueStorageTransport((string)null!) + ) + ); + } + + [Test] + public void UseAzureQueueStorageTransport_Uri_When_configurator_is_null_throws_ArgumentNullException() + { + IMediatorBuilder? configurator = null; + + _ = Assert.Throws( + "configurator", + () => configurator!.UseAzureQueueStorageTransport(FakeServiceUri) + ); + } + + [Test] + public void UseAzureQueueStorageTransport_Uri_When_uri_is_null_throws_ArgumentNullException() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => + _ = Assert.Throws( + "queueServiceUri", + () => config.UseAzureQueueStorageTransport((Uri)null!) + ) + ); + } + + [Test] + public async Task UseAzureQueueStorageTransport_ConnectionString_registers_transport() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseAzureQueueStorageTransport(FakeConnectionString)); + + var descriptor = services.Single(d => d.ServiceType == typeof(IMessageTransport)); + _ = await Assert.That(descriptor.ImplementationType).IsEqualTo(typeof(AzureQueueStorageMessageTransport)); + } + + [Test] + public async Task UseAzureQueueStorageTransport_Uri_registers_transport() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseAzureQueueStorageTransport(FakeServiceUri)); + + var descriptor = services.Single(d => d.ServiceType == typeof(IMessageTransport)); + _ = await Assert.That(descriptor.ImplementationType).IsEqualTo(typeof(AzureQueueStorageMessageTransport)); + } + + [Test] + public async Task UseAzureQueueStorageTransport_ConnectionString_sets_connection_string_in_options() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseAzureQueueStorageTransport(FakeConnectionString)); + + var provider = services.BuildServiceProvider(); + await using (provider.ConfigureAwait(false)) + { + var options = + provider.GetRequiredService>(); + _ = await Assert.That(options.Value.ConnectionString).IsEqualTo(FakeConnectionString); + } + } + + [Test] + public async Task UseAzureQueueStorageTransport_Uri_sets_service_uri_in_options() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseAzureQueueStorageTransport(FakeServiceUri)); + + var provider = services.BuildServiceProvider(); + await using (provider.ConfigureAwait(false)) + { + var options = + provider.GetRequiredService>(); + _ = await Assert.That(options.Value.QueueServiceUri).IsEqualTo(FakeServiceUri); + } + } + + [Test] + public async Task UseAzureQueueStorageTransport_ConnectionString_default_options_have_correct_defaults() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseAzureQueueStorageTransport(FakeConnectionString)); + + var provider = services.BuildServiceProvider(); + await using (provider.ConfigureAwait(false)) + { + var options = + provider.GetRequiredService>(); + + using (Assert.Multiple()) + { + _ = await Assert.That(options.Value.QueueName).IsEqualTo("pulse-outbox"); + _ = await Assert.That(options.Value.CreateQueueIfNotExists).IsTrue(); + _ = await Assert.That(options.Value.MessageVisibilityTimeout).IsNull(); + } + } + } + + [Test] + public async Task UseAzureQueueStorageTransport_ConnectionString_configureOptions_is_applied() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => + config.UseAzureQueueStorageTransport( + FakeConnectionString, + options => + { + options.QueueName = "my-queue"; + options.CreateQueueIfNotExists = false; + } + ) + ); + + var provider = services.BuildServiceProvider(); + await using (provider.ConfigureAwait(false)) + { + var options = + provider.GetRequiredService>(); + + using (Assert.Multiple()) + { + _ = await Assert.That(options.Value.QueueName).IsEqualTo("my-queue"); + _ = await Assert.That(options.Value.CreateQueueIfNotExists).IsFalse(); + } + } + } + + [Test] + public async Task UseAzureQueueStorageTransport_ConnectionString_replaces_existing_transport() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddSingleton(new DummyTransport()); + _ = services.AddPulse(config => config.UseAzureQueueStorageTransport(FakeConnectionString)); + + var descriptor = services.Single(d => d.ServiceType == typeof(IMessageTransport)); + _ = await Assert.That(descriptor.ImplementationType).IsEqualTo(typeof(AzureQueueStorageMessageTransport)); + } + + [Test] + public async Task UseAzureQueueStorageTransport_ConnectionString_registers_transport_as_singleton() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseAzureQueueStorageTransport(FakeConnectionString)); + + var descriptor = services.Single(d => d.ServiceType == typeof(IMessageTransport)); + _ = await Assert.That(descriptor.Lifetime).IsEqualTo(ServiceLifetime.Singleton); + } + + private sealed class DummyTransport : IMessageTransport + { + public Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default) => + Task.CompletedTask; + } +} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageMessageTransportTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageMessageTransportTests.cs new file mode 100644 index 00000000..4807a784 --- /dev/null +++ b/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageMessageTransportTests.cs @@ -0,0 +1,330 @@ +namespace NetEvolve.Pulse.Tests.Unit.AzureQueueStorage; + +using System; +using System.Collections.Generic; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Azure; +using Azure.Core; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; +using Microsoft.Extensions.Options; +using NetEvolve.Extensions.TUnit; +using NetEvolve.Pulse.Extensibility; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Outbox; +using TUnit.Assertions.Extensions; +using TUnit.Core; + +[TestGroup("AzureQueueStorage")] +public sealed class AzureQueueStorageMessageTransportTests +{ + // ── Constructor guards ──────────────────────────────────────────────────── + + [Test] + public async Task Constructor_When_options_is_null_throws_ArgumentNullException() + { + IOptions options = null!; + + _ = await Assert.That(() => new AzureQueueStorageMessageTransport(options)).Throws(); + } + + [Test] + public async Task Constructor_With_queueClient_When_options_is_null_throws_ArgumentNullException() + { + IOptions options = null!; + var fakeClient = new FakeQueueClient(); + + _ = await Assert + .That(() => new AzureQueueStorageMessageTransport(options, fakeClient)) + .Throws(); + } + + [Test] + public async Task Constructor_With_queueClient_When_queueClient_is_null_throws_ArgumentNullException() + { + var options = Options.Create( + new AzureQueueStorageTransportOptions { ConnectionString = "UseDevelopmentStorage=true" } + ); + QueueClient nullClient = null!; + + _ = await Assert + .That(() => new AzureQueueStorageMessageTransport(options, nullClient)) + .Throws(); + } + + // ── SendAsync null guard ────────────────────────────────────────────────── + + [Test] + public async Task SendAsync_When_message_is_null_throws_ArgumentNullException(CancellationToken cancellationToken) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + + _ = await Assert.That(() => transport.SendAsync(null!, cancellationToken)).Throws(); + } + + // ── SendAsync happy path ────────────────────────────────────────────────── + + [Test] + public async Task SendAsync_Sends_base64_encoded_message(CancellationToken cancellationToken) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + var message = CreateOutboxMessage(); + + await transport.SendAsync(message, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(fakeClient.SentMessages).HasSingleItem(); + + var sentBase64 = fakeClient.SentMessages[0]; + var decodedBytes = Convert.FromBase64String(sentBase64); + var json = Encoding.UTF8.GetString(decodedBytes); + + _ = await Assert.That(json).IsNotNullOrEmpty(); + using var doc = JsonDocument.Parse(json); + _ = await Assert.That(doc.RootElement.GetProperty("id").GetGuid()).IsEqualTo(message.Id); + _ = await Assert + .That(doc.RootElement.GetProperty("eventType").GetString()) + .IsEqualTo(message.EventType.ToOutboxEventTypeName()); + _ = await Assert.That(doc.RootElement.GetProperty("payload").GetString()).IsEqualTo(message.Payload); + } + + [Test] + public async Task SendAsync_Passes_visibility_timeout_when_configured(CancellationToken cancellationToken) + { + var fakeClient = new FakeQueueClient(); + var timeout = TimeSpan.FromMinutes(5); + var options = Options.Create( + new AzureQueueStorageTransportOptions + { + ConnectionString = "UseDevelopmentStorage=true", + MessageVisibilityTimeout = timeout, + } + ); + using var transport = new AzureQueueStorageMessageTransport(options, fakeClient); + + await transport.SendAsync(CreateOutboxMessage(), cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(fakeClient.LastVisibilityTimeout).IsEqualTo(timeout); + } + + [Test] + public async Task SendAsync_Does_not_pass_visibility_timeout_when_not_configured( + CancellationToken cancellationToken + ) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + + await transport.SendAsync(CreateOutboxMessage(), cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(fakeClient.LastVisibilityTimeout).IsNull(); + } + + // ── SendAsync oversized message ─────────────────────────────────────────── + + [Test] + public async Task SendAsync_When_message_exceeds_48KB_throws_InvalidOperationException( + CancellationToken cancellationToken + ) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + + // Create a large payload that exceeds 48 KB raw + var largePayload = new string('x', 50 * 1024); + var message = CreateOutboxMessage(payload: largePayload); + + _ = await Assert + .That(() => transport.SendAsync(message, cancellationToken)) + .Throws(); + } + + [Test] + public async Task SendAsync_When_message_is_small_does_not_throw(CancellationToken cancellationToken) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + var message = CreateOutboxMessage(); + + // This should not throw + await transport.SendAsync(message, cancellationToken).ConfigureAwait(false); + _ = await Assert.That(fakeClient.SentMessages).HasSingleItem(); + } + + // ── SendBatchAsync ──────────────────────────────────────────────────────── + + [Test] + public async Task SendBatchAsync_When_messages_is_null_throws_ArgumentNullException( + CancellationToken cancellationToken + ) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + + _ = await Assert.That(() => transport.SendBatchAsync(null!, cancellationToken)).Throws(); + } + + [Test] + public async Task SendBatchAsync_When_messages_is_empty_does_nothing(CancellationToken cancellationToken) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + + await transport.SendBatchAsync([], cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(fakeClient.SentMessages).IsEmpty(); + } + + [Test] + public async Task SendBatchAsync_Sends_each_message_sequentially(CancellationToken cancellationToken) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + var messages = new[] { CreateOutboxMessage(), CreateOutboxMessage(), CreateOutboxMessage() }; + + await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(fakeClient.SentMessages.Count).IsEqualTo(3); + } + + [Test] + public async Task SendBatchAsync_When_one_message_is_oversized_throws_InvalidOperationException( + CancellationToken cancellationToken + ) + { + var fakeClient = new FakeQueueClient(); + using var transport = CreateTransport(fakeClient); + var largePayload = new string('x', 50 * 1024); + var messages = new[] + { + CreateOutboxMessage(), + CreateOutboxMessage(payload: largePayload), + CreateOutboxMessage(), + }; + + _ = await Assert + .That(() => transport.SendBatchAsync(messages, cancellationToken)) + .Throws(); + } + + // ── Helpers ─────────────────────────────────────────────────────────────── + + private static AzureQueueStorageMessageTransport CreateTransport(FakeQueueClient fakeClient) + { + var options = Options.Create( + new AzureQueueStorageTransportOptions { ConnectionString = "UseDevelopmentStorage=true" } + ); + return new AzureQueueStorageMessageTransport(options, fakeClient); + } + + private static OutboxMessage CreateOutboxMessage(string? payload = null) => + new() + { + Id = Guid.NewGuid(), + EventType = typeof(TestOutboxEvent), + Payload = payload ?? """{"data":"test"}""", + CorrelationId = "corr-123", + CreatedAt = DateTimeOffset.UtcNow, + UpdatedAt = DateTimeOffset.UtcNow, + RetryCount = 0, + }; + + // ── Fakes ───────────────────────────────────────────────────────────────── + + private sealed class FakeQueueClient : QueueClient + { + public List SentMessages { get; } = []; + public TimeSpan? LastVisibilityTimeout { get; private set; } + public int CreateIfNotExistsCallCount { get; private set; } + + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Reliability", + "CA2000:Dispose objects before losing scope", + Justification = "FakeAzureResponse is passed to Response.FromValue and immediately returned; its Dispose is a no-op." + )] + public override Task> SendMessageAsync( + string messageText, + TimeSpan? visibilityTimeout = null, + TimeSpan? timeToLive = null, + CancellationToken cancellationToken = default + ) + { + SentMessages.Add(messageText); + LastVisibilityTimeout = visibilityTimeout; + var receipt = QueuesModelFactory.SendReceipt( + messageId: Guid.NewGuid().ToString(), + insertionTime: DateTimeOffset.UtcNow, + expirationTime: DateTimeOffset.UtcNow.AddDays(7), + popReceipt: "pop-receipt", + timeNextVisible: DateTimeOffset.UtcNow + ); + return Task.FromResult(Response.FromValue(receipt, new FakeAzureResponse())); + } + + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Reliability", + "CA2000:Dispose objects before losing scope", + Justification = "FakeAzureResponse is immediately returned as the raw response; its Dispose is a no-op." + )] + public override Task CreateIfNotExistsAsync( + IDictionary? metadata = null, + CancellationToken cancellationToken = default + ) + { + CreateIfNotExistsCallCount++; + return Task.FromResult(new FakeAzureResponse()); + } + } + + private sealed class FakeAzureResponse : Response + { + public override int Status => 200; + public override string ReasonPhrase => "OK"; + public override Stream? ContentStream + { + get => null; + set { } + } + public override string ClientRequestId + { + get => string.Empty; + set { } + } + + public override void Dispose() { } + + protected override bool ContainsHeader(string name) => false; + + protected override IEnumerable EnumerateHeaders() => []; + + protected override bool TryGetHeader( + string name, + [System.Diagnostics.CodeAnalysis.NotNullWhen(true)] out string? value + ) + { + value = null; + return false; + } + + protected override bool TryGetHeaderValues( + string name, + [System.Diagnostics.CodeAnalysis.NotNullWhen(true)] out IEnumerable? values + ) + { + values = null; + return false; + } + } + + private sealed record TestOutboxEvent : IEvent + { + public string? CausationId { get; set; } + public string? CorrelationId { get; set; } + public string Id { get; init; } = Guid.NewGuid().ToString(); + public DateTimeOffset? PublishedAt { get; set; } + } +} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageTransportOptionsValidatorTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageTransportOptionsValidatorTests.cs new file mode 100644 index 00000000..7828bf4c --- /dev/null +++ b/tests/NetEvolve.Pulse.Tests.Unit/AzureQueueStorage/AzureQueueStorageTransportOptionsValidatorTests.cs @@ -0,0 +1,107 @@ +namespace NetEvolve.Pulse.Tests.Unit.AzureQueueStorage; + +using NetEvolve.Extensions.TUnit; +using NetEvolve.Pulse.Outbox; +using TUnit.Assertions.Extensions; +using TUnit.Core; + +[TestGroup("AzureQueueStorage")] +public sealed class AzureQueueStorageTransportOptionsValidatorTests +{ + private static readonly AzureQueueStorageTransportOptionsValidator _validator = new(); + + [Test] + public async Task Validate_When_neither_ConnectionString_nor_Uri_provided_fails() + { + var options = new AzureQueueStorageTransportOptions { ConnectionString = null, QueueServiceUri = null }; + + var result = _validator.Validate(null, options); + + _ = await Assert.That(result.Failed).IsTrue(); + } + + [Test] + public async Task Validate_When_ConnectionString_provided_succeeds() + { + var options = new AzureQueueStorageTransportOptions { ConnectionString = "UseDevelopmentStorage=true" }; + + var result = _validator.Validate(null, options); + + _ = await Assert.That(result.Succeeded).IsTrue(); + } + + [Test] + public async Task Validate_When_QueueServiceUri_provided_succeeds() + { + var options = new AzureQueueStorageTransportOptions + { + QueueServiceUri = new Uri("https://account.queue.core.windows.net"), + }; + + var result = _validator.Validate(null, options); + + _ = await Assert.That(result.Succeeded).IsTrue(); + } + + [Test] + public async Task Validate_When_QueueName_is_empty_fails() + { + var options = new AzureQueueStorageTransportOptions + { + ConnectionString = "UseDevelopmentStorage=true", + QueueName = string.Empty, + }; + + var result = _validator.Validate(null, options); + + _ = await Assert.That(result.Failed).IsTrue(); + } + + [Test] + public async Task Validate_When_QueueName_is_whitespace_fails() + { + var options = new AzureQueueStorageTransportOptions + { + ConnectionString = "UseDevelopmentStorage=true", + QueueName = " ", + }; + + var result = _validator.Validate(null, options); + + _ = await Assert.That(result.Failed).IsTrue(); + } + + [Test] + public async Task Validate_Default_options_without_ConnectionString_fails() + { + var options = new AzureQueueStorageTransportOptions(); + + var result = _validator.Validate(null, options); + + _ = await Assert.That(result.Failed).IsTrue(); + } + + [Test] + public async Task Validate_ConnectionString_is_whitespace_and_no_Uri_fails() + { + var options = new AzureQueueStorageTransportOptions { ConnectionString = " " }; + + var result = _validator.Validate(null, options); + + _ = await Assert.That(result.Failed).IsTrue(); + } + + [Test] + public async Task Validate_Both_ConnectionString_and_QueueServiceUri_provided_succeeds() + { + var options = new AzureQueueStorageTransportOptions + { + ConnectionString = "UseDevelopmentStorage=true", + QueueServiceUri = new Uri("https://account.queue.core.windows.net"), + }; + + var result = _validator.Validate(null, options); + + _ = await Assert.That(result.Succeeded).IsTrue(); + } +} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/NetEvolve.Pulse.Tests.Unit.csproj b/tests/NetEvolve.Pulse.Tests.Unit/NetEvolve.Pulse.Tests.Unit.csproj index d07b44d9..4984105b 100644 --- a/tests/NetEvolve.Pulse.Tests.Unit/NetEvolve.Pulse.Tests.Unit.csproj +++ b/tests/NetEvolve.Pulse.Tests.Unit/NetEvolve.Pulse.Tests.Unit.csproj @@ -28,6 +28,7 @@ +