diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 52780bef2d..9d874478da 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -82,6 +82,7 @@ + diff --git a/src/ServiceControl.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index 657a84244d..b6b3b8048a 100644 --- a/src/ServiceControl.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -125,7 +125,7 @@ async Task InitializeServiceControl(ScenarioContext context) hostBuilder.AddServiceControlAuthentication(settings.OpenIdConnectSettings); hostBuilder.AddServiceControl(settings, configuration); hostBuilder.AddServiceControlHttps(settings.HttpsSettings); - hostBuilder.AddServiceControlApi(settings.CorsSettings); + hostBuilder.AddServiceControlApi(settings); hostBuilder.AddServiceControlTesting(settings); @@ -135,7 +135,7 @@ async Task InitializeServiceControl(ScenarioContext context) host.UseTestRemoteIp(); host.UseServiceControlAuthentication(settings.OpenIdConnectSettings.Enabled); - host.UseServiceControl(settings.ForwardedHeadersSettings, settings.HttpsSettings); + host.UseServiceControl(settings.ForwardedHeadersSettings, settings.HttpsSettings, settings.EnableMcpServer); await host.StartAsync(); DomainEvents = host.Services.GetRequiredService(); // Bring this back and look into the base address of the client diff --git a/src/ServiceControl.Audit.AcceptanceTests/Mcp/When_mcp_server_is_enabled.cs b/src/ServiceControl.Audit.AcceptanceTests/Mcp/When_mcp_server_is_enabled.cs new file mode 100644 index 0000000000..8e30222545 --- /dev/null +++ b/src/ServiceControl.Audit.AcceptanceTests/Mcp/When_mcp_server_is_enabled.cs @@ -0,0 +1,214 @@ +namespace ServiceControl.Audit.AcceptanceTests.Mcp; + +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Json; +using System.Text.Json; +using System.Threading.Tasks; +using AcceptanceTesting; +using AcceptanceTesting.EndpointTemplates; +using Audit.Auditing.MessagesView; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.AcceptanceTesting.Customization; +using NServiceBus.Settings; +using NUnit.Framework; + +class When_mcp_server_is_enabled : AcceptanceTest +{ + [SetUp] + public void EnableMcp() => SetSettings = s => s.EnableMcpServer = true; + + [Test] + public async Task Should_expose_mcp_endpoint() + { + await Define() + .Done(async _ => + { + var response = await InitializeMcpSession(); + return response.StatusCode == HttpStatusCode.OK; + }) + .Run(); + } + + [Test] + public async Task Should_list_audit_message_tools() + { + string toolsJson = null; + + await Define() + .Done(async _ => + { + var sessionId = await InitializeAndGetSessionId(); + if (sessionId == null) + { + return false; + } + + var response = await SendMcpRequest(sessionId, "tools/list", new { }); + if (response == null) + { + return false; + } + + toolsJson = await response.Content.ReadAsStringAsync(); + return response.StatusCode == HttpStatusCode.OK; + }) + .Run(); + + Assert.That(toolsJson, Is.Not.Null); + var doc = JsonDocument.Parse(toolsJson); + var result = doc.RootElement.GetProperty("result"); + var tools = result.GetProperty("tools"); + + var toolNames = tools.EnumerateArray() + .Select(t => t.GetProperty("name").GetString()) + .ToList(); + + Assert.That(toolNames, Does.Contain("GetAuditMessages")); + Assert.That(toolNames, Does.Contain("SearchAuditMessages")); + Assert.That(toolNames, Does.Contain("GetAuditMessagesByEndpoint")); + Assert.That(toolNames, Does.Contain("GetAuditMessagesByConversation")); + Assert.That(toolNames, Does.Contain("GetAuditMessageBody")); + Assert.That(toolNames, Does.Contain("GetKnownEndpoints")); + Assert.That(toolNames, Does.Contain("GetEndpointAuditCounts")); + } + + [Test] + public async Task Should_call_get_audit_messages_tool() + { + string toolResult = null; + + var context = await Define() + .WithEndpoint(b => b.When((bus, c) => bus.Send(new MyMessage()))) + .WithEndpoint() + .Done(async c => + { + if (c.MessageId == null) + { + return false; + } + + // Wait for the message to be ingested + if (!await this.TryGetMany("/api/messages?include_system_messages=false&sort=id", m => m.MessageId == c.MessageId)) + { + return false; + } + + var sessionId = await InitializeAndGetSessionId(); + if (sessionId == null) + { + return false; + } + + var response = await SendMcpRequest(sessionId, "tools/call", new + { + name = "GetAuditMessages", + arguments = new { includeSystemMessages = false, page = 1, perPage = 50 } + }); + + if (response == null || response.StatusCode != HttpStatusCode.OK) + { + return false; + } + + toolResult = await response.Content.ReadAsStringAsync(); + return true; + }) + .Run(); + + Assert.That(toolResult, Is.Not.Null); + var doc = JsonDocument.Parse(toolResult); + var result = doc.RootElement.GetProperty("result"); + var content = result.GetProperty("content"); + var textContent = content.EnumerateArray().First().GetProperty("text").GetString(); + var messagesResult = JsonDocument.Parse(textContent); + Assert.That(messagesResult.RootElement.GetProperty("totalCount").GetInt32(), Is.GreaterThanOrEqualTo(1)); + } + + async Task InitializeMcpSession() + { + var request = new HttpRequestMessage(HttpMethod.Post, "/mcp") + { + Content = JsonContent.Create(new + { + jsonrpc = "2.0", + id = 1, + method = "initialize", + @params = new + { + protocolVersion = "2025-03-26", + capabilities = new { }, + clientInfo = new { name = "test-client", version = "1.0" } + } + }) + }; + return await HttpClient.SendAsync(request); + } + + async Task InitializeAndGetSessionId() + { + var response = await InitializeMcpSession(); + if (response.StatusCode != HttpStatusCode.OK) + { + return null; + } + + if (response.Headers.TryGetValues("mcp-session-id", out var values)) + { + return values.FirstOrDefault(); + } + + return null; + } + + async Task SendMcpRequest(string sessionId, string method, object @params) + { + var request = new HttpRequestMessage(HttpMethod.Post, "/mcp") + { + Content = JsonContent.Create(new + { + jsonrpc = "2.0", + id = 2, + method, + @params + }) + }; + request.Headers.Add("mcp-session-id", sessionId); + return await HttpClient.SendAsync(request); + } + + public class Sender : EndpointConfigurationBuilder + { + public Sender() => + EndpointSetup(c => + { + var routing = c.ConfigureRouting(); + routing.RouteToEndpoint(typeof(MyMessage), typeof(Receiver)); + }); + } + + public class Receiver : EndpointConfigurationBuilder + { + public Receiver() => EndpointSetup(); + + public class MyMessageHandler(MyContext testContext, IReadOnlySettings settings) : IHandleMessages + { + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + testContext.EndpointNameOfReceivingEndpoint = settings.EndpointName(); + testContext.MessageId = context.MessageId; + return Task.CompletedTask; + } + } + } + + public class MyMessage : ICommand; + + public class MyContext : ScenarioContext + { + public string MessageId { get; set; } + public string EndpointNameOfReceivingEndpoint { get; set; } + } +} diff --git a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index efcd99c0f6..c197a2e54a 100644 --- a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -133,7 +133,7 @@ async Task InitializeServiceControl(ScenarioContext context) return criticalErrorContext.Stop(cancellationToken); }, settings, configuration); - hostBuilder.AddServiceControlAuditApi(settings.CorsSettings); + hostBuilder.AddServiceControlAuditApi(settings); hostBuilder.AddServiceControlHttps(settings.HttpsSettings); hostBuilder.AddServiceControlAuditTesting(settings); @@ -144,7 +144,7 @@ async Task InitializeServiceControl(ScenarioContext context) host.UseTestRemoteIp(); host.UseServiceControlAuthentication(settings.OpenIdConnectSettings.Enabled); - host.UseServiceControlAudit(settings.ForwardedHeadersSettings, settings.HttpsSettings); + host.UseServiceControlAudit(settings.ForwardedHeadersSettings, settings.HttpsSettings, settings.EnableMcpServer); await host.StartAsync(); ServiceProvider = host.Services; InstanceTestServer = host.GetTestServer(); diff --git a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt index 83897faeba..fa983cf7d6 100644 --- a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt +++ b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt @@ -58,5 +58,6 @@ "ServiceControlQueueAddress": "Particular.ServiceControl", "TimeToRestartAuditIngestionAfterFailure": "00:01:00", "EnableFullTextSearchOnBodies": true, + "EnableMcpServer": false, "ShutdownTimeout": "00:00:05" } \ No newline at end of file diff --git a/src/ServiceControl.Audit.UnitTests/Mcp/AuditMessageMcpToolsTests.cs b/src/ServiceControl.Audit.UnitTests/Mcp/AuditMessageMcpToolsTests.cs new file mode 100644 index 0000000000..f47daaf176 --- /dev/null +++ b/src/ServiceControl.Audit.UnitTests/Mcp/AuditMessageMcpToolsTests.cs @@ -0,0 +1,188 @@ +#nullable enable + +namespace ServiceControl.Audit.UnitTests.Mcp; + +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Audit.Auditing; +using Audit.Auditing.MessagesView; +using Audit.Infrastructure; +using Audit.Mcp; +using Audit.Monitoring; +using Audit.Persistence; +using NUnit.Framework; +using ServiceControl.SagaAudit; + +[TestFixture] +class AuditMessageMcpToolsTests +{ + StubAuditDataStore store = null!; + AuditMessageTools tools = null!; + + [SetUp] + public void SetUp() + { + store = new StubAuditDataStore(); + tools = new AuditMessageTools(store); + } + + [Test] + public async Task GetAuditMessages_returns_messages() + { + store.MessagesResult = new QueryResult>( + [new() { MessageId = "msg-1", MessageType = "MyNamespace.MyMessage" }], + new QueryStatsInfo("etag", 1)); + + var result = await tools.GetAuditMessages(); + var doc = JsonDocument.Parse(result); + + Assert.That(doc.RootElement.GetProperty("totalCount").GetInt32(), Is.EqualTo(1)); + Assert.That(doc.RootElement.GetProperty("results").GetArrayLength(), Is.EqualTo(1)); + } + + [Test] + public async Task GetAuditMessages_passes_paging_and_sort_parameters() + { + await tools.GetAuditMessages(page: 2, perPage: 25, sort: "processed_at", direction: "asc"); + + Assert.That(store.LastGetMessagesArgs, Is.Not.Null); + Assert.That(store.LastGetMessagesArgs!.Value.PagingInfo.Page, Is.EqualTo(2)); + Assert.That(store.LastGetMessagesArgs!.Value.PagingInfo.PageSize, Is.EqualTo(25)); + Assert.That(store.LastGetMessagesArgs!.Value.SortInfo.Sort, Is.EqualTo("processed_at")); + Assert.That(store.LastGetMessagesArgs!.Value.SortInfo.Direction, Is.EqualTo("asc")); + } + + [Test] + public async Task SearchAuditMessages_passes_query() + { + await tools.SearchAuditMessages("OrderPlaced"); + + Assert.That(store.LastQueryMessagesSearchParam, Is.EqualTo("OrderPlaced")); + } + + [Test] + public async Task GetAuditMessagesByEndpoint_queries_by_endpoint() + { + await tools.GetAuditMessagesByEndpoint("Sales"); + + Assert.That(store.LastQueryByEndpointName, Is.EqualTo("Sales")); + Assert.That(store.LastQueryByEndpointKeyword, Is.Null); + } + + [Test] + public async Task GetAuditMessagesByEndpoint_with_keyword_uses_keyword_query() + { + await tools.GetAuditMessagesByEndpoint("Sales", keyword: "OrderPlaced"); + + Assert.That(store.LastQueryByEndpointAndKeywordEndpoint, Is.EqualTo("Sales")); + Assert.That(store.LastQueryByEndpointAndKeywordKeyword, Is.EqualTo("OrderPlaced")); + } + + [Test] + public async Task GetAuditMessagesByConversation_queries_by_conversation_id() + { + await tools.GetAuditMessagesByConversation("conv-123"); + + Assert.That(store.LastConversationId, Is.EqualTo("conv-123")); + } + + [Test] + public async Task GetAuditMessageBody_returns_body_content() + { + store.MessageBodyResult = MessageBodyView.FromString("{\"orderId\": 123}", "application/json", 16, "etag"); + + var result = await tools.GetAuditMessageBody("msg-1"); + var doc = JsonDocument.Parse(result); + + Assert.That(doc.RootElement.GetProperty("contentType").GetString(), Is.EqualTo("application/json")); + Assert.That(doc.RootElement.GetProperty("body").GetString(), Is.EqualTo("{\"orderId\": 123}")); + } + + [Test] + public async Task GetAuditMessageBody_returns_error_when_not_found() + { + store.MessageBodyResult = MessageBodyView.NotFound(); + + var result = await tools.GetAuditMessageBody("msg-missing"); + var doc = JsonDocument.Parse(result); + + Assert.That(doc.RootElement.GetProperty("error").GetString(), Does.Contain("not found")); + } + + [Test] + public async Task GetAuditMessageBody_returns_error_when_no_content() + { + store.MessageBodyResult = MessageBodyView.NoContent(); + + var result = await tools.GetAuditMessageBody("msg-empty"); + var doc = JsonDocument.Parse(result); + + Assert.That(doc.RootElement.GetProperty("error").GetString(), Does.Contain("no body content")); + } + + class StubAuditDataStore : IAuditDataStore + { + static readonly QueryResult> EmptyMessagesResult = new([], QueryStatsInfo.Zero); + static readonly QueryResult> EmptyEndpointsResult = new([], QueryStatsInfo.Zero); + static readonly QueryResult> EmptyAuditCountsResult = new([], QueryStatsInfo.Zero); + + public QueryResult>? MessagesResult { get; set; } + public MessageBodyView MessageBodyResult { get; set; } = MessageBodyView.NotFound(); + + // Captured arguments + public (bool IncludeSystemMessages, PagingInfo PagingInfo, SortInfo SortInfo, DateTimeRange? TimeSentRange)? LastGetMessagesArgs { get; private set; } + public string? LastQueryMessagesSearchParam { get; private set; } + public string? LastQueryByEndpointName { get; private set; } + public string? LastQueryByEndpointKeyword { get; private set; } + public string? LastQueryByEndpointAndKeywordEndpoint { get; private set; } + public string? LastQueryByEndpointAndKeywordKeyword { get; private set; } + public string? LastConversationId { get; private set; } + + public Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) + { + LastGetMessagesArgs = (includeSystemMessages, pagingInfo, sortInfo, timeSentRange); + return Task.FromResult(MessagesResult ?? EmptyMessagesResult); + } + + public Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) + { + LastQueryMessagesSearchParam = searchParam; + return Task.FromResult(MessagesResult ?? EmptyMessagesResult); + } + + public Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) + { + LastQueryByEndpointName = endpointName; + LastQueryByEndpointKeyword = null; + return Task.FromResult(MessagesResult ?? EmptyMessagesResult); + } + + public Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) + { + LastQueryByEndpointAndKeywordEndpoint = endpoint; + LastQueryByEndpointAndKeywordKeyword = keyword; + return Task.FromResult(MessagesResult ?? EmptyMessagesResult); + } + + public Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) + { + LastConversationId = conversationId; + return Task.FromResult(MessagesResult ?? EmptyMessagesResult); + } + + public Task GetMessageBody(string messageId, CancellationToken cancellationToken) + => Task.FromResult(MessageBodyResult); + + public Task>> QueryKnownEndpoints(CancellationToken cancellationToken) + => Task.FromResult(EmptyEndpointsResult); + + public Task> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken) + => Task.FromResult(QueryResult.Empty()); + + public Task>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken) + => Task.FromResult(EmptyAuditCountsResult); + } +} diff --git a/src/ServiceControl.Audit.UnitTests/Mcp/EndpointMcpToolsTests.cs b/src/ServiceControl.Audit.UnitTests/Mcp/EndpointMcpToolsTests.cs new file mode 100644 index 0000000000..0b3951ce97 --- /dev/null +++ b/src/ServiceControl.Audit.UnitTests/Mcp/EndpointMcpToolsTests.cs @@ -0,0 +1,96 @@ +#nullable enable + +namespace ServiceControl.Audit.UnitTests.Mcp; + +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Audit.Auditing; +using Audit.Auditing.MessagesView; +using Audit.Infrastructure; +using Audit.Mcp; +using Audit.Monitoring; +using Audit.Persistence; +using NUnit.Framework; +using ServiceControl.SagaAudit; + +[TestFixture] +class EndpointMcpToolsTests +{ + StubAuditDataStore store = null!; + EndpointTools tools = null!; + + [SetUp] + public void SetUp() + { + store = new StubAuditDataStore(); + tools = new EndpointTools(store); + } + + [Test] + public async Task GetKnownEndpoints_returns_endpoints() + { + store.KnownEndpointsResult = new QueryResult>( + [new() { EndpointDetails = new EndpointDetails { Name = "Sales", Host = "server1" } }], + new QueryStatsInfo("etag", 1)); + + var result = await tools.GetKnownEndpoints(); + var doc = JsonDocument.Parse(result); + + Assert.That(doc.RootElement.GetProperty("totalCount").GetInt32(), Is.EqualTo(1)); + Assert.That(doc.RootElement.GetProperty("results").GetArrayLength(), Is.EqualTo(1)); + } + + [Test] + public async Task GetEndpointAuditCounts_returns_counts() + { + store.AuditCountsResult = new QueryResult>( + [new() { UtcDate = DateTime.UtcNow.Date, Count = 42 }], + new QueryStatsInfo("etag", 1)); + + var result = await tools.GetEndpointAuditCounts("Sales"); + var doc = JsonDocument.Parse(result); + + Assert.That(doc.RootElement.GetProperty("totalCount").GetInt32(), Is.EqualTo(1)); + Assert.That(store.LastAuditCountsEndpointName, Is.EqualTo("Sales")); + } + + class StubAuditDataStore : IAuditDataStore + { + public QueryResult>? KnownEndpointsResult { get; set; } + public QueryResult>? AuditCountsResult { get; set; } + public string? LastAuditCountsEndpointName { get; private set; } + + public Task>> QueryKnownEndpoints(CancellationToken cancellationToken) + => Task.FromResult(KnownEndpointsResult ?? new QueryResult>([], QueryStatsInfo.Zero)); + + public Task>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken) + { + LastAuditCountsEndpointName = endpointName; + return Task.FromResult(AuditCountsResult ?? new QueryResult>([], QueryStatsInfo.Zero)); + } + + public Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) + => Task.FromResult(new QueryResult>([], QueryStatsInfo.Zero)); + + public Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) + => Task.FromResult(new QueryResult>([], QueryStatsInfo.Zero)); + + public Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) + => Task.FromResult(new QueryResult>([], QueryStatsInfo.Zero)); + + public Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) + => Task.FromResult(new QueryResult>([], QueryStatsInfo.Zero)); + + public Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) + => Task.FromResult(new QueryResult>([], QueryStatsInfo.Zero)); + + public Task GetMessageBody(string messageId, CancellationToken cancellationToken) + => Task.FromResult(MessageBodyView.NotFound()); + + public Task> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken) + => Task.FromResult(QueryResult.Empty()); + } +} diff --git a/src/ServiceControl.Audit/App.config b/src/ServiceControl.Audit/App.config index a3f5781c51..8adfc4075d 100644 --- a/src/ServiceControl.Audit/App.config +++ b/src/ServiceControl.Audit/App.config @@ -8,6 +8,8 @@ These settings are only here so that we can debug ServiceControl while developin + + diff --git a/src/ServiceControl.Audit/Infrastructure/Hosting/Commands/RunCommand.cs b/src/ServiceControl.Audit/Infrastructure/Hosting/Commands/RunCommand.cs index 22e2fff776..7ddfcf46cd 100644 --- a/src/ServiceControl.Audit/Infrastructure/Hosting/Commands/RunCommand.cs +++ b/src/ServiceControl.Audit/Infrastructure/Hosting/Commands/RunCommand.cs @@ -25,10 +25,10 @@ public override async Task Execute(HostArguments args, Settings settings) //Do nothing. The transports in NSB 8 are designed to handle broker outages. Audit ingestion will be paused when broker is unavailable. return Task.CompletedTask; }, settings, endpointConfiguration); - hostBuilder.AddServiceControlAuditApi(settings.CorsSettings); + hostBuilder.AddServiceControlAuditApi(settings); var app = hostBuilder.Build(); - app.UseServiceControlAudit(settings.ForwardedHeadersSettings, settings.HttpsSettings); + app.UseServiceControlAudit(settings.ForwardedHeadersSettings, settings.HttpsSettings, settings.EnableMcpServer); app.UseServiceControlAuthentication(settings.OpenIdConnectSettings.Enabled); await app.RunAsync(settings.RootUrl); diff --git a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs index 3203bd349e..22ec971d12 100644 --- a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs @@ -54,6 +54,7 @@ public Settings(string transportType = null, string persisterType = null, Loggin ServiceControlQueueAddress = SettingsReader.Read(SettingsRootNamespace, "ServiceControlQueueAddress"); TimeToRestartAuditIngestionAfterFailure = GetTimeToRestartAuditIngestionAfterFailure(); EnableFullTextSearchOnBodies = SettingsReader.Read(SettingsRootNamespace, "EnableFullTextSearchOnBodies", true); + EnableMcpServer = SettingsReader.Read(SettingsRootNamespace, "EnableMcpServer", false); ShutdownTimeout = SettingsReader.Read(SettingsRootNamespace, "ShutdownTimeout", ShutdownTimeout); AssemblyLoadContextResolver = static assemblyPath => new PluginAssemblyLoadContext(assemblyPath); @@ -187,6 +188,8 @@ public int MaxBodySizeToStore public bool EnableFullTextSearchOnBodies { get; set; } + public bool EnableMcpServer { get; set; } + // The default value is set to the maximum allowed time by the most // restrictive hosting platform, which is Linux containers. Linux // containers allow for a maximum of 10 seconds. We set it to 5 to diff --git a/src/ServiceControl.Audit/Infrastructure/WebApi/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/Infrastructure/WebApi/HostApplicationBuilderExtensions.cs index 638041d4b1..f650640314 100644 --- a/src/ServiceControl.Audit/Infrastructure/WebApi/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/Infrastructure/WebApi/HostApplicationBuilderExtensions.cs @@ -4,13 +4,22 @@ using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; + using ModelContextProtocol.AspNetCore; using ServiceControl.Infrastructure; static class HostApplicationBuilderExtensions { - public static void AddServiceControlAuditApi(this IHostApplicationBuilder builder, CorsSettings corsSettings) + public static void AddServiceControlAuditApi(this IHostApplicationBuilder builder, Settings.Settings settings) { - builder.Services.AddCors(options => options.AddDefaultPolicy(Cors.GetDefaultPolicy(corsSettings))); + if (settings.EnableMcpServer) + { + builder.Services + .AddMcpServer() + .WithHttpTransport() + .WithToolsFromAssembly(); + } + + builder.Services.AddCors(options => options.AddDefaultPolicy(Cors.GetDefaultPolicy(settings.CorsSettings))); // We're not explicitly adding Gzip here because it's already in the default list of supported compressors builder.Services.AddResponseCompression(); diff --git a/src/ServiceControl.Audit/Mcp/AuditMessageTools.cs b/src/ServiceControl.Audit/Mcp/AuditMessageTools.cs new file mode 100644 index 0000000000..b4fddeaa51 --- /dev/null +++ b/src/ServiceControl.Audit/Mcp/AuditMessageTools.cs @@ -0,0 +1,147 @@ +#nullable enable + +namespace ServiceControl.Audit.Mcp; + +using System.ComponentModel; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Infrastructure; +using ModelContextProtocol.Server; +using Persistence; + +[McpServerToolType] +public class AuditMessageTools(IAuditDataStore store) +{ + [McpServerTool, Description("Get a list of successfully processed audit messages. Supports paging and sorting. Returns message metadata including endpoints, timing information, and message type.")] + public async Task GetAuditMessages( + [Description("Whether to include system messages in results. Default is false")] bool includeSystemMessages = false, + [Description("Page number (1-based). Default is 1")] int page = 1, + [Description("Number of results per page. Default is 50")] int perPage = 50, + [Description("Sort field: time_sent, processed_at, message_type, critical_time, delivery_time, processing_time. Default is time_sent")] string sort = "time_sent", + [Description("Sort direction: asc or desc. Default is desc")] string direction = "desc", + [Description("Filter by time sent start (ISO 8601 format)")] string? timeSentFrom = null, + [Description("Filter by time sent end (ISO 8601 format)")] string? timeSentTo = null, + CancellationToken cancellationToken = default) + { + var pagingInfo = new PagingInfo(page, perPage); + var sortInfo = new SortInfo(sort, direction); + var timeSentRange = new DateTimeRange(timeSentFrom, timeSentTo); + + var results = await store.GetMessages(includeSystemMessages, pagingInfo, sortInfo, timeSentRange, cancellationToken); + + return JsonSerializer.Serialize(new + { + results.QueryStats.TotalCount, + results.Results + }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Search audit messages by a keyword or phrase. Searches across message content and metadata.")] + public async Task SearchAuditMessages( + [Description("The search query string")] string query, + [Description("Page number (1-based). Default is 1")] int page = 1, + [Description("Number of results per page. Default is 50")] int perPage = 50, + [Description("Sort field: time_sent, processed_at, message_type, critical_time, delivery_time, processing_time. Default is time_sent")] string sort = "time_sent", + [Description("Sort direction: asc or desc. Default is desc")] string direction = "desc", + [Description("Filter by time sent start (ISO 8601 format)")] string? timeSentFrom = null, + [Description("Filter by time sent end (ISO 8601 format)")] string? timeSentTo = null, + CancellationToken cancellationToken = default) + { + var pagingInfo = new PagingInfo(page, perPage); + var sortInfo = new SortInfo(sort, direction); + var timeSentRange = new DateTimeRange(timeSentFrom, timeSentTo); + + var results = await store.QueryMessages(query, pagingInfo, sortInfo, timeSentRange, cancellationToken); + + return JsonSerializer.Serialize(new + { + results.QueryStats.TotalCount, + results.Results + }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get audit messages received by a specific endpoint. Can optionally filter by keyword.")] + public async Task GetAuditMessagesByEndpoint( + [Description("The name of the receiving endpoint")] string endpointName, + [Description("Optional keyword to filter messages")] string? keyword = null, + [Description("Whether to include system messages in results. Default is false")] bool includeSystemMessages = false, + [Description("Page number (1-based). Default is 1")] int page = 1, + [Description("Number of results per page. Default is 50")] int perPage = 50, + [Description("Sort field: time_sent, processed_at, message_type, critical_time, delivery_time, processing_time. Default is time_sent")] string sort = "time_sent", + [Description("Sort direction: asc or desc. Default is desc")] string direction = "desc", + [Description("Filter by time sent start (ISO 8601 format)")] string? timeSentFrom = null, + [Description("Filter by time sent end (ISO 8601 format)")] string? timeSentTo = null, + CancellationToken cancellationToken = default) + { + var pagingInfo = new PagingInfo(page, perPage); + var sortInfo = new SortInfo(sort, direction); + var timeSentRange = new DateTimeRange(timeSentFrom, timeSentTo); + + var results = keyword != null + ? await store.QueryMessagesByReceivingEndpointAndKeyword(endpointName, keyword, pagingInfo, sortInfo, timeSentRange, cancellationToken) + : await store.QueryMessagesByReceivingEndpoint(includeSystemMessages, endpointName, pagingInfo, sortInfo, timeSentRange, cancellationToken); + + return JsonSerializer.Serialize(new + { + results.QueryStats.TotalCount, + results.Results + }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get all audit messages that belong to a specific conversation. A conversation groups related messages that were triggered by the same initial message.")] + public async Task GetAuditMessagesByConversation( + [Description("The conversation ID to filter by")] string conversationId, + [Description("Page number (1-based). Default is 1")] int page = 1, + [Description("Number of results per page. Default is 50")] int perPage = 50, + [Description("Sort field: time_sent, processed_at, message_type, critical_time, delivery_time, processing_time. Default is time_sent")] string sort = "time_sent", + [Description("Sort direction: asc or desc. Default is desc")] string direction = "desc", + CancellationToken cancellationToken = default) + { + var pagingInfo = new PagingInfo(page, perPage); + var sortInfo = new SortInfo(sort, direction); + + var results = await store.QueryMessagesByConversationId(conversationId, pagingInfo, sortInfo, cancellationToken); + + return JsonSerializer.Serialize(new + { + results.QueryStats.TotalCount, + results.Results + }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get the body content of a specific audit message by its message ID.")] + public async Task GetAuditMessageBody( + [Description("The message ID")] string messageId, + CancellationToken cancellationToken = default) + { + var result = await store.GetMessageBody(messageId, cancellationToken); + + if (!result.Found) + { + return JsonSerializer.Serialize(new { Error = $"Message '{messageId}' not found." }, McpJsonOptions.Default); + } + + if (!result.HasContent) + { + return JsonSerializer.Serialize(new { Error = $"Message '{messageId}' has no body content." }, McpJsonOptions.Default); + } + + if (result.StringContent != null) + { + return JsonSerializer.Serialize(new + { + result.ContentType, + result.ContentLength, + Body = result.StringContent + }, McpJsonOptions.Default); + } + + return JsonSerializer.Serialize(new + { + result.ContentType, + result.ContentLength, + Body = "(stream content - not available as text)" + }, McpJsonOptions.Default); + } +} diff --git a/src/ServiceControl.Audit/Mcp/EndpointTools.cs b/src/ServiceControl.Audit/Mcp/EndpointTools.cs new file mode 100644 index 0000000000..705a88fbb2 --- /dev/null +++ b/src/ServiceControl.Audit/Mcp/EndpointTools.cs @@ -0,0 +1,40 @@ +#nullable enable + +namespace ServiceControl.Audit.Mcp; + +using System.ComponentModel; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using ModelContextProtocol.Server; +using Persistence; + +[McpServerToolType] +public class EndpointTools(IAuditDataStore store) +{ + [McpServerTool, Description("Get a list of all known endpoints that have sent or received audit messages.")] + public async Task GetKnownEndpoints(CancellationToken cancellationToken = default) + { + var results = await store.QueryKnownEndpoints(cancellationToken); + + return JsonSerializer.Serialize(new + { + results.QueryStats.TotalCount, + results.Results + }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get audit message counts per day for a specific endpoint. Useful for understanding message throughput.")] + public async Task GetEndpointAuditCounts( + [Description("The name of the endpoint")] string endpointName, + CancellationToken cancellationToken = default) + { + var results = await store.QueryAuditCounts(endpointName, cancellationToken); + + return JsonSerializer.Serialize(new + { + results.QueryStats.TotalCount, + results.Results + }, McpJsonOptions.Default); + } +} diff --git a/src/ServiceControl.Audit/Mcp/McpJsonOptions.cs b/src/ServiceControl.Audit/Mcp/McpJsonOptions.cs new file mode 100644 index 0000000000..ff03d91eae --- /dev/null +++ b/src/ServiceControl.Audit/Mcp/McpJsonOptions.cs @@ -0,0 +1,16 @@ +#nullable enable + +namespace ServiceControl.Audit.Mcp; + +using System.Text.Json; +using System.Text.Json.Serialization; + +static class McpJsonOptions +{ + public static JsonSerializerOptions Default { get; } = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + WriteIndented = false + }; +} diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index 1752bf81bd..b7394443c1 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -26,6 +26,7 @@ + diff --git a/src/ServiceControl.Audit/WebApplicationExtensions.cs b/src/ServiceControl.Audit/WebApplicationExtensions.cs index 76785dd77d..e8edece77f 100644 --- a/src/ServiceControl.Audit/WebApplicationExtensions.cs +++ b/src/ServiceControl.Audit/WebApplicationExtensions.cs @@ -8,7 +8,7 @@ namespace ServiceControl.Audit; public static class WebApplicationExtensions { - public static void UseServiceControlAudit(this WebApplication app, ForwardedHeadersSettings forwardedHeadersSettings, HttpsSettings httpsSettings) + public static void UseServiceControlAudit(this WebApplication app, ForwardedHeadersSettings forwardedHeadersSettings, HttpsSettings httpsSettings, bool enableMcpServer) { app.UseServiceControlForwardedHeaders(forwardedHeadersSettings); app.UseServiceControlHttps(httpsSettings); @@ -17,5 +17,10 @@ public static void UseServiceControlAudit(this WebApplication app, ForwardedHead app.UseHttpLogging(); app.UseCors(); app.MapControllers(); + + if (enableMcpServer) + { + app.MapMcp("/mcp"); + } } } \ No newline at end of file diff --git a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt index 6873e229b3..5de2540e03 100644 --- a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt +++ b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt @@ -37,6 +37,7 @@ }, "NotificationsFilter": null, "AllowMessageEditing": false, + "EnableMcpServer": false, "EnableIntegratedServicePulse": false, "ServicePulseSettings": null, "MessageFilter": null, diff --git a/src/ServiceControl/App.config b/src/ServiceControl/App.config index d6271805e5..698755c9a7 100644 --- a/src/ServiceControl/App.config +++ b/src/ServiceControl/App.config @@ -5,6 +5,8 @@ These settings are only here so that we can debug ServiceControl while developin --> + + diff --git a/src/ServiceControl/Hosting/Commands/ImportFailedErrorsCommand.cs b/src/ServiceControl/Hosting/Commands/ImportFailedErrorsCommand.cs index 105f756daf..932e301047 100644 --- a/src/ServiceControl/Hosting/Commands/ImportFailedErrorsCommand.cs +++ b/src/ServiceControl/Hosting/Commands/ImportFailedErrorsCommand.cs @@ -26,7 +26,7 @@ public override async Task Execute(HostArguments args, Settings settings) var hostBuilder = Host.CreateApplicationBuilder(); hostBuilder.AddServiceControl(settings, endpointConfiguration); - hostBuilder.AddServiceControlApi(settings.CorsSettings); + hostBuilder.AddServiceControlApi(settings); using var app = hostBuilder.Build(); await app.StartAsync(); diff --git a/src/ServiceControl/Hosting/Commands/RunCommand.cs b/src/ServiceControl/Hosting/Commands/RunCommand.cs index ebc08958cf..9778db2cc0 100644 --- a/src/ServiceControl/Hosting/Commands/RunCommand.cs +++ b/src/ServiceControl/Hosting/Commands/RunCommand.cs @@ -27,10 +27,10 @@ public override async Task Execute(HostArguments args, Settings settings) hostBuilder.AddServiceControlAuthentication(settings.OpenIdConnectSettings); hostBuilder.AddServiceControlHttps(settings.HttpsSettings); hostBuilder.AddServiceControl(settings, endpointConfiguration); - hostBuilder.AddServiceControlApi(settings.CorsSettings); + hostBuilder.AddServiceControlApi(settings); var app = hostBuilder.Build(); - app.UseServiceControl(settings.ForwardedHeadersSettings, settings.HttpsSettings); + app.UseServiceControl(settings.ForwardedHeadersSettings, settings.HttpsSettings, settings.EnableMcpServer); if (settings.EnableIntegratedServicePulse) { app.UseServicePulse(settings.ServicePulseSettings); diff --git a/src/ServiceControl/Infrastructure/Settings/Settings.cs b/src/ServiceControl/Infrastructure/Settings/Settings.cs index d71b9dca66..24e7082863 100644 --- a/src/ServiceControl/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl/Infrastructure/Settings/Settings.cs @@ -81,6 +81,7 @@ public Settings( DisableExternalIntegrationsPublishing = SettingsReader.Read(SettingsRootNamespace, "DisableExternalIntegrationsPublishing", false); TrackInstancesInitialValue = SettingsReader.Read(SettingsRootNamespace, "TrackInstancesInitialValue", true); ShutdownTimeout = SettingsReader.Read(SettingsRootNamespace, "ShutdownTimeout", ShutdownTimeout); + EnableMcpServer = SettingsReader.Read(SettingsRootNamespace, "EnableMcpServer", false); AssemblyLoadContextResolver = static assemblyPath => new PluginAssemblyLoadContext(assemblyPath); } @@ -113,6 +114,8 @@ public Settings( public bool AllowMessageEditing { get; set; } + public bool EnableMcpServer { get; set; } + public bool EnableIntegratedServicePulse { get; set; } public ServicePulseSettings ServicePulseSettings { get; set; } diff --git a/src/ServiceControl/Infrastructure/WebApi/HostApplicationBuilderExtensions.cs b/src/ServiceControl/Infrastructure/WebApi/HostApplicationBuilderExtensions.cs index 298885ae0f..17dc44d5d3 100644 --- a/src/ServiceControl/Infrastructure/WebApi/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl/Infrastructure/WebApi/HostApplicationBuilderExtensions.cs @@ -9,10 +9,11 @@ using Microsoft.Extensions.Hosting; using Particular.LicensingComponent.WebApi; using Particular.ServiceControl; + using ServiceBus.Management.Infrastructure.Settings; static class HostApplicationBuilderExtensions { - public static void AddServiceControlApi(this IHostApplicationBuilder builder, CorsSettings corsSettings) + public static void AddServiceControlApi(this IHostApplicationBuilder builder, Settings settings) { // This registers concrete classes that implement IApi. Currently it is hard to find out to what // component those APIs should belong to so we leave it here for now. @@ -20,7 +21,15 @@ public static void AddServiceControlApi(this IHostApplicationBuilder builder, Co builder.AddServiceControlApis(); - builder.Services.AddCors(options => options.AddDefaultPolicy(Cors.GetDefaultPolicy(corsSettings))); + if (settings.EnableMcpServer) + { + builder.Services + .AddMcpServer() + .WithHttpTransport() + .WithToolsFromAssembly(); + } + + builder.Services.AddCors(options => options.AddDefaultPolicy(Cors.GetDefaultPolicy(settings.CorsSettings))); // We're not explicitly adding Gzip here because it's already in the default list of supported compressors builder.Services.AddResponseCompression(); diff --git a/src/ServiceControl/Mcp/ArchiveTools.cs b/src/ServiceControl/Mcp/ArchiveTools.cs new file mode 100644 index 0000000000..86abe21de0 --- /dev/null +++ b/src/ServiceControl/Mcp/ArchiveTools.cs @@ -0,0 +1,90 @@ +namespace ServiceControl.Mcp; + +using System.ComponentModel; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using MessageFailures.InternalMessages; +using ModelContextProtocol.Server; +using NServiceBus; +using Persistence.Recoverability; +using ServiceControl.Recoverability; + +[McpServerToolType] +public class ArchiveTools(IMessageSession messageSession, IArchiveMessages archiver) +{ + [McpServerTool, Description("Archive a single failed message by its unique ID. The message will be moved to the archived status.")] + public async Task ArchiveFailedMessage( + [Description("The unique ID of the failed message to archive")] string failedMessageId) + { + await messageSession.SendLocal(m => m.FailedMessageId = failedMessageId); + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Archive requested for message '{failedMessageId}'." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Archive multiple failed messages by their unique IDs. All specified messages will be moved to the archived status.")] + public async Task ArchiveFailedMessages( + [Description("Array of unique message IDs to archive")] string[] messageIds) + { + if (messageIds.Any(string.IsNullOrEmpty)) + { + return JsonSerializer.Serialize(new { Error = "All message IDs must be non-empty strings." }, McpJsonOptions.Default); + } + + foreach (var id in messageIds) + { + await messageSession.SendLocal(m => m.FailedMessageId = id); + } + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Archive requested for {messageIds.Length} messages." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Archive all failed messages in a specific failure group. Failure groups are collections of messages grouped by exception type and stack trace.")] + public async Task ArchiveFailureGroup( + [Description("The ID of the failure group to archive")] string groupId) + { + if (archiver.IsOperationInProgressFor(groupId, ArchiveType.FailureGroup)) + { + return JsonSerializer.Serialize(new { Status = "InProgress", Message = $"An archive operation is already in progress for group '{groupId}'." }, McpJsonOptions.Default); + } + + await archiver.StartArchiving(groupId, ArchiveType.FailureGroup); + await messageSession.SendLocal(m => m.GroupId = groupId); + + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Archive requested for all messages in failure group '{groupId}'." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Unarchive a single failed message by its unique ID. The message will be moved back to the unresolved status.")] + public async Task UnarchiveFailedMessage( + [Description("The unique ID of the failed message to unarchive")] string failedMessageId) + { + await messageSession.SendLocal(m => m.FailedMessageIds = [failedMessageId]); + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Unarchive requested for message '{failedMessageId}'." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Unarchive multiple failed messages by their unique IDs. All specified messages will be moved back to the unresolved status.")] + public async Task UnarchiveFailedMessages( + [Description("Array of unique message IDs to unarchive")] string[] messageIds) + { + if (messageIds.Any(string.IsNullOrEmpty)) + { + return JsonSerializer.Serialize(new { Error = "All message IDs must be non-empty strings." }, McpJsonOptions.Default); + } + + await messageSession.SendLocal(m => m.FailedMessageIds = messageIds); + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Unarchive requested for {messageIds.Length} messages." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Unarchive all failed messages in a specific failure group. Failure groups are collections of messages grouped by exception type and stack trace.")] + public async Task UnarchiveFailureGroup( + [Description("The ID of the failure group to unarchive")] string groupId) + { + if (archiver.IsOperationInProgressFor(groupId, ArchiveType.FailureGroup)) + { + return JsonSerializer.Serialize(new { Status = "InProgress", Message = $"An archive operation is already in progress for group '{groupId}'." }, McpJsonOptions.Default); + } + + await archiver.StartUnarchiving(groupId, ArchiveType.FailureGroup); + await messageSession.SendLocal(m => m.GroupId = groupId); + + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Unarchive requested for all messages in failure group '{groupId}'." }, McpJsonOptions.Default); + } +} diff --git a/src/ServiceControl/Mcp/FailedMessageTools.cs b/src/ServiceControl/Mcp/FailedMessageTools.cs new file mode 100644 index 0000000000..79c6d47e96 --- /dev/null +++ b/src/ServiceControl/Mcp/FailedMessageTools.cs @@ -0,0 +1,94 @@ +#nullable enable + +namespace ServiceControl.Mcp; + +using System.ComponentModel; +using System.Text.Json; +using System.Threading.Tasks; +using MessageFailures.Api; +using ModelContextProtocol.Server; +using Persistence; +using Persistence.Infrastructure; + +[McpServerToolType] +public class FailedMessageTools(IErrorMessageDataStore store) +{ + [McpServerTool, Description("Get a list of failed messages. Supports filtering by status (unresolved, resolved, archived, retryissued), modified date, and queue address. Returns paged results.")] + public async Task GetFailedMessages( + [Description("Filter by status: unresolved, resolved, archived, retryissued")] string? status = null, + [Description("Filter by modified date (ISO 8601 format)")] string? modified = null, + [Description("Filter by queue address")] string? queueAddress = null, + [Description("Page number (1-based). Default is 1")] int page = 1, + [Description("Number of results per page. Default is 50")] int perPage = 50, + [Description("Sort field: time_sent, message_type, time_of_failure. Default is time_of_failure")] string sort = "time_of_failure", + [Description("Sort direction: asc or desc. Default is desc")] string direction = "desc") + { + var pagingInfo = new PagingInfo(page, perPage); + var sortInfo = new SortInfo(sort, direction); + + var results = await store.ErrorGet(status, modified, queueAddress, pagingInfo, sortInfo); + + return JsonSerializer.Serialize(new + { + results.QueryStats.TotalCount, + results.Results + }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get details of a specific failed message by its unique ID.")] + public async Task GetFailedMessageById( + [Description("The unique ID of the failed message")] string failedMessageId) + { + var result = await store.ErrorBy(failedMessageId); + + if (result == null) + { + return JsonSerializer.Serialize(new { Error = $"Failed message '{failedMessageId}' not found." }, McpJsonOptions.Default); + } + + return JsonSerializer.Serialize(result, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get the last processing attempt for a specific failed message.")] + public async Task GetFailedMessageLastAttempt( + [Description("The unique ID of the failed message")] string failedMessageId) + { + var result = await store.ErrorLastBy(failedMessageId); + + if (result == null) + { + return JsonSerializer.Serialize(new { Error = $"Failed message '{failedMessageId}' not found." }, McpJsonOptions.Default); + } + + return JsonSerializer.Serialize(result, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get a summary of error counts grouped by status (unresolved, archived, resolved, retryissued).")] + public async Task GetErrorsSummary() + { + var result = await store.ErrorsSummary(); + return JsonSerializer.Serialize(result, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get failed messages for a specific endpoint.")] + public async Task GetFailedMessagesByEndpoint( + [Description("The name of the endpoint")] string endpointName, + [Description("Filter by status: unresolved, resolved, archived, retryissued")] string? status = null, + [Description("Filter by modified date (ISO 8601 format)")] string? modified = null, + [Description("Page number (1-based). Default is 1")] int page = 1, + [Description("Number of results per page. Default is 50")] int perPage = 50, + [Description("Sort field: time_sent, message_type, time_of_failure. Default is time_of_failure")] string sort = "time_of_failure", + [Description("Sort direction: asc or desc. Default is desc")] string direction = "desc") + { + var pagingInfo = new PagingInfo(page, perPage); + var sortInfo = new SortInfo(sort, direction); + + var results = await store.ErrorsByEndpointName(status, endpointName, modified, pagingInfo, sortInfo); + + return JsonSerializer.Serialize(new + { + results.QueryStats.TotalCount, + results.Results + }, McpJsonOptions.Default); + } +} diff --git a/src/ServiceControl/Mcp/FailureGroupTools.cs b/src/ServiceControl/Mcp/FailureGroupTools.cs new file mode 100644 index 0000000000..ec311f4ff8 --- /dev/null +++ b/src/ServiceControl/Mcp/FailureGroupTools.cs @@ -0,0 +1,30 @@ +#nullable enable + +namespace ServiceControl.Mcp; + +using System.ComponentModel; +using System.Text.Json; +using System.Threading.Tasks; +using ModelContextProtocol.Server; +using Persistence; +using Recoverability; + +[McpServerToolType] +public class FailureGroupTools(GroupFetcher fetcher, IRetryHistoryDataStore retryStore) +{ + [McpServerTool, Description("Get failure groups, which are collections of failed messages grouped by a classifier (default: exception type and stack trace). Each group shows the count of failures, the first and last occurrence, and any retry operation status.")] + public async Task GetFailureGroups( + [Description("The classifier to group by. Default is 'Exception Type and Stack Trace'")] string classifier = "Exception Type and Stack Trace", + [Description("Optional filter for the classifier")] string? classifierFilter = null) + { + var results = await fetcher.GetGroups(classifier, classifierFilter); + return JsonSerializer.Serialize(results, McpJsonOptions.Default); + } + + [McpServerTool, Description("Get the retry history showing past retry operations and their outcomes.")] + public async Task GetRetryHistory() + { + var retryHistory = await retryStore.GetRetryHistory(); + return JsonSerializer.Serialize(retryHistory, McpJsonOptions.Default); + } +} diff --git a/src/ServiceControl/Mcp/McpJsonOptions.cs b/src/ServiceControl/Mcp/McpJsonOptions.cs new file mode 100644 index 0000000000..1e37e52d37 --- /dev/null +++ b/src/ServiceControl/Mcp/McpJsonOptions.cs @@ -0,0 +1,14 @@ +namespace ServiceControl.Mcp; + +using System.Text.Json; +using System.Text.Json.Serialization; + +static class McpJsonOptions +{ + public static JsonSerializerOptions Default { get; } = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + WriteIndented = false + }; +} diff --git a/src/ServiceControl/Mcp/RetryTools.cs b/src/ServiceControl/Mcp/RetryTools.cs new file mode 100644 index 0000000000..7d41f9d2f2 --- /dev/null +++ b/src/ServiceControl/Mcp/RetryTools.cs @@ -0,0 +1,84 @@ +namespace ServiceControl.Mcp; + +using System.ComponentModel; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using MessageFailures; +using MessageFailures.InternalMessages; +using ModelContextProtocol.Server; +using NServiceBus; +using Recoverability; +using Persistence; + +[McpServerToolType] +public class RetryTools(IMessageSession messageSession, RetryingManager retryingManager) +{ + [McpServerTool, Description("Retry a single failed message by its unique ID. The message will be sent back to its original queue for reprocessing.")] + public async Task RetryFailedMessage( + [Description("The unique ID of the failed message to retry")] string failedMessageId) + { + await messageSession.SendLocal(m => m.FailedMessageId = failedMessageId); + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Retry requested for message '{failedMessageId}'." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Retry multiple failed messages by their unique IDs. All specified messages will be sent back to their original queues for reprocessing.")] + public async Task RetryFailedMessages( + [Description("Array of unique message IDs to retry")] string[] messageIds) + { + if (messageIds.Any(string.IsNullOrEmpty)) + { + return JsonSerializer.Serialize(new { Error = "All message IDs must be non-empty strings." }, McpJsonOptions.Default); + } + + await messageSession.SendLocal(m => m.MessageUniqueIds = messageIds); + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Retry requested for {messageIds.Length} messages." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Retry all failed messages from a specific queue address.")] + public async Task RetryFailedMessagesByQueue( + [Description("The queue address to retry all failed messages from")] string queueAddress) + { + await messageSession.SendLocal(m => + { + m.QueueAddress = queueAddress; + m.Status = FailedMessageStatus.Unresolved; + }); + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Retry requested for all failed messages in queue '{queueAddress}'." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Retry all failed messages across all queues. Use with caution as this affects all unresolved failed messages.")] + public async Task RetryAllFailedMessages() + { + await messageSession.SendLocal(new RequestRetryAll()); + return JsonSerializer.Serialize(new { Status = "Accepted", Message = "Retry requested for all failed messages." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Retry all failed messages for a specific endpoint.")] + public async Task RetryAllFailedMessagesByEndpoint( + [Description("The name of the endpoint to retry all failed messages for")] string endpointName) + { + await messageSession.SendLocal(new RequestRetryAll { Endpoint = endpointName }); + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Retry requested for all failed messages in endpoint '{endpointName}'." }, McpJsonOptions.Default); + } + + [McpServerTool, Description("Retry all failed messages in a specific failure group. Failure groups are collections of messages grouped by exception type and stack trace.")] + public async Task RetryFailureGroup( + [Description("The ID of the failure group to retry")] string groupId) + { + if (retryingManager.IsOperationInProgressFor(groupId, RetryType.FailureGroup)) + { + return JsonSerializer.Serialize(new { Status = "InProgress", Message = $"A retry operation is already in progress for group '{groupId}'." }, McpJsonOptions.Default); + } + + var started = System.DateTime.UtcNow; + await retryingManager.Wait(groupId, RetryType.FailureGroup, started); + await messageSession.SendLocal(new RetryAllInGroup + { + GroupId = groupId, + Started = started + }); + + return JsonSerializer.Serialize(new { Status = "Accepted", Message = $"Retry requested for all messages in failure group '{groupId}'." }, McpJsonOptions.Default); + } +} diff --git a/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs b/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs index 2e317cba54..ae852de26e 100644 --- a/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs @@ -21,7 +21,7 @@ public async Task Handle(ArchiveMessage message, IMessageHandlerContext context) var failedMessage = await dataStore.ErrorBy(failedMessageId); - if (failedMessage.Status != FailedMessageStatus.Archived) + if (failedMessage is not null && failedMessage.Status != FailedMessageStatus.Archived) { await domainEvents.Raise(new FailedMessageArchived { diff --git a/src/ServiceControl/ServiceControl.csproj b/src/ServiceControl/ServiceControl.csproj index d931751d34..2475998650 100644 --- a/src/ServiceControl/ServiceControl.csproj +++ b/src/ServiceControl/ServiceControl.csproj @@ -33,6 +33,7 @@ + diff --git a/src/ServiceControl/WebApplicationExtensions.cs b/src/ServiceControl/WebApplicationExtensions.cs index 685bc7dc16..4d3be18f2c 100644 --- a/src/ServiceControl/WebApplicationExtensions.cs +++ b/src/ServiceControl/WebApplicationExtensions.cs @@ -3,13 +3,15 @@ namespace ServiceControl; using Infrastructure.SignalR; using Infrastructure.WebApi; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.HttpOverrides; +using ModelContextProtocol.AspNetCore; using ServiceControl.Hosting.ForwardedHeaders; using ServiceControl.Hosting.Https; using ServiceControl.Infrastructure; public static class WebApplicationExtensions { - public static void UseServiceControl(this WebApplication app, ForwardedHeadersSettings forwardedHeadersSettings, HttpsSettings httpsSettings) + public static void UseServiceControl(this WebApplication app, ForwardedHeadersSettings forwardedHeadersSettings, HttpsSettings httpsSettings, bool enableMcpServer) { app.UseServiceControlForwardedHeaders(forwardedHeadersSettings); app.UseServiceControlHttps(httpsSettings); @@ -19,5 +21,10 @@ public static void UseServiceControl(this WebApplication app, ForwardedHeadersSe app.MapHub("/api/messagestream"); app.UseCors(); app.MapControllers(); + + if (enableMcpServer) + { + app.MapMcp("/mcp"); + } } } \ No newline at end of file