fix: Agent tool proxy, chunk storage, and polling overhaul (#269)
- Replace LIST-based chunk accumulation with SET-based snapshot/terminal keys (`AgentSnapshot`/`AgentTerminal`) to prevent memory growth
- Add tool proxy system: main process exports tool definitions to Redis, agent process registers them as proxy tools routed via IPC
- Generalize `TelegramTaskConsumer` to handle any tool (not just `send_message`) with scoped DI and bounded concurrency (`SemaphoreSlim`)
- Add `ProxyToolRegistry` to `McpToolHelper` with `RegisterProxyTools()` and `ExportToolDefinitions()` methods
- Add `ExecuteRegisteredToolAsync` overload accepting a scoped `IServiceProvider`
- Update `GarnetClient`: `PublishSnapshotAsync` (SET) and `PublishTerminalAsync` (SET) replace `PublishChunkAsync` (RPUSH)
- Rewrite `ChunkPollingService` for GET-based polling with change detection
- Add generic `ExecuteRemoteToolAsync` to `ToolExecutor` for the agent-side proxy
- Register proxy tools from Redis in `LLMAgentProgram` startup
- Export tool definitions to Redis in `GeneralBootstrap` after init
- Update all tests for the new SET-based semantics and `Expiration` overload

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
📝 Walkthrough

The pull request refactors the LLM agent's task state storage from list-based chunk accumulation to snapshot/terminal-based Redis strings, introduces proxy tool contracts and registration mechanisms, and updates the agent startup and task processing pipelines to support remote tool execution with concurrent task handling.

Changes
Sequence Diagrams

```mermaid
sequenceDiagram
    participant Agent as Agent Service
    participant Boot as GeneralBootstrap
    participant Redis as Redis
    participant McpTool as McpToolHelper
    Boot->>McpTool: ExportToolDefinitions()
    McpTool-->>Boot: List<ProxyToolDefinition>
    Boot->>Redis: StringSetAsync(AgentToolDefs, JSON, TTL=24h)
    Redis-->>Boot: OK
    participant AgentProg as LLMAgentProgram
    AgentProg->>Redis: StringGetAsync(AgentToolDefs)
    Redis-->>AgentProg: JSON bytes
    AgentProg->>AgentProg: Deserialize → List<ProxyToolDefinition>
    AgentProg->>McpTool: RegisterProxyTools(definitions, executor)
    McpTool->>McpTool: Add to ProxyToolRegistry<br/>Cache XML definitions
```
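The export/register handshake above can be sketched end to end in one process. This is a minimal illustration, not the PR's actual code: the `ProxyToolDefinition` shape, the key name, and the executor signature are assumptions, and an in-memory dictionary stands in for Redis.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;

// Assumed shape of the PR's ProxyToolDefinition contract.
public record ProxyToolDefinition(string Name, string Description);

public static class ProxyHandshakeSketch {
    // In-memory stand-in for Redis (key -> serialized value).
    static readonly ConcurrentDictionary<string, string> Store = new();
    static readonly Dictionary<string, ProxyToolDefinition> ProxyToolRegistry = new();
    static Func<string, Dictionary<string, string>, string>? _proxyToolExecutor;

    // Main-process side (GeneralBootstrap step): publish definitions for agent discovery.
    public static void ExportToolDefinitions(IEnumerable<ProxyToolDefinition> defs) =>
        Store["AgentToolDefs"] = JsonSerializer.Serialize(defs);

    // Agent-process side (LLMAgentProgram step): read definitions and register proxies.
    public static int RegisterProxyToolsFromStore(Func<string, Dictionary<string, string>, string> executor) {
        if (!Store.TryGetValue("AgentToolDefs", out var json)) return 0;
        var defs = JsonSerializer.Deserialize<List<ProxyToolDefinition>>(json) ?? new();
        ProxyToolRegistry.Clear(); // full replacement, mirroring RegisterProxyTools
        foreach (var d in defs) ProxyToolRegistry[d.Name] = d;
        _proxyToolExecutor = executor;
        return ProxyToolRegistry.Count;
    }

    // Invocation routes through the proxy executor (IPC in the real system).
    public static string Invoke(string toolName, Dictionary<string, string> args) {
        if (!ProxyToolRegistry.ContainsKey(toolName) || _proxyToolExecutor == null)
            throw new InvalidOperationException($"Tool '{toolName}' is not a registered proxy.");
        return _proxyToolExecutor(toolName, args);
    }
}
```

The point of the sketch is the ordering: export must complete before any agent reads the key, which is why the PR does the export in `GeneralBootstrap` after init.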
```mermaid
sequenceDiagram
    participant Agent as AgentLoopService
    participant Garnet as GarnetClient
    participant Redis as Redis
    participant Poller as ChunkPollingService
    Agent->>Garnet: PublishSnapshotAsync(snapshot chunk)
    Garnet->>Redis: StringSetAsync(AgentSnapshot(taskId), JSON, TTL=1h)
    Redis-->>Garnet: OK
    Agent->>Garnet: PublishTerminalAsync(done chunk)
    Garnet->>Redis: StringSetAsync(AgentTerminal(taskId), JSON, TTL=1h)
    Redis-->>Garnet: OK
    Poller->>Redis: StringGetAsync(AgentTerminal(taskId))
    Redis-->>Poller: JSON (if exists)
    Poller->>Poller: Terminal found → emit + complete task
    Poller->>Redis: KeyDeleteAsync(AgentSnapshot, AgentTerminal)
```
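The overwrite-and-poll contract in this diagram is the heart of the memory fix: each SET replaces the previous snapshot instead of appending to a list. A rough single-process sketch, with a plain dictionary in place of Redis and the key prefixes invented for illustration:

```csharp
using System;
using System.Collections.Generic;

public sealed class SnapshotPollerSketch {
    readonly Dictionary<string, string> _store = new(); // stand-in for Redis strings
    string? _lastSnapshotJson;                          // tracked.LastSnapshotJson analogue
    public List<string> Delivered { get; } = new();
    public bool Completed { get; private set; }

    // Agent side: storage stays O(1) no matter how many times this is called.
    public void PublishSnapshot(string taskId, string json) => _store[$"snap:{taskId}"] = json;
    public void PublishTerminal(string taskId, string json) => _store[$"term:{taskId}"] = json;

    // Poller side: terminal first, then snapshot with change detection.
    public void PollOnce(string taskId) {
        if (_store.TryGetValue($"term:{taskId}", out var terminal)) {
            Delivered.Add(terminal);
            Completed = true;
            _store.Remove($"snap:{taskId}"); // TTL is the safety net in the real system
            _store.Remove($"term:{taskId}");
            return;
        }
        if (!_store.TryGetValue($"snap:{taskId}", out var snap)) return;
        if (snap == _lastSnapshotJson) return; // no change since last poll
        _lastSnapshotJson = snap;
        Delivered.Add(snap);
    }
}
```

Repeated polls of an unchanged snapshot deliver nothing, which is what makes GET-based polling cheap compared with the old RPUSH/LRANGE scheme.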
```mermaid
sequenceDiagram
    participant Agent as Agent
    participant ToolExec as ToolExecutor
    participant Queue as Redis TaskQueue
    participant Consumer as TelegramTaskConsumer
    participant McpTool as McpToolHelper
    Agent->>ToolExec: ExecuteRemoteToolAsync(toolName, args, ...)
    ToolExec->>Queue: Enqueue(TelegramAgentToolTask)
    Queue-->>ToolExec: OK
    ToolExec->>Queue: Await RPC result (timeout=60s)
    Consumer->>Queue: Dequeue(task)
    Consumer->>McpTool: ExecuteRegisteredToolAsync(toolName, args, scope)
    McpTool-->>Consumer: result
    Consumer->>Queue: SetResult(taskId, result)
    Queue-->>ToolExec: RPC result received
    ToolExec-->>Agent: result string
```
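The enqueue/await-result round trip above is an RPC-over-queue pattern. Below is a compressed single-process sketch of that pattern, assuming nothing about the PR's actual types: the real flow crosses processes via Redis, whereas here an in-memory queue and a `TaskCompletionSource` per request stand in for the queue and the result key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class QueueRpcSketch {
    record ToolTask(string RequestId, string ToolName, Dictionary<string, string> Args);

    static readonly ConcurrentQueue<ToolTask> Queue = new();
    static readonly ConcurrentDictionary<string, TaskCompletionSource<string>> Pending = new();

    // Agent side (ToolExecutor.ExecuteRemoteToolAsync analogue): enqueue, then await with timeout.
    public static async Task<string> ExecuteRemoteToolAsync(
        string toolName, Dictionary<string, string> args, TimeSpan timeout) {
        var requestId = Guid.NewGuid().ToString("N");
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        Pending[requestId] = tcs;
        Queue.Enqueue(new ToolTask(requestId, toolName, args));
        var done = await Task.WhenAny(tcs.Task, Task.Delay(timeout));
        if (done != tcs.Task) throw new TimeoutException($"Tool '{toolName}' timed out.");
        return await tcs.Task;
    }

    // Main-process side (TelegramTaskConsumer analogue): dequeue, execute, set result.
    public static bool ConsumeOne(Func<string, Dictionary<string, string>, string> execute) {
        if (!Queue.TryDequeue(out var task)) return false;
        var result = execute(task.ToolName, task.Args);
        if (Pending.TryRemove(task.RequestId, out var tcs)) tcs.TrySetResult(result);
        return true;
    }
}
```

The per-request id is what lets many concurrent tool calls share one queue without results crossing over, matching the diagram's `SetResult(taskId, result)` step.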
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 failed (1 warning)
🔍 PR Check Report

📋 Check Overview
🧪 Test Results
📊 Code Quality
📁 Test Artifacts
🔗 Related Links

This report was generated automatically by GitHub Actions.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
TelegramSearchBot.Test/Service/AI/LLM/AgentIntegrationTests.cs (1)
83-94: ⚠️ Potential issue | 🟡 Minor

Fire-and-forget `snapshotsTask` may swallow enumerator exceptions.

In `ConcurrentSessions_IsolateQueuedTasksAndStreams` (lines 83-84) and `RecoveryFlow_RequeuesTimedOutTaskAndCompletesOnRetry` (line 150), `snapshotsTask[1|2]`/`snapshotsTask` are started but never awaited. If `ReadSnapshotsAsync` throws (e.g., a channel writer fault), the test won't observe it — you'd only notice via unobserved-task warnings. Consider awaiting them (or `Task.WhenAll(...)`) after `DrainUntilCompletedAsync` so enumerator faults surface.

Also applies to: 150-155
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot.Test/Service/AI/LLM/AgentIntegrationTests.cs` around lines 83 - 94, The test starts ReadSnapshotsAsync (snapshotsTask1, snapshotsTask2 or snapshotsTask) but never awaits them, so any exceptions from their enumerator are lost; after awaiting DrainUntilCompletedAsync(...) (and before asserting/completing), await the snapshot tasks (e.g., await Task.WhenAll(snapshotsTask1, snapshotsTask2) or await snapshotsTask) to surface enumerator faults from ReadSnapshotsAsync; update both ConcurrentSessions_IsolateQueuedTasksAndStreams (snapshotsTask1/2) and RecoveryFlow_RequeuesTimedOutTaskAndCompletesOnRetry (snapshotsTask) to await the snapshot tasks before finishing the test.

TelegramSearchBot.LLM/Service/AI/LLM/McpToolHelper.cs (1)
711-724: ⚠️ Potential issue | 🟡 Minor

Misleading error when a proxy tool is registered but the executor is missing.

If `ProxyToolRegistry.ContainsKey(toolName)` is true but `_proxyToolExecutor` is null (e.g. `RegisterProxyTools` was never called with an executor, or a future refactor splits them), this branch silently falls through to the external-tool check and then the `ToolRegistry` check, which throws `ArgumentException($"Tool '{toolName}' not registered.")`. That's confusing since the tool is registered as a proxy — it just can't be routed. Prefer an explicit diagnostic:

```diff
-        // Check if this is a proxy tool (routed to remote process via IPC)
-        if (ProxyToolRegistry.ContainsKey(toolName) && _proxyToolExecutor != null) {
-            var proxyResult = await _proxyToolExecutor(toolName, stringArguments);
-            return proxyResult;
-        }
+        // Check if this is a proxy tool (routed to remote process via IPC)
+        if (ProxyToolRegistry.ContainsKey(toolName)) {
+            if (_proxyToolExecutor == null) {
+                throw new InvalidOperationException(
+                    $"Proxy tool '{toolName}' is registered but no proxy executor is configured.");
+            }
+            return await _proxyToolExecutor(toolName, stringArguments);
+        }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot.LLM/Service/AI/LLM/McpToolHelper.cs` around lines 711 - 724, If ProxyToolRegistry contains the tool but _proxyToolExecutor is null, the code should not fall through to external or ToolRegistry checks; add an explicit check after the ProxyToolRegistry.ContainsKey(toolName) test that throws a clear exception (e.g., InvalidOperationException) stating the tool is registered as a proxy but no _proxyToolExecutor is configured, referencing ProxyToolRegistry and _proxyToolExecutor; keep the existing ExternalToolRegistry/ExecuteExternalToolAsync and ToolRegistry logic unchanged.
🧹 Nitpick comments (6)
TelegramSearchBot.Test/Service/AI/LLM/AgentIntegrationTests.cs (1)
43-48: Unused `snapshots` list after assertion change.

`snapshots` is collected but no longer asserted. Minor: either drop the local or keep a sanity-check assertion (e.g., `Assert.NotNull(snapshots)` / a bounded-count assertion) to confirm the stream channel completed cleanly. Keeping the awaited read is still useful to surface enumerator exceptions, so the await itself is worth keeping.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot.Test/Service/AI/LLM/AgentIntegrationTests.cs` around lines 43 - 48, The local snapshots variable is awaited but never asserted; either remove the unused local and simply await snapshotsTask to preserve exception propagation, or add a lightweight sanity check such as Assert.NotNull(snapshots) or a bounded-count assertion (e.g., Assert.InRange(snapshots.Count, 0, 2)) to confirm the stream channel completed cleanly; update the code around snapshotsTask and the terminal/handle.Completion check (variables snapshots, snapshotsTask, handle, terminal, AgentChunkType.Done) accordingly so the awaited read is retained for enumerator exceptions while eliminating the unused local or validating its contents.

TelegramSearchBot.LLMAgent/LLMAgentProgram.cs (1)
49-83: A few small concerns in `RegisterProxyToolsFromRedisAsync`.

Scoped ToolExecutor resolved from the root provider (line 64). `ToolExecutor` is registered as `AddScoped` (see `BuildServices` line 110), but it's pulled directly from the root `ServiceProvider` and captured in the closure, making it a process-lifetime singleton for all subsequent proxy tool invocations. It happens to work because `GarnetClient`/`GarnetRpcClient` are singletons, but the scoped registration is then misleading and would break if anyone adds a scoped dependency (e.g., `DataDbContext`) to `ToolExecutor`. Either register `ToolExecutor` as singleton, or create a scope per invocation inside the lambda.

Silent `long.TryParse` failures (lines 71-73). If `__chatId`/`__userId`/`__messageId` are present but malformed, the parse result is discarded and the remote call proceeds with `0`. A `LogWarning` on parse failure would make misrouting diagnosable.

Mutating the caller's `arguments` dict (lines 71-73). `arguments.Remove(...)` mutates the dictionary the caller passes in. If `McpToolHelper` ever needs to retry or log the original arguments, the reserved keys will be gone. Consider copying: `var args = new Dictionary<string, string>(arguments, StringComparer.Ordinal);` before extracting.

🛠️ Proposed diff

```diff
-        var toolExecutor = services.GetRequiredService<Service.ToolExecutor>();
-
-        // Register proxy tools with an executor that routes to the main process via Redis IPC
-        McpToolHelper.RegisterProxyTools(toolDefs, async (toolName, arguments) => {
-            // Resolve chatId/userId/messageId from ToolContext if needed
-            // These will be set by the calling code in McpToolHelper before invoking
-            long remoteChatId = 0, remoteUserId = 0, remoteMessageId = 0;
-            if (arguments.TryGetValue("__chatId", out var cid)) { long.TryParse(cid, out remoteChatId); arguments.Remove("__chatId"); }
-            if (arguments.TryGetValue("__userId", out var uid)) { long.TryParse(uid, out remoteUserId); arguments.Remove("__userId"); }
-            if (arguments.TryGetValue("__messageId", out var mid)) { long.TryParse(mid, out remoteMessageId); arguments.Remove("__messageId"); }
-
-            return await toolExecutor.ExecuteRemoteToolAsync(
-                toolName, arguments, remoteChatId, remoteUserId, remoteMessageId, CancellationToken.None);
-        });
+        // Register proxy tools with an executor that routes to the main process via Redis IPC.
+        // A fresh DI scope is created per invocation so scoped dependencies (future-proofing) are honored.
+        var rootProvider = services;
+        McpToolHelper.RegisterProxyTools(toolDefs, async (toolName, arguments) => {
+            using var scope = rootProvider.CreateScope();
+            var toolExecutor = scope.ServiceProvider.GetRequiredService<Service.ToolExecutor>();
+
+            var args = new Dictionary<string, string>(arguments);
+            long remoteChatId = 0, remoteUserId = 0, remoteMessageId = 0;
+            if (args.Remove("__chatId", out var cid) && !long.TryParse(cid, out remoteChatId)) {
+                logger.LogWarning("Proxy tool {Tool}: invalid __chatId '{Value}'", toolName, cid);
+            }
+            if (args.Remove("__userId", out var uid) && !long.TryParse(uid, out remoteUserId)) {
+                logger.LogWarning("Proxy tool {Tool}: invalid __userId '{Value}'", toolName, uid);
+            }
+            if (args.Remove("__messageId", out var mid) && !long.TryParse(mid, out remoteMessageId)) {
+                logger.LogWarning("Proxy tool {Tool}: invalid __messageId '{Value}'", toolName, mid);
+            }
+
+            return await toolExecutor.ExecuteRemoteToolAsync(
+                toolName, args, remoteChatId, remoteUserId, remoteMessageId, CancellationToken.None);
+        });
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot.LLMAgent/LLMAgentProgram.cs` around lines 49 - 83, RegisterProxyToolsFromRedisAsync captures a scoped ToolExecutor from the root provider and mutates the incoming arguments while silently swallowing malformed __chatId/__userId/__messageId parses; fix by (1) not resolving ToolExecutor once from the root—inside the McpToolHelper.RegisterProxyTools lambda create a scope (services.CreateScope()) and resolve Service.ToolExecutor from scope.ServiceProvider for each invocation so scoped deps remain valid, (2) copy the incoming arguments before modifying (e.g., new Dictionary<string,string>(arguments,...)) so the caller's dictionary isn't mutated, and (3) when long.TryParse fails for __chatId/__userId/__messageId, log a warning with the offending value (use the logger) instead of silently proceeding with 0, then call toolExecutor.ExecuteRemoteToolAsync with the parsed IDs.

TelegramSearchBot/Service/AI/LLM/ChunkPollingService.cs (1)
92-93: Fire-and-forget `KeyDeleteAsync` silently swallows Redis errors.

`_ = db.KeyDeleteAsync(...)` discards the returned Task, so any `RedisException` during cleanup is unobserved. TTL acts as a safety net (as the comment notes), but it's worth at least a `ContinueWith` that logs, or awaiting the deletes — they're already inside an async method.

🛠️ Simple fix

```diff
-            // Cleanup keys (use TTL as safety net, no race condition)
-            _ = db.KeyDeleteAsync(LlmAgentRedisKeys.AgentSnapshot(taskId));
-            _ = db.KeyDeleteAsync(LlmAgentRedisKeys.AgentTerminal(taskId));
+            // Cleanup keys (TTL is safety net)
+            await db.KeyDeleteAsync(LlmAgentRedisKeys.AgentSnapshot(taskId));
+            await db.KeyDeleteAsync(LlmAgentRedisKeys.AgentTerminal(taskId));
```

(and the analogous change in `CompleteTrackedTaskAsync`.)

Also applies to: 153-154
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot/Service/AI/LLM/ChunkPollingService.cs` around lines 92 - 93, The fire-and-forget calls to db.KeyDeleteAsync(LlmAgentRedisKeys.AgentSnapshot(taskId)) and db.KeyDeleteAsync(LlmAgentRedisKeys.AgentTerminal(taskId)) silently drop exceptions; change these to either await the Tasks or attach a ContinueWith that logs failures so RedisExceptions are observed (do the same fix in CompleteTrackedTaskAsync for the analogous calls at the other location). Ensure you use the same ILogger/processLogger used in this class to record any exception and include context (taskId and key name) in the message.

TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs (1)
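If the deletes are to stay fire-and-forget, a fault-only logging continuation is the minimal way to observe failures. A generic sketch of that pattern, not the PR's code; `log` here is any logger callback the caller supplies:

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForgetSketch {
    // Attach a continuation that runs only on faults, so a dropped Task
    // still surfaces its exception instead of becoming unobserved.
    public static void Observe(Task task, string context, Action<string> log) =>
        task.ContinueWith(
            t => log($"Cleanup failed for {context}: {t.Exception?.GetBaseException().Message}"),
            TaskContinuationOptions.OnlyOnFaulted);
}
```

Hypothetical usage against the reviewed code would look like `FireAndForgetSketch.Observe(db.KeyDeleteAsync(key), $"task {taskId}", msg => _logger.LogWarning(msg));`, keeping the call non-blocking while still recording Redis errors.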
23-23: Make the concurrency cap configurable.

The `4, 4` is a magic number that ties throughput to a hardcoded constant. Consider pulling it from `Env` (e.g. `Env.TelegramToolConcurrency`) or a configuration value so it can be tuned per deployment without a rebuild. Not blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs` at line 23, The hardcoded SemaphoreSlim(4, 4) concurrency cap in TelegramTaskConsumer should be made configurable: read an integer concurrency value (e.g., Env.TelegramToolConcurrency or via injected IConfiguration/IOptions) with a sensible default (4) and validate it (>=1), then initialize the field _concurrencyLimiter using that value for both initialCount and maxCount; update the TelegramTaskConsumer constructor to accept/configure this setting and use it when constructing _concurrencyLimiter so deployments can tune throughput without rebuilding.TelegramSearchBot.LLM/Service/AI/LLM/McpToolHelper.cs (1)
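The bounded-concurrency dispatch the consumer uses can be sketched with a configurable cap plus a drain step, which also covers the in-flight-task concern raised in the inline prompts. All names here are illustrative, not the PR's:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class BoundedWorkerSketch {
    readonly SemaphoreSlim _limiter;
    readonly ConcurrentBag<Task> _inflight = new();

    public BoundedWorkerSketch(int maxConcurrency) {
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        _limiter = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    }

    public async Task DispatchAsync(Func<Task> work, CancellationToken ct) {
        await _limiter.WaitAsync(ct);
        // No token passed to Task.Run: the permit release must not be skipped
        // just because cancellation was requested after the wait succeeded.
        _inflight.Add(Task.Run(async () => {
            try { await work(); }
            finally { _limiter.Release(); }
        }));
    }

    // Drain in-flight work on shutdown so results are not lost.
    public Task StopAsync() => Task.WhenAll(_inflight);
}
```

The cap would come from configuration (the review suggests something like `Env.TelegramToolConcurrency`) rather than the literal `4, 4`.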
1068-1108: `RegisterProxyTools` is effectively a full-replacement operation — please document.

`ProxyToolRegistry.Clear()` and `_sCachedProxyToolsXml = sb.ToString()` at the bottom mean every call wipes previously-registered proxy tools. That's fine for the single-shot startup usage in `LLMAgentProgram.cs`, but the XML doc on the method doesn't call it out. Callers that might want to incrementally add proxy tools (e.g. on reconnect) would be surprised. Consider a brief note in the summary, or splitting into `SetProxyTools` vs `AddProxyTools` if incremental use is anticipated.

Also note: `_proxyToolExecutor` is assigned without any synchronization. In the current one-time-at-startup flow this is fine, but if this ever becomes callable at runtime you'll want `Volatile.Write` or a lock to pair with the executor read on line 713.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot.LLM/Service/AI/LLM/McpToolHelper.cs` around lines 1068 - 1108, RegisterProxyTools currently performs a full-replacement of proxy tool state (it calls ProxyToolRegistry.Clear() and overwrites _sCachedProxyToolsXml) and assigns _proxyToolExecutor without any synchronization; update the method's XML doc summary to explicitly state it is a full replace operation (or refactor into two methods such as SetProxyTools for full replace and AddProxyTools for incremental addition), and if RegisterProxyTools may be called at runtime make the executor assignment thread-safe (use Volatile.Write(ref _proxyToolExecutor, executor) or a lock to match the executor read elsewhere) so callers are not surprised by the destructive behavior or race conditions.

TelegramSearchBot.LLM.Test/Service/AI/LLM/GarnetClientTests.cs (1)
22-71: Strengthen TTL assertions in snapshot and terminal tests to prevent regressions.

The implementation correctly passes `ChunkTtl` (1 hour) to `StringSetAsync`, but the tests use `It.IsAny<Expiration>()` without capturing or asserting the expiry argument. A future regression that accidentally drops the TTL would pass these tests silently, reintroducing unbounded memory growth. Capture the `Expiration` parameter in the `Callback` and assert it matches the expected TTL to lock in this fix.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot.LLM.Test/Service/AI/LLM/GarnetClientTests.cs` around lines 22 - 71, The tests for PublishSnapshotAsync and PublishTerminalAsync currently ignore the Expiration passed to IDatabase.StringSetAsync; capture the Expiration in the Setup Callback alongside RedisKey and RedisValue (the Callback used for StringSetAsync in these tests) and assert that the captured Expiration equals GarnetClient.ChunkTtl (1 hour) or has the expected TimeSpan/seconds value so the TTL is verified; update the Callback signatures in the GarnetClientTests for both PublishSnapshotAsync and PublishTerminalAsync to accept and store the Expiration and add an Assert comparing the captured expiration to the expected ChunkTtl.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@TelegramSearchBot/AppBootstrap/GeneralBootstrap.cs`:
- Around line 171-181: The current startup writes tool definitions to Redis with
a 24h TTL which can expire and silently degrade agents; update the export logic
that uses McpToolHelper.ExportToolDefinitions(), IConnectionMultiplexer and
redis.GetDatabase().StringSetAsync(LlmAgentRedisKeys.AgentToolDefs, json, ...)
to either write the key without an expiry (omit the TimeSpan) or implement a
periodic refresh (e.g., move publishing into an IHostedService/timer that
re-runs ExportToolDefinitions and calls StringSetAsync to refresh TTL), and
change the catch from Log.Warning(...) to Log.Error(ex, ...) so failures are
surfaced as errors.
In `@TelegramSearchBot/Service/AI/LLM/ChunkPollingService.cs`:
- Around line 76-117: PollTaskAsync can hang if a snapshot exists but no
terminal chunk arrives; call TryCompleteFromTaskStateAsync not only when
snapshot is missing but also after processing (or detecting no-change in) the
snapshot. Update PollTaskAsync to invoke TryCompleteFromTaskStateAsync(taskId,
tracked, cancellationToken) after you handle snapshot cases (e.g., after
comparing/updating tracked.LastSnapshotJson and after the early return when
snapshotStr == tracked.LastSnapshotJson) so that task state transitions
(Failed/Cancelled) from AgentRegistryService are honored even when AgentSnapshot
remains present; keep existing snapshot/terminal delivery logic
(AgentSnapshot/AgentTerminal, tracked.LastContent) intact.
In `@TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs`:
- Around line 67-75: The fire-and-forget Task.Run currently leaks results and
semaphore permits; instead, track the spawned tasks and drain them on shutdown
and remove the stoppingToken argument from Task.Run. Concretely: when you call
_concurrencyLimiter.WaitAsync(...) and then start the worker, capture the Task
returned by Task.Run (do not pass stoppingToken to Task.Run) and add it to a
concurrent collection (e.g., _inflightTasks) and ensure the Task.Run delegate
still calls ExecuteToolTaskAsync/ExecuteSendMessageAsync and always
_concurrencyLimiter.Release() in a finally block; then override StopAsync to
await Task.WhenAll(_inflightTasks) (or await a drained snapshot with
cancellation awareness) so in-flight ExecuteToolTaskAsync writes
(TELEGRAM_RESULT:{requestId}) complete before shutdown; also remove passing
stoppingToken into Task.Run so the semaphore release is not skipped if the token
is already canceled.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 86c6fea6-ef40-4114-a9cd-dc4217738909
📒 Files selected for processing (13)
- TelegramSearchBot.Common/Model/AI/LlmAgentContracts.cs
- TelegramSearchBot.LLM.Test/Service/AI/LLM/GarnetClientTests.cs
- TelegramSearchBot.LLM/Service/AI/LLM/McpToolHelper.cs
- TelegramSearchBot.LLMAgent/LLMAgentProgram.cs
- TelegramSearchBot.LLMAgent/Service/AgentLoopService.cs
- TelegramSearchBot.LLMAgent/Service/GarnetClient.cs
- TelegramSearchBot.LLMAgent/Service/ToolExecutor.cs
- TelegramSearchBot.Test/Service/AI/LLM/AgentIntegrationTests.cs
- TelegramSearchBot.Test/Service/AI/LLM/ChunkPollingServiceTests.cs
- TelegramSearchBot.Test/Service/AI/LLM/InMemoryRedisTestHarness.cs
- TelegramSearchBot/AppBootstrap/GeneralBootstrap.cs
- TelegramSearchBot/Service/AI/LLM/ChunkPollingService.cs
- TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs
```csharp
// Export tool definitions to Redis so agent processes can discover available tools
try {
    var redis = service.GetRequiredService<IConnectionMultiplexer>();
    var toolDefs = McpToolHelper.ExportToolDefinitions();
    var json = JsonConvert.SerializeObject(toolDefs);
    await redis.GetDatabase().StringSetAsync(
        LlmAgentRedisKeys.AgentToolDefs, json, TimeSpan.FromHours(24));
    Log.Information("Exported {Count} tool definitions to Redis for agent discovery.", toolDefs.Count);
} catch (Exception ex) {
    Log.Warning(ex, "Failed to export tool definitions to Redis. Agent processes may have limited tools.");
}
```
Tool-defs TTL may expire before the main process restarts.
The tool definitions are written once at startup with a 24h TTL, but the main process typically runs much longer than 24h. After expiry, any newly-launched agent (e.g., a fresh chat session spawned after day 1) will read an empty key and fall back to its 5 local tools only — with just a LogWarning from the agent side. Consider either:
- Writing without an expiry (tool defs are effectively static for the process lifetime), or
- Periodically re-publishing (e.g., on a timer/`IHostedService`) so the TTL is refreshed.
Also consider Log.Error (rather than Warning) on export failure, since it silently degrades agent capability to ~5 tools.
🛠️ Suggested change (no expiry)

```diff
-                await redis.GetDatabase().StringSetAsync(
-                    LlmAgentRedisKeys.AgentToolDefs, json, TimeSpan.FromHours(24));
+                await redis.GetDatabase().StringSetAsync(
+                    LlmAgentRedisKeys.AgentToolDefs, json);
```
```diff
 private async Task PollTaskAsync(string taskId, TrackedTask tracked, CancellationToken cancellationToken) {
-    var values = await _redis.GetDatabase().ListRangeAsync(LlmAgentRedisKeys.AgentChunks(taskId), tracked.NextIndex, -1);
-    if (values.Length == 0) {
+    var db = _redis.GetDatabase();
+
+    // Check for terminal chunk first (Done/Error/IterationLimitReached)
+    var terminalJson = await db.StringGetAsync(LlmAgentRedisKeys.AgentTerminal(taskId));
+    if (terminalJson.HasValue) {
+        var terminal = JsonConvert.DeserializeObject<AgentStreamChunk>(terminalJson.ToString());
+        if (terminal != null) {
+            // Deliver any final snapshot content before the terminal chunk
+            if (!string.IsNullOrEmpty(terminal.Content) && terminal.Content != tracked.LastContent) {
+                await tracked.Channel.Writer.WriteAsync(terminal, cancellationToken);
+            }
+            tracked.Completion.TrySetResult(terminal);
+            tracked.Channel.Writer.TryComplete();
+            _trackedTasks.TryRemove(taskId, out _);
+            // Cleanup keys (use TTL as safety net, no race condition)
+            _ = db.KeyDeleteAsync(LlmAgentRedisKeys.AgentSnapshot(taskId));
+            _ = db.KeyDeleteAsync(LlmAgentRedisKeys.AgentTerminal(taskId));
+            return;
+        }
+    }
+
+    // Check for snapshot updates
+    var snapshotJson = await db.StringGetAsync(LlmAgentRedisKeys.AgentSnapshot(taskId));
+    if (!snapshotJson.HasValue) {
+        // No snapshot yet - check task state for early completion/failure
+        await TryCompleteFromTaskStateAsync(taskId, tracked, cancellationToken);
+        return;
+    }
+
-        foreach (var value in values) {
-            var chunk = JsonConvert.DeserializeObject<AgentStreamChunk>(value.ToString());
-            if (chunk == null) {
-                tracked.NextIndex++;
-                continue;
-            }
+    var snapshotStr = snapshotJson.ToString();
+    if (snapshotStr == tracked.LastSnapshotJson) {
+        return; // No change since last poll
+    }
+
+    tracked.LastSnapshotJson = snapshotStr;
+    var chunk = JsonConvert.DeserializeObject<AgentStreamChunk>(snapshotStr);
+    if (chunk != null && chunk.Content != tracked.LastContent) {
+        tracked.LastContent = chunk.Content;
+        await tracked.Channel.Writer.WriteAsync(chunk, cancellationToken);
-            tracked.NextIndex++;
-            await _redis.GetDatabase().StringSetAsync(
-                LlmAgentRedisKeys.AgentChunkIndex(taskId),
-                tracked.NextIndex,
-                TimeSpan.FromHours(1),
-                When.Always);
-
-            if (chunk.Type is AgentChunkType.Done or AgentChunkType.Error or AgentChunkType.IterationLimitReached) {
-                tracked.Completion.TrySetResult(chunk);
-                tracked.Channel.Writer.TryComplete();
-                _trackedTasks.TryRemove(taskId, out _);
-                await _redis.GetDatabase().KeyDeleteAsync(LlmAgentRedisKeys.AgentChunkIndex(taskId));
-                await _redis.GetDatabase().KeyDeleteAsync(LlmAgentRedisKeys.AgentChunks(taskId));
-                break;
-            }
-        }
+    }
 }
```
Edge: task can get stuck if a snapshot exists but the terminal chunk never arrives.
TryCompleteFromTaskStateAsync is only invoked on the "no snapshot" branch (lines 100-104). If a task publishes a snapshot and then the agent process dies abnormally (OOM/SIGKILL) before PublishTerminalAsync, the snapshot key stays present and the polling loop will keep no-op'ing (same LastSnapshotJson each cycle) — handle.Completion never resolves, even if AgentRegistryService/recovery flips task state to Failed. Consider checking task state after processing the snapshot too, e.g., after line 116, so Failed/Cancelled terminal states are honored regardless of whether a snapshot exists.
🛠️ Suggested adjustment
     tracked.LastSnapshotJson = snapshotStr;
     var chunk = JsonConvert.DeserializeObject<AgentStreamChunk>(snapshotStr);
     if (chunk != null && chunk.Content != tracked.LastContent) {
         tracked.LastContent = chunk.Content;
         await tracked.Channel.Writer.WriteAsync(chunk, cancellationToken);
     }
+
+    // Also honor Failed/Cancelled task state even after snapshots exist
+    await TryCompleteFromTaskStateAsync(taskId, tracked, cancellationToken);
 }

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private async Task PollTaskAsync(string taskId, TrackedTask tracked, CancellationToken cancellationToken) {
    var db = _redis.GetDatabase();
    // Check for terminal chunk first (Done/Error/IterationLimitReached)
    var terminalJson = await db.StringGetAsync(LlmAgentRedisKeys.AgentTerminal(taskId));
    if (terminalJson.HasValue) {
        var terminal = JsonConvert.DeserializeObject<AgentStreamChunk>(terminalJson.ToString());
        if (terminal != null) {
            // Deliver any final snapshot content before the terminal chunk
            if (!string.IsNullOrEmpty(terminal.Content) && terminal.Content != tracked.LastContent) {
                await tracked.Channel.Writer.WriteAsync(terminal, cancellationToken);
            }
            tracked.Completion.TrySetResult(terminal);
            tracked.Channel.Writer.TryComplete();
            _trackedTasks.TryRemove(taskId, out _);
            // Cleanup keys (use TTL as safety net, no race condition)
            _ = db.KeyDeleteAsync(LlmAgentRedisKeys.AgentSnapshot(taskId));
            _ = db.KeyDeleteAsync(LlmAgentRedisKeys.AgentTerminal(taskId));
            return;
        }
    }
    // Check for snapshot updates
    var snapshotJson = await db.StringGetAsync(LlmAgentRedisKeys.AgentSnapshot(taskId));
    if (!snapshotJson.HasValue) {
        // No snapshot yet - check task state for early completion/failure
        await TryCompleteFromTaskStateAsync(taskId, tracked, cancellationToken);
        return;
    }
    var snapshotStr = snapshotJson.ToString();
    if (snapshotStr == tracked.LastSnapshotJson) {
        return; // No change since last poll
    }
    tracked.LastSnapshotJson = snapshotStr;
    var chunk = JsonConvert.DeserializeObject<AgentStreamChunk>(snapshotStr);
    if (chunk != null && chunk.Content != tracked.LastContent) {
        tracked.LastContent = chunk.Content;
        await tracked.Channel.Writer.WriteAsync(chunk, cancellationToken);
    }
}
private async Task PollTaskAsync(string taskId, TrackedTask tracked, CancellationToken cancellationToken) {
    var db = _redis.GetDatabase();
    // Check for terminal chunk first (Done/Error/IterationLimitReached)
    var terminalJson = await db.StringGetAsync(LlmAgentRedisKeys.AgentTerminal(taskId));
    if (terminalJson.HasValue) {
        var terminal = JsonConvert.DeserializeObject<AgentStreamChunk>(terminalJson.ToString());
        if (terminal != null) {
            // Deliver any final snapshot content before the terminal chunk
            if (!string.IsNullOrEmpty(terminal.Content) && terminal.Content != tracked.LastContent) {
                await tracked.Channel.Writer.WriteAsync(terminal, cancellationToken);
            }
            tracked.Completion.TrySetResult(terminal);
            tracked.Channel.Writer.TryComplete();
            _trackedTasks.TryRemove(taskId, out _);
            // Cleanup keys (use TTL as safety net, no race condition)
            _ = db.KeyDeleteAsync(LlmAgentRedisKeys.AgentSnapshot(taskId));
            _ = db.KeyDeleteAsync(LlmAgentRedisKeys.AgentTerminal(taskId));
            return;
        }
    }
    // Check for snapshot updates
    var snapshotJson = await db.StringGetAsync(LlmAgentRedisKeys.AgentSnapshot(taskId));
    if (!snapshotJson.HasValue) {
        // No snapshot yet - check task state for early completion/failure
        await TryCompleteFromTaskStateAsync(taskId, tracked, cancellationToken);
        return;
    }
    var snapshotStr = snapshotJson.ToString();
    if (snapshotStr == tracked.LastSnapshotJson) {
        return; // No change since last poll
    }
    tracked.LastSnapshotJson = snapshotStr;
    var chunk = JsonConvert.DeserializeObject<AgentStreamChunk>(snapshotStr);
    if (chunk != null && chunk.Content != tracked.LastContent) {
        tracked.LastContent = chunk.Content;
        await tracked.Channel.Writer.WriteAsync(chunk, cancellationToken);
    }
    // Also honor Failed/Cancelled task state even after snapshots exist
    await TryCompleteFromTaskStateAsync(taskId, tracked, cancellationToken);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@TelegramSearchBot/Service/AI/LLM/ChunkPollingService.cs` around lines 76-117:
PollTaskAsync can hang if a snapshot exists but no terminal chunk arrives;
call TryCompleteFromTaskStateAsync not only when snapshot is missing but also
after processing (or detecting no-change in) the snapshot. Update PollTaskAsync
to invoke TryCompleteFromTaskStateAsync(taskId, tracked, cancellationToken)
after you handle snapshot cases (e.g., after comparing/updating
tracked.LastSnapshotJson and after the early return when snapshotStr ==
tracked.LastSnapshotJson) so that task state transitions (Failed/Cancelled) from
AgentRegistryService are honored even when AgentSnapshot remains present; keep
existing snapshot/terminal delivery logic (AgentSnapshot/AgentTerminal,
tracked.LastContent) intact.
+// Execute concurrently with bounded parallelism
+await _concurrencyLimiter.WaitAsync(stoppingToken);
+_ = Task.Run(async () => {
+    try {
+        await ExecuteToolTaskAsync(task, stoppingToken);
+    } finally {
+        _concurrencyLimiter.Release();
+    }
-    var chatId = task.Arguments.TryGetValue("chatId", out var chatIdString) && long.TryParse(chatIdString, out var parsedChatId)
-        ? parsedChatId
-        : task.ChatId;
-
-    var sent = await _sendMessage.AddTaskWithResult(() => _botClient.SendMessage(chatId, text, cancellationToken: stoppingToken), chatId);
-    response.Success = true;
-    response.TelegramMessageId = sent.MessageId;
-    response.Result = sent.MessageId.ToString();
-} catch (Exception ex) when (ex is not OperationCanceledException) {
-    _logger.LogError(ex, "Failed to execute telegram task {RequestId}", task.RequestId);
-    response.ErrorMessage = ex.Message;
-}
-
-await _redis.GetDatabase().StringSetAsync(
-    LlmAgentRedisKeys.TelegramResult(task.RequestId),
-    JsonConvert.SerializeObject(response),
-    TimeSpan.FromMinutes(5));
+}, stoppingToken);
In-flight tool tasks are not drained on shutdown (can lose results).
This fire-and-forget `_ = Task.Run(...)` pattern has two issues:
1. Lost results on graceful shutdown. When stoppingToken fires, the outer loop breaks and ExecuteAsync returns immediately, but tasks already scheduled via Task.Run keep running without being awaited by the host. If ExecuteToolTaskAsync is mid-write to Redis (line 105) when the host finalizes, the TELEGRAM_RESULT:{requestId} key is never set — and the agent-side ExecuteRemoteToolAsync will only discover this by timing out after 60s (or, for send_message, user-visible silence). Track the tasks and drain them in an overridden StopAsync.
2. Semaphore release skipped if the delegate never runs. Task.Run(delegate, stoppingToken) will refuse to invoke delegate if stoppingToken is already cancelled when the task is scheduled — in that narrow window the finally { Release(); } never executes, leaking a permit. Not catastrophic (the service is stopping anyway), but the stoppingToken argument to Task.Run buys nothing here since the token is already passed into ExecuteToolTaskAsync/ExecuteSendMessageAsync for cooperative cancellation. Drop it from Task.Run.
🔧 Suggested fix
 public sealed class TelegramTaskConsumer : BackgroundService {
     ...
     private readonly SemaphoreSlim _concurrencyLimiter = new(4, 4);
+    private readonly ConcurrentDictionary<Task, byte> _inFlight = new();

     await _concurrencyLimiter.WaitAsync(stoppingToken);
-    _ = Task.Run(async () => {
+    Task worker = null;
+    worker = Task.Run(async () => {
         try {
             await ExecuteToolTaskAsync(task, stoppingToken);
         } finally {
             _concurrencyLimiter.Release();
+            _inFlight.TryRemove(worker!, out _);
         }
-    }, stoppingToken);
+    });
+    _inFlight.TryAdd(worker, 0);

+    public override async Task StopAsync(CancellationToken cancellationToken) {
+        await base.StopAsync(cancellationToken);
+        try {
+            await Task.WhenAll(_inFlight.Keys).WaitAsync(TimeSpan.FromSeconds(10), cancellationToken);
+        } catch (Exception ex) {
+            _logger.LogWarning(ex, "Some in-flight Telegram tool tasks did not complete within drain timeout");
+        }
+    }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs` around lines 67-75:
The fire-and-forget Task.Run currently leaks results and semaphore permits;
instead, track the spawned tasks and drain them on shutdown and remove the
stoppingToken argument from Task.Run. Concretely: when you call
_concurrencyLimiter.WaitAsync(...) and then start the worker, capture the Task
returned by Task.Run (do not pass stoppingToken to Task.Run) and add it to a
concurrent collection (e.g., _inflightTasks) and ensure the Task.Run delegate
still calls ExecuteToolTaskAsync/ExecuteSendMessageAsync and always
_concurrencyLimiter.Release() in a finally block; then override StopAsync to
await Task.WhenAll(_inflightTasks) (or await a drained snapshot with
cancellation awareness) so in-flight ExecuteToolTaskAsync writes
(TELEGRAM_RESULT:{requestId}) complete before shutdown; also remove passing
stoppingToken into Task.Run so the semaphore release is not skipped if the token
is already canceled.
Summary
Fixes three major issues with the LLM agent process separation:
1. Memory accumulation from chunk storage (LIST → SET)
2. Missing tools in agent mode
3. Generalized task consumer
Key Changes
Tests
All 415 tests pass. Updated GarnetClient, ChunkPolling, and AgentIntegration tests for SET-based semantics.