Implement LLM agent process separation #265
Conversation
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
📝 Walkthrough

This PR implements a distributed LLM agent architecture that offloads language model execution to separate worker processes. The main application enqueues LLM tasks to Redis; independent agent processes execute these tasks, stream results back, and handle recovery. Two new projects (`TelegramSearchBot.LLMAgent` and `TelegramSearchBot.SubAgent`) are introduced.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client/Controller
    participant Queue as LLMTaskQueueService
    participant Registry as AgentRegistryService
    participant AgentProc as Agent Process
    participant Polling as ChunkPollingService
    participant Redis as Redis
    Client->>Queue: EnqueueMessageTaskAsync()
    Queue->>Registry: EnsureAgentAsync(chatId)
    alt Agent not running
        Registry->>AgentProc: StartAsync() via launcher
        AgentProc->>Redis: Update session heartbeat
    end
    Queue->>Redis: Push task to AgentTaskQueue
    Queue->>Polling: TrackTask(taskId)
    Queue-->>Client: Return AgentTaskStreamHandle
    AgentProc->>Redis: BRPop AgentTaskQueue
    AgentProc->>AgentProc: ProcessTaskAsync()
    loop Streaming chunks
        AgentProc->>Redis: PublishChunkAsync()
    end
    Polling->>Redis: Poll agent chunks
    Polling->>Polling: Emit snapshot/error/done
    Polling-->>Client: ReadSnapshotsAsync() yields chunks
    Client->>Polling: Read terminal chunk
    Client->>Client: Send result to Telegram
```
```mermaid
sequenceDiagram
    participant Agent as Agent Process
    participant Registry as AgentRegistryService
    participant Heartbeat as Heartbeat Task
    participant Maintenance as Maintenance Loop
    participant Redis as Redis
    participant Launcher as Process Launcher
    Agent->>Redis: Save session (alive)
    Agent->>Heartbeat: Start periodic heartbeat
    loop Every N seconds
        Heartbeat->>Redis: Update heartbeat timestamp
    end
    loop Maintenance cycle
        Maintenance->>Registry: RunMaintenanceOnceAsync()
        Registry->>Redis: Load all active sessions
        alt Session heartbeat stale
            Registry->>Registry: Re-queue task with recovery
            Registry->>Launcher: TryKill(processId)
        else Session idle timeout
            Registry->>Redis: RequestShutdownAsync()
        else Shutdown grace expired
            Registry->>Launcher: TryKill(processId)
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
🔍 PR Check Report

📋 Check Overview
🧪 Test Results
📊 Code Quality
📁 Test Artifacts
🔗 Related Links

This report was generated automatically by GitHub Actions.
Actionable comments posted: 7
Note
Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
🟡 Minor comments (20)
TelegramSearchBot.Test/Service/AI/LLM/InMemoryRedisTestHarness.cs-35-50 (1)
35-50: ⚠️ Potential issue | 🟡 Minor

`ListRangeAsync` ignores negative `start` values.

`normalizedStart = Math.Max(0, start)` silently clamps negative starts to 0, but Redis treats negative indices as offsets from the end (e.g., `-1` = last element). If any production code under test ever calls `ListRangeAsync` with a negative start (paging from tail), tests will observe different data than a real Redis would return. If current callers only use non-negative starts, this is a minor fidelity gap worth a comment; otherwise consider mirroring the `stop < 0` handling for `start`.
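For reference, a minimal sketch of the index normalization real Redis applies for `LRANGE` (the helper name and shape are illustrative, not taken from the harness):

```csharp
using System;

// Illustrative helper mirroring Redis LRANGE semantics: negative indices
// count from the tail, then both ends are clamped into [0, count - 1].
static class RedisRange {
    public static (int Start, int Stop) Normalize(int count, long start, long stop) {
        var s = start < 0 ? (int)(count + start) : (int)start;
        var e = stop < 0 ? (int)(count + stop) : (int)stop;
        s = Math.Max(0, s);
        e = Math.Min(count - 1, e);
        return (s, e); // empty result when s > e
    }
}
```

For a five-element list, `Normalize(5, -2, -1)` yields `(3, 4)`, i.e. the last two elements, whereas clamping `start` straight to 0 would return the whole list.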
TelegramSearchBot.LLMAgent/Program.cs-1-7 (1)
1-7: ⚠️ Potential issue | 🟡 Minor

Align LLMAgentProgram bootstrap with Program.cs guideline or document the deviation.

The agent worker currently deviates from the repository's Program.cs guideline. `LLMAgentProgram.BuildServices()` uses simple console logging instead of Serilog, does not configure OpenTelemetry, does not set HTTP/2 defaults, and does not use `GeneralBootstrap.Startup`. This means agent-side logs do not flow to the observability backend that the main process uses.

Either update `LLMAgentProgram` to initialize Serilog + OpenTelemetry via `GeneralBootstrap.Startup` (if applicable to worker processes), or document explicitly why the agent worker requires a separate logging and bootstrap pipeline.
TelegramSearchBot/TelegramSearchBot.csproj-98-99 (1)
98-99: ⚠️ Potential issue | 🟡 Minor

Confirm the purpose of referencing worker executables — unnecessary for process launching via file path.

The ProjectReferences to `TelegramSearchBot.LLMAgent` and `TelegramSearchBot.SubAgent` (both `OutputType=Exe`) exist at lines 98-99, but no code in the main app imports types from these projects. The worker processes are launched via file path (`AppContext.BaseDirectory`), not direct assembly loading.

Since your Scrutor configuration uses explicit `FromAssemblyOf<T>()` calls (for `IOnUpdate`, `IScheduleTask`, `IView`, etc.) rather than `FromApplicationDependencies()`, unintended type registration from worker assemblies is not a risk.

However, these ProjectReferences still cause the worker executables and their full dependency graphs to be copied into the main app's output directory. If the purpose is only to ensure worker DLLs are present for subprocess launching, consider:

- Removing the ProjectReferences and instead using a post-build step or separate build configuration to copy only the worker `.exe` files
- Adding a comment documenting why the references exist if they serve another purpose

If these projects share contract types with the main app, extract those into a shared contracts library.
TelegramSearchBot.SubAgent/Program.cs-28-32 (1)
28-32: ⚠️ Potential issue | 🟡 Minor

`ProcessExit` handler can race with `shutdownCts.Dispose()`.

`shutdownCts` is a `using` local; if `ProcessExit` fires after `Main` returns and disposal has run, `shutdownCts.Cancel()` will throw `ObjectDisposedException` on the process-exit thread. Guarding with a `try/catch` (or unregistering the handler before returning) avoids a noisy shutdown.
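A minimal sketch of the guarded pattern (standalone illustration, not the PR's Program.cs):

```csharp
using System;
using System.Threading;

// Standalone illustration: unregister the ProcessExit handler before the
// CTS is disposed, and tolerate a late Cancel() on the process-exit thread.
using var shutdownCts = new CancellationTokenSource();

EventHandler onExit = (_, _) => {
    try { shutdownCts.Cancel(); }
    catch (ObjectDisposedException) { /* Main already returned; nothing left to cancel */ }
};
AppDomain.CurrentDomain.ProcessExit += onExit;
try {
    // ... run the worker until shutdownCts.Token is cancelled ...
} finally {
    // After this line the handler can no longer race with disposal.
    AppDomain.CurrentDomain.ProcessExit -= onExit;
}
```

Either half of the guard is sufficient on its own; doing both keeps shutdown quiet even if the runtime invokes a handler that was registered elsewhere.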
TelegramSearchBot.LLMAgent/Service/GarnetClient.cs-21-33 (1)
21-33: ⚠️ Potential issue | 🟡 Minor

`BRPopAsync` timeout conversion can produce `0`, meaning "block forever" in Redis.

`(int)Math.Ceiling(timeout.TotalSeconds)` is fine for positive timeouts, but `TimeSpan.Zero` or negative timeouts get sent as `BRPOP key 0`, which in Redis semantics means block indefinitely — the opposite of a zero/negative timeout. Consider clamping to a minimum of 1 second, or throwing `ArgumentOutOfRangeException` for non-positive inputs.

🛡️ Suggested guard
```diff
- public async Task<string?> BRPopAsync(string key, TimeSpan timeout) {
-     var result = await _redis.GetDatabase().ExecuteAsync("BRPOP", key, (int)Math.Ceiling(timeout.TotalSeconds));
+ public async Task<string?> BRPopAsync(string key, TimeSpan timeout) {
+     if (timeout <= TimeSpan.Zero) {
+         throw new ArgumentOutOfRangeException(nameof(timeout), "BRPOP timeout must be positive; 0 would block indefinitely.");
+     }
+     var seconds = Math.Max(1, (int)Math.Ceiling(timeout.TotalSeconds));
+     var result = await _redis.GetDatabase().ExecuteAsync("BRPOP", key, seconds);
```
TelegramSearchBot.LLMAgent/Service/AgentToolService.cs-22-28 (1)
22-28: ⚠️ Potential issue | 🟡 Minor

Pass real cancellation token to `SendMessageAsync` instead of `CancellationToken.None`.

`ToolContext` does not carry a `CancellationToken`, but `_toolExecutor.SendMessageAsync` accepts one. Either add `CancellationToken` to `ToolContext` and use it here, or accept a separate `CancellationToken` parameter in `SendMessageAsync` to allow agent shutdown or timeout to cancel the Redis round-trip. Currently, the operation cannot be cancelled.
TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs-75-83 (1)
75-83: ⚠️ Potential issue | 🟡 Minor

Cancellation during shutdown is logged as an error and still produces a result write.

On shutdown, `stoppingToken` flows into `_botClient.SendMessage(...)` (via line 71) and will throw `OperationCanceledException`. The broad `catch (Exception ex)` at line 75 treats this as a failed Telegram task: it logs at Error level, then writes a `Success=false` result to Redis (lines 80-83), causing the corresponding waiter on the agent side to observe a spurious failure instead of a timeout.

🛠️ Proposed fix
```diff
  try {
      if (!task.ToolName.Equals("send_message", StringComparison.OrdinalIgnoreCase)) {
          throw new InvalidOperationException($"Unsupported telegram tool: {task.ToolName}");
      }
      // ...
      response.Success = true;
      response.TelegramMessageId = sent.MessageId;
      response.Result = sent.MessageId.ToString();
- } catch (Exception ex) {
+ } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) {
+     // Shutdown in progress — don't publish a "failed" result.
+     return;
+ } catch (Exception ex) {
      _logger.LogError(ex, "Failed to execute telegram task {RequestId}", task.RequestId);
      response.ErrorMessage = ex.Message;
  }
```
TelegramSearchBot/Service/AI/LLM/ChunkPollingService.cs-94-121 (1)
94-121: ⚠️ Potential issue | 🟡 Minor

No timeout/TTL-based recovery if agent crashes without writing a terminal chunk or updating status.

`TryCompleteFromTaskStateAsync` only resolves `Completion` when the task-state hash transitions to `Completed`/`Failed`/`Cancelled`. If the agent process dies after publishing intermediate snapshots but before updating status (or writing `Done`/`Error`), the tracked task stays in `_trackedTasks` indefinitely, the consumer's `Completion` task hangs, and its unbounded channel accumulates in memory. Consider adding a wall-clock timeout per tracked task (e.g., via `AgentSessionInfo.LastHeartbeatUtc` or a per-task deadline) that synthesizes an `Error` chunk on inactivity, so upstream controllers can surface the failure.
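One possible shape for such a watchdog, sketched with hypothetical stand-ins (`TrackedTask`, `DeadlineUtc`, and `EmitError` are assumptions, not names from the PR):

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical tracking entry: a wall-clock deadline plus a callback that
// synthesizes a terminal Error chunk for the stream's consumer.
record TrackedTask(DateTime DeadlineUtc, Action<string> EmitError);

static class TaskWatchdog {
    // Called once per poll cycle: fail any task whose deadline has passed
    // so its Completion resolves instead of hanging forever.
    public static int FailExpired(ConcurrentDictionary<string, TrackedTask> tracked, DateTime nowUtc) {
        var failed = 0;
        foreach (var pair in tracked) {
            if (nowUtc <= pair.Value.DeadlineUtc) continue;
            pair.Value.EmitError($"Agent task {pair.Key} produced no terminal chunk before its deadline");
            if (tracked.TryRemove(pair.Key, out _)) failed++;
        }
        return failed;
    }
}
```

The deadline could be refreshed whenever a chunk or heartbeat arrives, so only genuinely silent agents are failed.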
TelegramSearchBot/Controller/AI/LLM/AgentMonitorController.cs-36-85 (1)
36-85: ⚠️ Potential issue | 🟡 Minor

Command parsing is brittle; consider normalizing input.

A few minor parsing gaps:

- Line 36: `text.StartsWith("/agent", …)` matches unrelated commands like `/agents` or `/agentinfo`, sending them through the admin check and silently dropping them.
- Telegram bot commands are often suffixed with `@BotName` (e.g., `/agent list@MyBot`). The equality checks at lines 44/61 and `StartsWith("/agent kill ", …)` at line 73 will not match these forms.
- Trailing whitespace (e.g., `/agent list `) falls through all branches with no response. `/agent kill` with no argument also silently returns (no usage hint).

Suggest trimming, stripping the optional `@BotName` suffix, and splitting the command by whitespace before dispatching; also emit a usage message on any unrecognized `/agent …` input to aid operators.
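That normalization could look something like this (hypothetical helper, not the controller's code):

```csharp
using System;
using System.Linq;

// Hypothetical normalizer: trim, split on whitespace, and strip an optional
// "@BotName" suffix so "/agent@MyBot list" and "/agent list@MyBot" both parse.
static class AgentCommandParser {
    public static string[] Tokenize(string text, string botName) {
        var suffix = "@" + botName;
        return text.Trim()
            .Split(' ', StringSplitOptions.RemoveEmptyEntries)
            .Select(t => t.EndsWith(suffix, StringComparison.OrdinalIgnoreCase)
                ? t[..^suffix.Length]
                : t)
            .ToArray();
    }
}
```

Dispatch can then require `tokens[0] == "/agent"` exactly (so `/agents` no longer matches), branch on the second token, and reply with a usage message in the default branch.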
TelegramSearchBot.Common/Model/AI/LlmAgentContracts.cs-102-102 (1)
102-102: ⚠️ Potential issue | 🟡 Minor

Dictionary `StringComparer.OrdinalIgnoreCase` is lost after JSON round-trip.

`TelegramAgentToolTask.Arguments`, `SubAgentMcpExecuteRequest.Env`/`Arguments`, and `SubAgentBackgroundTaskRequest.Env` are initialized with `OrdinalIgnoreCase` comparers, but Newtonsoft.Json creates a new `Dictionary<TKey,TValue>` with the default (case-sensitive) comparer during deserialization, so consumers reading these from Redis will get case-sensitive lookups. If case-insensitive semantics matter for any downstream consumer, either use a custom `JsonConverter` or copy into a new `Dictionary(..., StringComparer.OrdinalIgnoreCase)` after deserialization.

Also applies to: 161-170
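A sketch of the `[OnDeserialized]` variant on a stand-in type (`ToolTask` mirrors the shape of the contracts; it is not the PR's code):

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;

// Stand-in contract type demonstrating the comparer-restoring callback.
class ToolTask {
    public Dictionary<string, string> Arguments { get; set; } =
        new(StringComparer.OrdinalIgnoreCase);

    [OnDeserialized]
    internal void RestoreComparer(StreamingContext context) {
        // If deserialization replaced the dictionary with a case-sensitive
        // one (as the review observes), copy the entries back under an
        // ordinal-ignore-case comparer. Idempotent if it was already correct.
        Arguments = new Dictionary<string, string>(Arguments, StringComparer.OrdinalIgnoreCase);
    }
}
```

Json.NET invokes `[OnDeserialized]` callbacks after populating the object, so lookups such as `task.Arguments["KEY"]` stay case-insensitive regardless of how the payload was keyed.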
TelegramSearchBot.LLMAgent/Service/LlmServiceProxy.cs-111-141 (1)
111-141: ⚠️ Potential issue | 🟡 Minor

Guard against `historyMessage.User` being null after deserialization.

`AgentHistoryMessage.User` is initialized with a default, but the task envelope arrives via JSON — if `"User"` is absent/null in the payload, `historyMessage.User.UserId` on line 123 throws NRE and aborts the whole seed. Prefer a null-safe access (`historyMessage.User?.UserId`) and skip the user insert when no snapshot is present.
TelegramSearchBot.SubAgent/Service/SubAgentService.cs-21-65 (1)
21-65: ⚠️ Potential issue | 🟡 Minor

Worker loop has no resilience to Redis/transient failures.

If `BRPOP`, the cast at line 28, or the final `StringSetAsync` throws (Redis hiccup, network blip, etc.), the exception escapes `RunAsync` and the sub-agent process silently stops consuming work. Consider wrapping the loop body with an outer `try/catch` that logs and backs off (e.g., `await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken)`), so transient connection issues don't kill the worker.
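The suggested wrapper can be sketched generically (the helper shape is illustrative, not the SubAgent code):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Generic resilient-worker sketch: keep consuming, log-and-back-off on
// transient failures, but still honor cancellation so shutdown stays clean.
static async Task RunResilientLoopAsync(
    Func<CancellationToken, Task> consumeOnce,
    Action<Exception> logError,
    CancellationToken cancellationToken) {
    while (!cancellationToken.IsCancellationRequested) {
        try {
            await consumeOnce(cancellationToken); // e.g. BRPOP + handle + StringSetAsync
        } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
            break; // shutdown requested: exit cleanly
        } catch (Exception ex) {
            logError(ex); // transient Redis/network failure: don't kill the worker
            try { await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); }
            catch (OperationCanceledException) { break; }
        }
    }
}
```

The cancellation filter on the first catch matters: without it, a `BRPOP` that throws `OperationCanceledException` for any other reason would also terminate the loop.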
TelegramSearchBot/Service/AI/LLM/ChunkPollingService.cs-62-91 (1)
62-91: ⚠️ Potential issue | 🟡 Minor

Corrupt JSON advances `NextIndex` but does not persist it, causing re-read loops.

At lines 70-72, when `DeserializeObject` returns null, `tracked.NextIndex` is bumped in memory but the Redis `AgentChunkIndex` key (updated at lines 77-81) is only persisted after a successful `WriteAsync`. If the service restarts and another replica takes over, it will re-read the bad entry from the stale index and loop forever on the same poison chunk. Persist the advanced index in the null-chunk branch too.

♻️ Proposed fix
```diff
  var chunk = JsonConvert.DeserializeObject<AgentStreamChunk>(value.ToString());
  if (chunk == null) {
      tracked.NextIndex++;
+     await _redis.GetDatabase().StringSetAsync(
+         LlmAgentRedisKeys.AgentChunkIndex(taskId),
+         tracked.NextIndex,
+         TimeSpan.FromHours(1),
+         When.Always);
      continue;
  }
```
TelegramSearchBot.LLMAgent/Service/LlmServiceProxy.cs-65-81 (1)
65-81: ⚠️ Potential issue | 🟡 Minor

Mutating `Env.BotId` and service properties from within `CallAsync` is not thread-safe.

`Env.BotId` is a process-wide static and `BotName` is a property on what is typically a singleton `ILLMService`. If the agent process ever executes two tasks concurrently (even within the same chat scope), these writes race with reads elsewhere in the LLM pipeline. Since identity is task-specific, consider threading it through `LlmExecutionContext` (or a per-call parameter) rather than stamping it onto shared state.
TelegramSearchBot/Service/AI/LLM/LLMTaskQueueService.cs-77-95 (1)
77-95: ⚠️ Potential issue | 🟡 Minor

No expiry on the task-state hash — orphaned tasks will accumulate in Redis indefinitely.

`AgentTaskState:{taskId}` is written via `HashSetAsync` without a TTL. If an agent crashes, the controller consumer goes away, or the task is simply never cleaned up downstream, these hashes live forever and slowly bloat Redis. Consider setting a key expiry (e.g., `KeyExpireAsync` with a generous timeout like 24h) alongside the hash write — matching the 1h TTL used for `AgentChunkIndex` in `ChunkPollingService`.
TelegramSearchBot/AppBootstrap/AppBootstrap.cs-55-89 (1)
55-89: ⚠️ Potential issue | 🟡 Minor

Job handle leaks if `SetInformationJobObject` fails.

If `SetInformationJobObject` returns false and this method throws at line 82, the freshly created `SafeJobHandle` is never disposed — callers like `AddProcess` only receive the handle on successful return, so the kernel job object lingers until finalization. Dispose the handle on the failure path (and ideally also on any exception within the try block).

🔒 Proposed fix
```diff
  var length = Marshal.SizeOf(typeof(JOBOBJECT_EXTENDED_LIMIT_INFORMATION));
  var extendedInfoPtr = Marshal.AllocHGlobal(length);
  try {
      Marshal.StructureToPtr(extendedInfo, extendedInfoPtr, false);
      if (!SetInformationJobObject(handle, JobObjectInfoType.ExtendedLimitInformation, extendedInfoPtr, (uint)length)) {
+         handle.Dispose();
          throw new InvalidOperationException("Unable to set information", new Win32Exception());
      }
+ } catch {
+     handle.Dispose();
+     throw;
  } finally {
      Marshal.FreeHGlobal(extendedInfoPtr);
  }
  return handle;
```
TelegramSearchBot.Test/Service/AI/LLM/AgentIntegrationTests.cs-263-268 (1)
263-268: ⚠️ Potential issue | 🟡 Minor

Drain loop is capped at ~500 ms and swallows incomplete runs silently.

`DrainUntilCompletedAsync` polls at most 50 times × 10 ms, then returns regardless of handle state. On slow CI (or if any stage blocks briefly), downstream `Assert.Equal` may observe a partial snapshot list and fail intermittently — or worse, an unfinished test is reported as green if the assertion happens to match prematurely. Consider asserting completion after the loop (e.g. `Assert.True(handles.All(h => h.Completion.IsCompleted))`) and/or raising the budget.

🛡️ Proposed fix
```diff
  private static async Task DrainUntilCompletedAsync(ChunkPollingService polling, params AgentTaskStreamHandle[] handles) {
-     for (var i = 0; i < 50 && handles.Any(h => !h.Completion.IsCompleted); i++) {
+     for (var i = 0; i < 200 && handles.Any(h => !h.Completion.IsCompleted); i++) {
          await polling.RunPollCycleAsync();
          await Task.Delay(10);
      }
+     Assert.All(handles, h => Assert.True(h.Completion.IsCompleted, "Stream handle did not complete within the drain budget"));
  }
```
TelegramSearchBot.LLMAgent/Service/AgentLoopService.cs-104-111 (1)
104-111: ⚠️ Potential issue | 🟡 Minor

`await heartbeatTask` in `finally` can replace the original exception.

`heartbeatCts.Cancel()` will make `WaitForNextTickAsync` throw `OperationCanceledException` inside `RunHeartbeatAsync`. `await heartbeatTask` then rethrows that, masking any exception that caused us to reach `finally` (e.g. a Redis error from the main loop). Await it defensively so cleanup always completes and the primary exception propagates.

♻️ Proposed fix
```diff
  heartbeatCts.Cancel();
  session.Status = "stopped";
  session.LastHeartbeatUtc = DateTime.UtcNow;
  await _rpcClient.SaveSessionAsync(session);
  await _rpcClient.KeyDeleteAsync(LlmAgentRedisKeys.AgentControl(chatId));
- await heartbeatTask;
+ try { await heartbeatTask; } catch (OperationCanceledException) { /* expected */ }
```

Additionally, consider the same pattern inside `RunHeartbeatAsync` to avoid propagating cancellation to callers.
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot.LLMAgent/Service/AgentLoopService.cs` around lines 104 - 111, the finally block currently does "await heartbeatTask", which can rethrow OperationCanceledException (from heartbeatCts.Cancel()) and mask the original exception; change this to await the heartbeatTask inside a try/catch that swallows OperationCanceledException (and optionally logs and swallows other heartbeat exceptions) so the primary exception that caused the finally to run still propagates; update the cleanup sequence around heartbeatCts.Cancel(), session updates, _rpcClient.SaveSessionAsync/KeyDeleteAsync to await heartbeatTask within try { await heartbeatTask; } catch (OperationCanceledException) { /* ignore */ } and consider making RunHeartbeatAsync/WaitForNextTickAsync observe cancellation without throwing to avoid propagating cancellation to callers.
TelegramSearchBot/Service/AI/LLM/AgentRegistryService.cs (1)
270-274: ⚠️ Potential issue | 🟡 Minor
Unhandled exception when the persisted payload is malformed.
If `JsonConvert.DeserializeObject<AgentExecutionTask>` throws (corrupt/legacy payload), recovery aborts and the task is neither re-queued nor dead-lettered; combined with the `ExecuteAsync` issue above, the whole maintenance loop can die. Guard the parse and fall through to the dead-letter path on failure.
🛡️ Proposed fix
- var task = JsonConvert.DeserializeObject<AgentExecutionTask>(payload);
- if (task != null) {
-     task.RecoveryAttempt = recoveryCount + 1;
-     payload = JsonConvert.SerializeObject(task);
- }
+ try {
+     var task = JsonConvert.DeserializeObject<AgentExecutionTask>(payload);
+     if (task != null) {
+         task.RecoveryAttempt = recoveryCount + 1;
+         payload = JsonConvert.SerializeObject(task);
+     }
+ } catch (JsonException ex) {
+     _logger.LogError(ex, "Failed to re-serialize recovery payload for task {TaskId}", session.CurrentTaskId);
+ }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot/Service/AI/LLM/AgentRegistryService.cs` around lines 270 - 274, in AgentRegistryService (around the code calling JsonConvert.DeserializeObject<AgentExecutionTask>), guard the JSON parse with a try/catch around JsonConvert.DeserializeObject<AgentExecutionTask>(payload); on exception increment/log the error and route the message to the existing dead-letter handling path (or set a flag so the subsequent dead-letter logic runs) instead of letting the exception escape; if Deserialize succeeds keep the existing RecoveryAttempt bump and payload re-serialization, but if it fails ensure the task is not re-queued and invoke the same dead-letter/reject logic used elsewhere so malformed/corrupt payloads are properly dead-lettered.
TelegramSearchBot.LLMAgent/Service/GarnetRpcClient.cs (1)
63-83: ⚠️ Potential issue | 🟡 Minor
Non-atomic session write and potential NRE on nullable fields.
Two concerns:
- Each field is written via a separate `HashSetAsync` call, so concurrent readers (e.g. `AgentRegistryService.GetSessionAsync`) can observe a half-updated session hash. Use `IDatabase.HashSetAsync(key, HashEntry[])` to write all fields atomically (as `AgentRegistryService.SaveSessionAsync` already does at line 301).
- If `session.CurrentTaskId` or `session.ErrorMessage` is null, the dictionary initializer throws `ArgumentNullException`. `AgentRegistryService.SaveSessionAsync` guards both with `?? string.Empty`; mirror that here.
♻️ Proposed fix (atomic + null-safe)
- var fields = new Dictionary<string, string> {
-     ["chatId"] = session.ChatId.ToString(),
-     ["processId"] = session.ProcessId.ToString(),
-     ["port"] = session.Port.ToString(),
-     ["status"] = session.Status,
-     ["currentTaskId"] = session.CurrentTaskId,
-     ["startedAtUtc"] = session.StartedAtUtc.ToString("O"),
-     ["lastHeartbeatUtc"] = session.LastHeartbeatUtc.ToString("O"),
-     ["lastActiveAtUtc"] = session.LastActiveAtUtc.ToString("O"),
-     ["shutdownRequestedAtUtc"] = session.ShutdownRequestedAtUtc == DateTime.MinValue ? string.Empty : session.ShutdownRequestedAtUtc.ToString("O"),
-     ["error"] = session.ErrorMessage
- };
-
- foreach (var entry in fields) {
-     await HashSetAsync(key, entry.Key, entry.Value);
- }
+ var entries = new[] {
+     new HashEntry("chatId", session.ChatId),
+     new HashEntry("processId", session.ProcessId),
+     new HashEntry("port", session.Port),
+     new HashEntry("status", session.Status ?? string.Empty),
+     new HashEntry("currentTaskId", session.CurrentTaskId ?? string.Empty),
+     new HashEntry("startedAtUtc", session.StartedAtUtc.ToString("O")),
+     new HashEntry("lastHeartbeatUtc", session.LastHeartbeatUtc.ToString("O")),
+     new HashEntry("lastActiveAtUtc", session.LastActiveAtUtc.ToString("O")),
+     new HashEntry("shutdownRequestedAtUtc", session.ShutdownRequestedAtUtc == DateTime.MinValue ? string.Empty : session.ShutdownRequestedAtUtc.ToString("O")),
+     new HashEntry("error", session.ErrorMessage ?? string.Empty),
+ };
+
+ await Db.HashSetAsync(key, entries);
 await KeyExpireAsync(key, TimeSpan.FromSeconds(Math.Max(Env.AgentHeartbeatTimeoutSeconds * 2, 30)));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@TelegramSearchBot.LLMAgent/Service/GarnetRpcClient.cs` around lines 63 - 83, SaveSessionAsync currently writes each hash field with repeated await HashSetAsync calls and can throw ArgumentNullException if nullable properties like session.CurrentTaskId or session.ErrorMessage are null; change it to build a HashEntry[] (mirroring AgentRegistryService.SaveSessionAsync) with all fields, using session.CurrentTaskId ?? string.Empty and session.ErrorMessage ?? string.Empty (and any other nullable string/date formatted values guarded similarly), then call IDatabase.HashSetAsync(key, hashEntries) once to persist atomically before calling KeyExpireAsync; keep LlmAgentRedisKeys.AgentSession(key) and the existing expiration logic.
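The two fixes above (one bulk write, `?? string.Empty` guards) are easy to sanity-check outside C#. A minimal Python sketch — illustrative only, with a plain dict standing in for the Redis hash — of building every field up front with empty-string defaults and committing them in a single bulk operation:

```python
# Stand-in for the Redis hash; in the real service this would be one
# HashSetAsync(key, HashEntry[]) call against Garnet/Redis.
store = {}

def to_hash_entries(session):
    # Build every field up front, defaulting nullables to "" (the
    # C# `?? string.Empty` guard), so a null never aborts the write.
    return {
        "chatId": str(session["chatId"]),
        "status": session.get("status") or "",
        "currentTaskId": session.get("currentTaskId") or "",
        "error": session.get("errorMessage") or "",
    }

def save_session(session):
    # Single bulk write: readers observe either the old hash or the
    # new one, never a half-updated mix of fields.
    store.update(to_hash_entries(session))

save_session({"chatId": 42, "status": "running",
              "currentTaskId": None, "errorMessage": None})
print(store)  # nullable fields landed as empty strings, no crash
```

The dict here only models the "all fields land together" property; the real atomicity comes from the single `IDatabase.HashSetAsync(key, HashEntry[])` call.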
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 03fed483-80cf-4e01-8422-10dc61f9db48
📒 Files selected for processing (46)
- Docs/README_MCP.md
- README.md
- TelegramSearchBot.Common/Env.cs
- TelegramSearchBot.Common/Model/AI/LLMProvider.cs
- TelegramSearchBot.Common/Model/AI/LlmAgentContracts.cs
- TelegramSearchBot.LLM.Test/Service/AI/LLM/GarnetClientTests.cs
- TelegramSearchBot.LLM.Test/Service/AI/LLM/GarnetRpcClientTests.cs
- TelegramSearchBot.LLM.Test/Service/AI/LLM/ToolExecutorTests.cs
- TelegramSearchBot.LLM.Test/TelegramSearchBot.LLM.Test.csproj
- TelegramSearchBot.LLMAgent/LLMAgentProgram.cs
- TelegramSearchBot.LLMAgent/Program.cs
- TelegramSearchBot.LLMAgent/Service/AgentLoopService.cs
- TelegramSearchBot.LLMAgent/Service/AgentToolService.cs
- TelegramSearchBot.LLMAgent/Service/GarnetClient.cs
- TelegramSearchBot.LLMAgent/Service/GarnetRpcClient.cs
- TelegramSearchBot.LLMAgent/Service/IAgentTaskExecutor.cs
- TelegramSearchBot.LLMAgent/Service/InMemoryMessageExtensionService.cs
- TelegramSearchBot.LLMAgent/Service/LlmServiceProxy.cs
- TelegramSearchBot.LLMAgent/Service/ToolExecutor.cs
- TelegramSearchBot.LLMAgent/TelegramSearchBot.LLMAgent.csproj
- TelegramSearchBot.SubAgent/Program.cs
- TelegramSearchBot.SubAgent/Service/SubAgentService.cs
- TelegramSearchBot.SubAgent/TelegramSearchBot.SubAgent.csproj
- TelegramSearchBot.Test/Service/AI/LLM/AgentEnvCollection.cs
- TelegramSearchBot.Test/Service/AI/LLM/AgentIntegrationTests.cs
- TelegramSearchBot.Test/Service/AI/LLM/AgentRegistryServiceTests.cs
- TelegramSearchBot.Test/Service/AI/LLM/ChunkPollingServiceTests.cs
- TelegramSearchBot.Test/Service/AI/LLM/FakeAgentTaskExecutor.cs
- TelegramSearchBot.Test/Service/AI/LLM/InMemoryRedisTestHarness.cs
- TelegramSearchBot.Test/Service/AI/LLM/LLMTaskQueueServiceTests.cs
- TelegramSearchBot.Test/TelegramSearchBot.Test.csproj
- TelegramSearchBot.sln
- TelegramSearchBot/AppBootstrap/AppBootstrap.cs
- TelegramSearchBot/AppBootstrap/LLMAgentBootstrap.cs
- TelegramSearchBot/AppBootstrap/SubAgentBootstrap.cs
- TelegramSearchBot/Controller/AI/LLM/AgentMonitorController.cs
- TelegramSearchBot/Controller/AI/LLM/GeneralLLMController.cs
- TelegramSearchBot/Controller/AI/LLM/LLMIterationCallbackController.cs
- TelegramSearchBot/Extension/ServiceCollectionExtension.cs
- TelegramSearchBot/Service/AI/LLM/AgentRegistryService.cs
- TelegramSearchBot/Service/AI/LLM/ChunkPollingService.cs
- TelegramSearchBot/Service/AI/LLM/IAgentProcessLauncher.cs
- TelegramSearchBot/Service/AI/LLM/LLMTaskQueueService.cs
- TelegramSearchBot/Service/AI/LLM/LlmAgentProcessLauncher.cs
- TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs
- TelegramSearchBot/TelegramSearchBot.csproj
💤 Files with no reviewable changes (1)
- TelegramSearchBot.Common/Model/AI/LLMProvider.cs
var task = JsonConvert.DeserializeObject<AgentExecutionTask>(payload);
if (task == null || task.ChatId != chatId) {
    if (task != null) {
        await _garnetClient.RPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
    }
    continue;
}

if (await IsShutdownRequestedAsync(chatId)) {
    await _garnetClient.RPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
    session.Status = "shutting_down";
    await _rpcClient.SaveSessionAsync(session);
    break;
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm enqueue helpers and BRPOP direction
rg -nP --type=cs -C2 '\b(LPushAsync|RPushAsync|BRPopAsync|ListLeftPushAsync|ListRightPushAsync)\s*\('
# Confirm every enqueue of AgentTaskQueue
rg -nP --type=cs -C2 'LlmAgentRedisKeys\.AgentTaskQueue'
Repository: ModerRAS/TelegramSearchBot
Length of output: 17665
Use LPushAsync instead of RPushAsync when requeuing to AgentTaskQueue.
Tasks are enqueued with ListLeftPushAsync (left) and dequeued with BRPopAsync (right), creating a FIFO queue. Requeuing with RPushAsync pushes the payload back to the right (the dequeue end), causing the next BRPOP on this or any worker to immediately re-pop the same task. On single-worker deployments this creates a tight requeue loop at line 53 when a task's chatId doesn't match; on multi-worker deployments the shutdown path at line 60 causes LIFO behavior instead of FIFO, unfairly starving other queued work.
Use LPushAsync (available on GarnetClient) to requeue at the left, so requeued tasks are processed after current pending work.
Fix
- if (task == null || task.ChatId != chatId) {
- if (task != null) {
- await _garnetClient.RPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
- }
- continue;
- }
-
- if (await IsShutdownRequestedAsync(chatId)) {
- await _garnetClient.RPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
+ if (task == null || task.ChatId != chatId) {
+ if (task != null) {
+ await _garnetClient.LPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
+ }
+ continue;
+ }
+
+ if (await IsShutdownRequestedAsync(chatId)) {
+ await _garnetClient.LPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
 var task = JsonConvert.DeserializeObject<AgentExecutionTask>(payload);
 if (task == null || task.ChatId != chatId) {
     if (task != null) {
-        await _garnetClient.RPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
+        await _garnetClient.LPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
     }
     continue;
 }
 if (await IsShutdownRequestedAsync(chatId)) {
-    await _garnetClient.RPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
+    await _garnetClient.LPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload);
     session.Status = "shutting_down";
     await _rpcClient.SaveSessionAsync(session);
     break;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@TelegramSearchBot.LLMAgent/Service/AgentLoopService.cs` around lines 50 - 64,
The requeue uses _garnetClient.RPushAsync which pushes to the right (dequeue)
end causing immediate re-pop/requeue loops; change both requeue calls that
reference _garnetClient.RPushAsync(LlmAgentRedisKeys.AgentTaskQueue, payload)
(the one in the task ChatId mismatch branch and the one in the
IsShutdownRequestedAsync branch) to use _garnetClient.LPushAsync(...) so
requeued items go to the left and maintain FIFO behavior in AgentLoopService;
keep the surrounding logic (session.Status update and
_rpcClient.SaveSessionAsync) unchanged.
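The push-direction argument above can be verified without Redis. In this sketch (Python, illustrative only) a deque models the Redis list: LPUSH maps to `appendleft`, RPUSH to `append`, and BRPOP to a right-side `pop`:

```python
from collections import deque

def simulate(requeue_side):
    # Model the Redis list: LPUSH -> appendleft, RPUSH -> append,
    # BRPOP -> pop() from the right.
    q = deque()
    for t in ["A", "B", "C"]:
        q.appendleft(t)          # producer enqueues with LPUSH (FIFO with BRPOP)
    popped = []
    task = q.pop()               # BRPOP yields "A", the FIFO head
    # Worker decides to requeue "A" (e.g. chatId mismatch / shutdown).
    if requeue_side == "right":
        q.append(task)           # RPUSH: lands on the dequeue end
    else:
        q.appendleft(task)       # LPUSH: lands behind the pending work
    while q:
        popped.append(q.pop())   # drain with BRPOP
    return popped

print(simulate("right"))  # ['A', 'B', 'C'] — "A" is re-popped immediately
print(simulate("left"))   # ['B', 'C', 'A'] — "A" waits behind pending work
```

Requeuing on the right puts the task back on the dequeue end, so the next BRPOP re-pops it at once; requeuing on the left places it behind pending work, which is the FIFO behavior the fix restores.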
} catch (Exception ex) {
    _logger.LogError(ex, "Agent task {TaskId} failed", task.TaskId);
    await _garnetClient.PublishChunkAsync(new AgentStreamChunk {
        TaskId = task.TaskId,
        Type = AgentChunkType.Error,
        Sequence = sequence,
        ErrorMessage = ex.Message
    });
    await _rpcClient.SaveTaskStateAsync(task.TaskId, AgentTaskStatus.Failed, ex.Message, new Dictionary<string, string> {
        ["payload"] = payload,
        ["workerChatId"] = workerChatId.ToString(),
        ["failedAtUtc"] = DateTime.UtcNow.ToString("O")
    });
}
OperationCanceledException is persisted as Failed / Error.
When cancellationToken is triggered (graceful shutdown / host stop), WithCancellation at line 127 throws OperationCanceledException, which this generic catch (Exception) then records as a failed task and publishes an Error chunk with the cancellation message. That corrupts task state and user-visible streams on legitimate shutdowns.
Suggest rethrowing cancellation (and also saving AgentTaskStatus.Cancelled if the token originated from the caller rather than host shutdown):
🛡️ Proposed fix
- } catch (Exception ex) {
+ } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
+ throw;
+ } catch (Exception ex) {
_logger.LogError(ex, "Agent task {TaskId} failed", task.TaskId);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- } catch (Exception ex) {
+ } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
+     throw;
+ } catch (Exception ex) {
     _logger.LogError(ex, "Agent task {TaskId} failed", task.TaskId);
     await _garnetClient.PublishChunkAsync(new AgentStreamChunk {
         TaskId = task.TaskId,
         Type = AgentChunkType.Error,
         Sequence = sequence,
         ErrorMessage = ex.Message
     });
     await _rpcClient.SaveTaskStateAsync(task.TaskId, AgentTaskStatus.Failed, ex.Message, new Dictionary<string, string> {
         ["payload"] = payload,
         ["workerChatId"] = workerChatId.ToString(),
         ["failedAtUtc"] = DateTime.UtcNow.ToString("O")
     });
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@TelegramSearchBot.LLMAgent/Service/AgentLoopService.cs` around lines 171 -
184, The catch-all in AgentLoopService (catch (Exception ex) in the agent loop)
incorrectly treats OperationCanceledException as a task failure and publishes an
Error chunk; update the exception handling so OperationCanceledException is
handled separately: detect cancellation (OperationCanceledException or
token.IsCancellationRequested), do not publish a failure/error chunk for host
shutdown—rethrow the cancellation to allow graceful shutdown—and when the
cancellation originated from the caller (not host stop) save the task state as
AgentTaskStatus.Cancelled and publish an appropriate cancellation chunk instead
of Error; keep the existing logging for other exceptions and retain
publishing/saving logic in the generic exception handler.
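The "cancellation is not failure" pattern the fix proposes has a direct analog in any async runtime. A hedged Python/asyncio sketch (not the project's code; names are illustrative) showing cancellation recorded as Cancelled and rethrown, while genuine errors are recorded as Failed:

```python
import asyncio

results = {}  # stand-in for the persisted task-state hash

async def process(task_id, work):
    try:
        await work()
        results[task_id] = "Completed"
    except asyncio.CancelledError:
        # Shutdown path: record Cancelled (not Failed), publish no Error
        # chunk, and rethrow so cancellation still propagates to the host.
        results[task_id] = "Cancelled"
        raise
    except Exception as e:
        results[task_id] = f"Failed: {e}"  # genuine task failure

async def main():
    async def slow():
        await asyncio.sleep(10)
    t = asyncio.create_task(process("t1", slow))
    await asyncio.sleep(0.01)
    t.cancel()                      # simulate host shutdown
    try:
        await t
    except asyncio.CancelledError:
        pass                        # cancellation propagated as expected

    async def boom():
        raise ValueError("model error")
    await process("t2", boom)

asyncio.run(main())
print(results)
```

The key property mirrors the C# exception filter: the cancelled branch rethrows instead of falling into the generic failure handler, so a graceful stop never corrupts task state or user-visible streams.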
var response = new SubAgentTaskResult {
    Success = false
};
try {
    var task = JsonConvert.DeserializeObject<SubAgentTaskEnvelope>(payload);
    if (task == null) {
        throw new InvalidOperationException("Invalid sub-agent payload.");
    }

    response.RequestId = task.RequestId;
    response.Result = task.Type switch {
        "echo" => task.Payload,
        "mcp_execute" => await ExecuteMcpAsync(task, cancellationToken),
        "background_task" => await ExecuteBackgroundTaskAsync(task, cancellationToken),
        _ => throw new InvalidOperationException($"unsupported:{task.Type}")
    };
    response.Success = true;
} catch (Exception ex) {
    _logger.LogError(ex, "SubAgent task failed");
    response.ErrorMessage = ex.Message;
}

await _redis.GetDatabase().StringSetAsync(
    LlmAgentRedisKeys.SubAgentResult(response.RequestId),
    JsonConvert.SerializeObject(response),
    TimeSpan.FromMinutes(5));
Deserialization failures write results to an empty RequestId key and the original caller never sees the error.
response.RequestId is only assigned at line 47, after deserialization. If JsonConvert.DeserializeObject throws (malformed payload) or returns null, the catch block persists the error envelope to LlmAgentRedisKeys.SubAgentResult(""), which the waiting LLMAgent will never read — it will simply time out. Additionally, concurrent failed tasks all collide on the same empty-key entry.
Parse the RequestId first (or extract it defensively before invoking the main pipeline) so a failure always lands under the correct result key.
🛡️ Proposed adjustment
- var response = new SubAgentTaskResult {
- Success = false
- };
- try {
- var task = JsonConvert.DeserializeObject<SubAgentTaskEnvelope>(payload);
- if (task == null) {
- throw new InvalidOperationException("Invalid sub-agent payload.");
- }
-
- response.RequestId = task.RequestId;
+ var response = new SubAgentTaskResult { Success = false };
+ SubAgentTaskEnvelope? task = null;
+ try {
+ task = JsonConvert.DeserializeObject<SubAgentTaskEnvelope>(payload)
+ ?? throw new InvalidOperationException("Invalid sub-agent payload.");
+ response.RequestId = task.RequestId;
response.Result = task.Type switch {
...
};
response.Success = true;
} catch (Exception ex) {
_logger.LogError(ex, "SubAgent task failed");
+ response.RequestId ??= task?.RequestId ?? string.Empty;
response.ErrorMessage = ex.Message;
}
+ if (string.IsNullOrEmpty(response.RequestId)) {
+ continue; // Nothing to correlate to; don't write an orphan result.
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- var response = new SubAgentTaskResult {
-     Success = false
- };
- try {
-     var task = JsonConvert.DeserializeObject<SubAgentTaskEnvelope>(payload);
-     if (task == null) {
-         throw new InvalidOperationException("Invalid sub-agent payload.");
-     }
-
-     response.RequestId = task.RequestId;
+ var response = new SubAgentTaskResult { Success = false };
+ SubAgentTaskEnvelope? task = null;
+ try {
+     task = JsonConvert.DeserializeObject<SubAgentTaskEnvelope>(payload)
+         ?? throw new InvalidOperationException("Invalid sub-agent payload.");
+     response.RequestId = task.RequestId;
     response.Result = task.Type switch {
         "echo" => task.Payload,
         "mcp_execute" => await ExecuteMcpAsync(task, cancellationToken),
         "background_task" => await ExecuteBackgroundTaskAsync(task, cancellationToken),
         _ => throw new InvalidOperationException($"unsupported:{task.Type}")
     };
     response.Success = true;
 } catch (Exception ex) {
     _logger.LogError(ex, "SubAgent task failed");
+     response.RequestId ??= task?.RequestId ?? string.Empty;
     response.ErrorMessage = ex.Message;
 }
+ if (string.IsNullOrEmpty(response.RequestId)) {
+     continue; // Nothing to correlate to; don't write an orphan result.
+ }
 await _redis.GetDatabase().StringSetAsync(
     LlmAgentRedisKeys.SubAgentResult(response.RequestId),
     JsonConvert.SerializeObject(response),
     TimeSpan.FromMinutes(5));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@TelegramSearchBot.SubAgent/Service/SubAgentService.cs` around lines 38 - 63,
The catch writes results to an empty RequestId because response.RequestId is
only set after full deserialization; to fix, extract/set the RequestId before
running the pipeline so errors persist to the correct Redis key: parse the
payload defensively (e.g., parse to a lightweight JObject/JsonDocument or
attempt a minimal JsonConvert.DeserializeObject<dynamic>/SubAgentTaskEnvelope
partial parse) to read RequestId and assign response.RequestId immediately, then
proceed to deserialize/process (ExecuteMcpAsync, ExecuteBackgroundTaskAsync) as
now; ensure any exceptions still use the pre-populated response.RequestId when
calling LlmAgentRedisKeys.SubAgentResult.
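The key idea — pull the correlation id out before running the pipeline — fits in a few lines. This Python sketch is illustrative only; the `RequestId`/`Type`/`Payload` keys mirror the envelope shape described above, and the in-memory dict stands in for Redis:

```python
import json

store = {}  # stand-in for the Redis string keys

def handle(payload: str):
    # Pull the correlation id defensively BEFORE running the pipeline,
    # so failures are persisted under the caller's key, not "".
    request_id = ""
    try:
        request_id = json.loads(payload).get("RequestId", "") or ""
    except (ValueError, AttributeError):
        pass
    response = {"RequestId": request_id, "Success": False}
    try:
        task = json.loads(payload)
        if not isinstance(task, dict) or "Type" not in task:
            raise ValueError("Invalid sub-agent payload.")
        response["Result"] = task.get("Payload")  # "echo" stand-in
        response["Success"] = True
    except Exception as e:
        response["ErrorMessage"] = str(e)
    if not request_id:
        return  # nothing to correlate to; don't write an orphan result
    store[f"subagent:result:{request_id}"] = json.dumps(response)

handle('{"RequestId": "r1", "Type": "echo", "Payload": "hi"}')
handle('{"RequestId": "r2"}')          # malformed: error lands under r2
handle('not json at all')              # unparseable: dropped, no orphan key
print(sorted(store))
```

Note how the second call still reaches the waiting caller: the error envelope is stored under `r2`, while the unparseable payload produces no colliding empty-key entry.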
using var process = Process.Start(startInfo) ?? throw new InvalidOperationException("Failed to start background task process.");
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(Math.Max(1, request.TimeoutSeconds)));

var stdoutTask = process.StandardOutput.ReadToEndAsync(timeoutCts.Token);
var stderrTask = process.StandardError.ReadToEndAsync(timeoutCts.Token);
await process.WaitForExitAsync(timeoutCts.Token);

var stdout = await stdoutTask;
var stderr = await stderrTask;
if (process.ExitCode != 0) {
    throw new InvalidOperationException(string.IsNullOrWhiteSpace(stderr)
        ? $"Background task exited with code {process.ExitCode}."
        : stderr.Trim());
}

return string.IsNullOrWhiteSpace(stdout) ? stderr.Trim() : stdout.Trim();
}
Background task timeout leaks the child process.
When timeoutCts fires, WaitForExitAsync throws OperationCanceledException but the spawned process keeps running — nothing calls Kill(). On repeated timeouts this accumulates orphan processes (and the using var process block only closes the handle, it does not terminate the child).
🛡️ Proposed fix
- using var process = Process.Start(startInfo) ?? throw new InvalidOperationException("Failed to start background task process.");
- using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
- timeoutCts.CancelAfter(TimeSpan.FromSeconds(Math.Max(1, request.TimeoutSeconds)));
-
- var stdoutTask = process.StandardOutput.ReadToEndAsync(timeoutCts.Token);
- var stderrTask = process.StandardError.ReadToEndAsync(timeoutCts.Token);
- await process.WaitForExitAsync(timeoutCts.Token);
+ using var process = Process.Start(startInfo) ?? throw new InvalidOperationException("Failed to start background task process.");
+ using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ timeoutCts.CancelAfter(TimeSpan.FromSeconds(Math.Max(1, request.TimeoutSeconds)));
+
+ var stdoutTask = process.StandardOutput.ReadToEndAsync(timeoutCts.Token);
+ var stderrTask = process.StandardError.ReadToEndAsync(timeoutCts.Token);
+ try {
+ await process.WaitForExitAsync(timeoutCts.Token);
+ } catch (OperationCanceledException) {
+ try { if (!process.HasExited) process.Kill(entireProcessTree: true); } catch { }
+ throw new TimeoutException($"Background task exceeded {request.TimeoutSeconds}s timeout.");
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
 using var process = Process.Start(startInfo) ?? throw new InvalidOperationException("Failed to start background task process.");
 using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 timeoutCts.CancelAfter(TimeSpan.FromSeconds(Math.Max(1, request.TimeoutSeconds)));
 var stdoutTask = process.StandardOutput.ReadToEndAsync(timeoutCts.Token);
 var stderrTask = process.StandardError.ReadToEndAsync(timeoutCts.Token);
- await process.WaitForExitAsync(timeoutCts.Token);
+ try {
+     await process.WaitForExitAsync(timeoutCts.Token);
+ } catch (OperationCanceledException) {
+     try { if (!process.HasExited) process.Kill(entireProcessTree: true); } catch { }
+     throw new TimeoutException($"Background task exceeded {request.TimeoutSeconds}s timeout.");
+ }
 var stdout = await stdoutTask;
 var stderr = await stderrTask;
 if (process.ExitCode != 0) {
     throw new InvalidOperationException(string.IsNullOrWhiteSpace(stderr)
         ? $"Background task exited with code {process.ExitCode}."
         : stderr.Trim());
 }
 return string.IsNullOrWhiteSpace(stdout) ? stderr.Trim() : stdout.Trim();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@TelegramSearchBot.SubAgent/Service/SubAgentService.cs` around lines 109 -
126, When the timeoutCts token triggers, WaitForExitAsync currently throws but
the child process is left running; modify the block around
process/timeoutCts/WaitForExitAsync so that if an OperationCanceledException (or
cancellation) occurs you call process.Kill(true) (or Kill() if targeting older
runtimes) to terminate the child (wrap in try/catch to ignore race conditions),
then await process.WaitForExitAsync without the timeout token and read the
remaining stdout/stderr tasks (or cancel them safely) before rethrowing or
returning an appropriate timeout error; update the code that references process,
timeoutCts, WaitForExitAsync, and the stdoutTask/stderrTask to ensure the
process is killed on timeout and resources are awaited/disposed reliably.
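The kill-on-timeout pattern generalizes beyond .NET's `Process.WaitForExitAsync`. A Python sketch under the same assumptions (a timeout must actively terminate the child; dropping the handle is not enough):

```python
import subprocess, sys

def run_with_timeout(code: str, timeout_s: float) -> str:
    # Spawn a child and make sure a timeout actively terminates it:
    # communicate() raises TimeoutExpired but leaves the child running,
    # so kill() plus a final communicate() are required to reap it.
    proc = subprocess.Popen([sys.executable, "-c", code],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            text=True)
    try:
        out, err = proc.communicate(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        proc.kill()         # terminate the child; closing the handle would not
        proc.communicate()  # reap the process and drain its pipes
        raise TimeoutError(f"Background task exceeded {timeout_s}s timeout.")
    if proc.returncode != 0:
        raise RuntimeError(err.strip() or f"exit code {proc.returncode}")
    return out.strip()

print(run_with_timeout("print('ok')", 5))
try:
    run_with_timeout("import time; time.sleep(60)", 0.5)
except TimeoutError:
    print("timed out, child killed")
```

Without the `kill()` in the timeout branch, each expired run would leave a sleeping child behind — the same orphan-accumulation the review describes.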
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
    while (!stoppingToken.IsCancellationRequested) {
        await RunMaintenanceOnceAsync(stoppingToken);
        await Task.Delay(TimeSpan.FromSeconds(Math.Max(5, Env.AgentHeartbeatIntervalSeconds)), stoppingToken);
    }
}
Maintenance loop lacks error handling — one unhandled Redis exception stops the hosted service.
BackgroundService.ExecuteAsync does not restart on exception; if RunMaintenanceOnceAsync throws (transient Redis failure, JSON deserialization in RecoverSessionAsync, launcher error, etc.) the entire maintenance loop dies silently and agents stop being recovered/idle-shutdown until the host restarts.
♻️ Proposed fix
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
while (!stoppingToken.IsCancellationRequested) {
- await RunMaintenanceOnceAsync(stoppingToken);
- await Task.Delay(TimeSpan.FromSeconds(Math.Max(5, Env.AgentHeartbeatIntervalSeconds)), stoppingToken);
+ try {
+ await RunMaintenanceOnceAsync(stoppingToken);
+ } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) {
+ break;
+ } catch (Exception ex) {
+ _logger.LogError(ex, "Agent maintenance cycle failed; continuing.");
+ }
+ try {
+ await Task.Delay(TimeSpan.FromSeconds(Math.Max(5, Env.AgentHeartbeatIntervalSeconds)), stoppingToken);
+ } catch (OperationCanceledException) { break; }
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
 protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
     while (!stoppingToken.IsCancellationRequested) {
-        await RunMaintenanceOnceAsync(stoppingToken);
-        await Task.Delay(TimeSpan.FromSeconds(Math.Max(5, Env.AgentHeartbeatIntervalSeconds)), stoppingToken);
+        try {
+            await RunMaintenanceOnceAsync(stoppingToken);
+        } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) {
+            break;
+        } catch (Exception ex) {
+            _logger.LogError(ex, "Agent maintenance cycle failed; continuing.");
+        }
+        try {
+            await Task.Delay(TimeSpan.FromSeconds(Math.Max(5, Env.AgentHeartbeatIntervalSeconds)), stoppingToken);
+        } catch (OperationCanceledException) { break; }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@TelegramSearchBot/Service/AI/LLM/AgentRegistryService.cs` around lines 185 -
190, Wrap the maintenance loop in AgentRegistryService.ExecuteAsync with robust
exception handling so a single error in RunMaintenanceOnceAsync (or deeper like
RecoverSessionAsync) cannot terminate the hosted service: surround the await
RunMaintenanceOnceAsync(...) call with a try/catch that logs the full exception
via the service logger (including exception details) and optionally records
metrics, then continue the loop (possibly with a short backoff) so the
Task.Delay still runs; ensure the catch handles OperationCanceledException
properly by rethrowing when stoppingToken.IsCancellationRequested.
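The proposed guard is the standard "resilient worker loop" shape: catch per-cycle errors, log, and keep iterating. A minimal Python sketch of that shape (illustrative; `run_once` stands in for `RunMaintenanceOnceAsync`):

```python
def maintenance_loop(run_once, cycles: int):
    # One bad cycle must not kill the loop: record the failure and
    # continue, the way the proposed fix guards RunMaintenanceOnceAsync.
    log = []
    for i in range(cycles):
        try:
            run_once(i)
        except Exception as e:
            log.append(f"cycle {i} failed: {e}; continuing")
    return log

def flaky(i):
    # Simulates a transient dependency error on one cycle only.
    if i == 1:
        raise ConnectionError("transient redis failure")

log = maintenance_loop(flaky, 3)
print(log)  # only cycle 1 is logged; cycles 0 and 2 ran normally
```

A real hosted service would also distinguish a stop-token cancellation (exit the loop) from ordinary errors (log and continue), as the proposed C# fix does.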
```csharp
private async Task<List<AgentHistoryMessage>> LoadHistoryAsync(long chatId, CancellationToken cancellationToken) {
    var history = await _dbContext.Messages.AsNoTracking()
        .Where(x => x.GroupId == chatId && x.DateTime > DateTime.UtcNow.AddHours(-1))
        .OrderBy(x => x.DateTime)
        .ToListAsync(cancellationToken);

    if (history.Count < 10) {
        history = await _dbContext.Messages.AsNoTracking()
            .Where(x => x.GroupId == chatId)
            .OrderByDescending(x => x.DateTime)
            .Take(10)
            .OrderBy(x => x.DateTime)
            .ToListAsync(cancellationToken);
    }
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Find where Message.DateTime is assigned, to verify timezone kind used.
ast-grep --pattern 'new Message { $$$ DateTime = $_ $$$ }'
rg -nP --type=cs -C2 '\bMessage\b.*\bDateTime\s*=\s*'
```

Repository: ModerRAS/TelegramSearchBot
Length of output: 52

🏁 Script executed:

```bash
# Find the Message class definition
fd -t f --name "*.cs" | xargs grep -l "class Message" | head -5
```

Length of output: 320

🏁 Script executed:

```bash
# Search for Message entity with simpler patterns
rg -n "class Message\s" --type=cs
```

Length of output: 226

🏁 Script executed:

```bash
# Find where Message instances are created (broader search)
rg -n "new Message\(" --type=cs -A 3 | head -40
```

Length of output: 807

🏁 Script executed:

```bash
# Check for DbContext and SaveChanges patterns
rg -n "DbContext|SaveChanges" --type=cs -B 2 -A 2 | head -50
```

Length of output: 4702

🏁 Script executed:

```bash
# Search for any DateTime.UtcNow assignments related to Message
rg -n "DateTime\.UtcNow|DateTime\.Now" --type=cs -B 2 -A 2 | head -50
```

Length of output: 4450

🏁 Script executed:

```bash
# Read the Message entity class definition
cat -n "TelegramSearchBot.Database/Model/Data/Message.cs"
```

Length of output: 958

🏁 Script executed:

```bash
# Read MessageService.cs around line 116 to see the complete Message creation with DateTime
sed -n '110,135p' "TelegramSearchBot/Service/Storage/MessageService.cs" | cat -n
```

Length of output: 1607

🏁 Script executed:

```bash
# Check DataDbContext for any datetime configuration or value converters
rg -n "ValueConverter|HasDefaultValue|DateTime|OnModelCreating" --type=cs "TelegramSearchBot/Service/Database/DataDbContext.cs" -B 1 -A 2
```

Length of output: 161

🏁 Script executed:

```bash
# Find DataDbContext files
fd -t f "DataDbContext.cs"
```

Length of output: 119

🏁 Script executed:

```bash
# Search for where DateTime.Kind or Utc conversion might be handled
rg -n "DateTime.Kind|Kind.*Utc|ToUniversalTime|ToLocalTime" --type=cs
```

Length of output: 1530

🏁 Script executed:

```bash
# Check where messageOption.DateTime comes from (the source of the DateTime value)
rg -n "messageOption\.DateTime\s*=" --type=cs -B 3 -A 1
```

Length of output: 52

🏁 Script executed:

```bash
# Find MessageOption class definition
rg -n "class MessageOption" --type=cs -A 20
```

Length of output: 1042

🏁 Script executed:

```bash
# Search for MessageOption creation/assignment
rg -n "new MessageOption" --type=cs -B 2 -A 5
```

Length of output: 4808

🏁 Script executed:

```bash
# Check SendMessageService.Streaming.cs where DateTime is assigned from tgMsg.Date
cat -n "TelegramSearchBot/Service/BotAPI/SendMessageService.Streaming.cs" | sed -n '320,335p'
```

Length of output: 826

🏁 Script executed:

```bash
# Check DataDbContext for datetime configuration
cat -n "TelegramSearchBot.Database/Model/DataDbContext.cs"
```

Length of output: 5228

🏁 Script executed:

```bash
# Check what Telegram.Bot.Types.Message.Date returns and its kind
rg -n "\.Date" --type=cs "TelegramSearchBot/Controller/Storage/MessageController.cs" -B 2 -A 2
```

Length of output: 304

🏁 Script executed:

```bash
# Search for any other DateTime comparisons with UtcNow to see scope of issue
rg -n "DateTime\.UtcNow|DateTime\.Now" --type=cs -B 1 -A 1 | grep -A 2 -B 2 "Where\|LINQ\|query"
```

Length of output: 2432
Verify and enforce UTC storage for `Message.DateTime` across all persistence paths.

This concern is valid. The codebase handles `DateTime` inconsistently: `SendMessageService.Streaming.cs` (line 326) explicitly calls `.ToUniversalTime()`, but `MessageController.cs` (line 43) stores `e.Message.Date` directly without conversion, and `DataDbContext` has no datetime configuration to enforce UTC storage.

The same pattern appears in multiple LLM services (`LLMTaskQueueService.cs` line 169, `AnthropicService.cs` line 145, `OpenAIService.cs` line 733, `GeminiService.cs` line 81), all executing queries like `.Where(x => x.DateTime > DateTime.UtcNow.AddHours(-1))`. If `Message.DateTime` is stored with `DateTimeKind.Unspecified` or as local time, these queries will return incorrect results.

`SchedulerService.cs` (lines 153–158) already acknowledges this issue and uses `DateTime.SpecifyKind()` to compensate. Ensure all message persistence paths consistently store UTC, and add EF Core configuration to enforce (or at least document) this requirement.
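One way to centralize that enforcement is an EF Core value converter on `Message.DateTime`. A minimal sketch, assuming the `DataDbContext` and `Message` shapes named in this review:

```csharp
// Minimal sketch: normalize Message.DateTime to UTC at the EF Core boundary.
// Assumes the Message entity and DataDbContext described above; the real
// class may not be declared partial.
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;

public partial class DataDbContext : DbContext {
    protected override void OnModelCreating(ModelBuilder modelBuilder) {
        var utc = new ValueConverter<DateTime, DateTime>(
            v => v.Kind == DateTimeKind.Utc ? v : v.ToUniversalTime(), // writing: convert to UTC
            v => DateTime.SpecifyKind(v, DateTimeKind.Utc));           // reading: stamp Kind as UTC
        modelBuilder.Entity<Message>()
            .Property(m => m.DateTime)
            .HasConversion(utc);
    }
}
```

Note that `ToUniversalTime()` treats `Unspecified` values as local time, so the save paths should still stamp `DateTimeKind.Utc` at the source; the converter is a backstop, not a substitute for consistent stamping.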
🤖 Prompt for AI Agents

```
Verify each finding against the current code and only fix it if needed.
In @TelegramSearchBot/Service/AI/LLM/LLMTaskQueueService.cs around lines 167-180,
LoadHistoryAsync is querying Message.DateTime against DateTime.UtcNow but
Message.DateTime is stored inconsistently; ensure all message persistence paths
convert/stamp DateTime to UTC before saving (e.g., update the MessageController
save path and any SendMessageService/other save points to call ToUniversalTime
or specify DateTimeKind.Utc) and add an EF Core configuration in DataDbContext
to normalize DateTime properties to UTC on save/read (or add a ValueConverter
for the Message.DateTime property); also remove ad-hoc fixes like
DateTime.SpecifyKind in SchedulerService and centralize UTC enforcement so
LoadHistoryAsync and other LLM services (AnthropicService, OpenAIService,
GeminiService) operate on consistently UTC timestamps.
```
```csharp
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
    while (!stoppingToken.IsCancellationRequested) {
        var result = await _redis.GetDatabase().ExecuteAsync("BRPOP", LlmAgentRedisKeys.TelegramTaskQueue, 5);
        if (result.IsNull) {
            continue;
        }
```
🧩 Analysis chain
🌐 Web query:

```
StackExchange.Redis BRPOP blocking multiplexer recommended pattern
```

💡 Result:

StackExchange.Redis does not support blocking commands like BRPOP, BLPOP, or BRPOPLPUSH because they would block the entire shared connection multiplexer, stalling all other operations from concurrent callers. This is by design, to enable high-throughput multiplexing via automatic pipelining. The recommended pattern for queue-like blocking behavior uses non-blocking RPOP (`ListRightPop`) combined with Redis pub/sub for notifications:

- Producers: LPUSH (`ListLeftPush`) new work to the queue key (fire-and-forget), then PUBLISH an empty notification to a channel.
- Consumers: subscribe to the channel; on notification, call `ListRightPop` on the queue. Process the item if one exists; repeat, or assume backlog on restart.

This avoids blocking while achieving similar semantics: work buffers in the list if no consumers are present, only one consumer processes each item, and excess consumers get notified but find nothing. Code example from the official docs:

Consumers:

```csharp
sub.Subscribe(channel, delegate {
    string work = db.ListRightPop(key);
    if (work != null) Process(work);
});
```

Producers:

```csharp
db.ListLeftPush(key, newWork, flags: CommandFlags.FireAndForget);
sub.Publish(channel, "");
```

Notes:

- Pub/sub only notifies; the data stays in the list.
- Restarted workers should check for backlog immediately.
- For reliable queues (avoiding lost messages on crash), consider RPOPLPUSH to a processing list (the non-blocking version is supported), with separate monitoring for timeouts.
Citations:
- 1: https://github.com/StackExchange/StackExchange.Redis/blob/main/docs/PipelinesMultiplexers.md
- 2: https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html
- 3: https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers
- 4: https://stackoverflow.com/questions/25747211/stackexchange-redis-listrightpop-not-waiting-for-result
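Assembled end-to-end, the notify-then-pop pattern from the result above can be sketched with StackExchange.Redis. The key and channel names (`agent:tasks`, `agent:tasks:notify`) and the connection string are illustrative assumptions, not from this codebase:

```csharp
// Sketch of the LPUSH + PUBLISH producer and Subscribe + RPOP consumer.
using System;
using StackExchange.Redis;

var muxer = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var db = muxer.GetDatabase();
var sub = muxer.GetSubscriber();
RedisKey queueKey = "agent:tasks";
var channel = RedisChannel.Literal("agent:tasks:notify");

// Consumer side: drain whatever is queued, non-blocking.
void Drain() {
    RedisValue work;
    while (!(work = db.ListRightPop(queueKey)).IsNull) {
        Console.WriteLine($"processing {work}"); // stand-in for real task handling
    }
}
await sub.SubscribeAsync(channel, (_, _) => Drain());
Drain(); // pick up any backlog enqueued before the subscription was active

// Producer side: enqueue work, then publish an empty notification.
await db.ListLeftPushAsync(queueKey, "task-1", flags: CommandFlags.FireAndForget);
await sub.PublishAsync(channel, RedisValue.EmptyString);
```

Because the data stays in the list, the publish only signals that something may be available; a worker that restarts calls `Drain()` immediately to process the backlog.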
🏁 Script executed:

```bash
# Find and examine TelegramTaskConsumer.cs to confirm BRPOP usage
find . -name "TelegramTaskConsumer.cs" -type f
```

Length of output: 128

🏁 Script executed:

```bash
# Search for GarnetRpcClient to see the recommended pattern mentioned
find . -name "GarnetRpcClient.cs" -type f
```

Length of output: 125

🏁 Script executed:

```bash
# Search for how IConnectionMultiplexer is registered and used across services
rg "IConnectionMultiplexer" -A 2 -B 2 --max-count=20
```

Length of output: 34317

🏁 Script executed:

```bash
# Search for other services mentioned: LLMTaskQueueService, ChunkPollingService
find . -name "*LLMTaskQueueService*" -o -name "*ChunkPollingService*" -type f
```

Length of output: 321

🏁 Script executed:

```bash
# Read TelegramTaskConsumer.cs to see the BRPOP usage
cat -n ./TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs | head -60
```

Length of output: 2717

🏁 Script executed:

```bash
# Read GarnetRpcClient.cs to see WaitForTelegramResultAsync pattern
cat -n ./TelegramSearchBot.LLMAgent/Service/GarnetRpcClient.cs | head -100
```

Length of output: 5713

🏁 Script executed:

```bash
# Search for other BRPOP usages in codebase
rg "BRPOP" --type cs
```

Length of output: 590

🏁 Script executed:

```bash
# Verify TelegramTaskConsumer is a BackgroundService registered for DI
rg "TelegramTaskConsumer" -B 2 -A 2 --type cs
```

Length of output: 2176
Replace BRPOP with a non-blocking RPOP + polling pattern to prevent multiplexer stalls.

`_redis.GetDatabase().ExecuteAsync("BRPOP", ...)` with a 5-second timeout pins the shared multiplexer for that entire duration, serializing all other Redis operations (chunk polling, task state writes, registry heartbeats, result writes) behind this single blocking call. StackExchange.Redis explicitly discourages blocking commands because the multiplexer is shared across the entire application. With `TelegramTaskConsumer`, `ChunkPollingService`, `AgentRegistryService`, and `LLMTaskQueueService` all sharing the same `IConnectionMultiplexer` singleton, this creates a serialization bottleneck.

Use the same pattern already proven in `GarnetRpcClient.WaitForTelegramResultAsync` (lines 85–99): poll with a non-blocking RPOP (or `StringGetAsync` for keys) and sleep between polls:

```csharp
while (!stoppingToken.IsCancellationRequested) {
    var result = await _redis.GetDatabase().ListRightPopAsync(LlmAgentRedisKeys.TelegramTaskQueue);
    if (result.IsNull) {
        await Task.Delay(200, stoppingToken);
        continue;
    }
    // process result
}
```

Alternatively, migrate to Redis Streams or Pub/Sub, which StackExchange.Redis handles natively without blocking.
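The Redis Streams alternative mentioned above could look roughly like this with StackExchange.Redis; the stream, group, and consumer names are illustrative, not taken from this codebase:

```csharp
// Sketch: a task queue on Redis Streams (XADD / XREADGROUP / XACK),
// which StackExchange.Redis exposes without blocking the multiplexer.
using StackExchange.Redis;

var muxer = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var db = muxer.GetDatabase();
RedisKey stream = "agent:stream";

// Producer: append one field/value entry per task.
await db.StreamAddAsync(stream, "task", "{\"chatId\":42}");

// One-time consumer-group setup (ignore the error if it already exists).
try { await db.StreamCreateConsumerGroupAsync(stream, "workers", StreamPosition.NewMessages); }
catch (RedisServerException) { /* BUSYGROUP: group already exists */ }

// Consumer: read new entries for this group member, process, then acknowledge.
var entries = await db.StreamReadGroupAsync(stream, "workers", "worker-1", StreamPosition.NewMessages, count: 10);
foreach (var entry in entries) {
    // entry.Values holds the field/value pairs added by the producer
    await db.StreamAcknowledgeAsync(stream, "workers", entry.Id);
}
```

Unlike the list pattern, entries a crashed worker never acknowledged remain pending and can be reclaimed later (e.g. via `StreamAutoClaimAsync`), which addresses the reliable-queue caveat noted in the web result.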
🤖 Prompt for AI Agents

```
Verify each finding against the current code and only fix it if needed.
In @TelegramSearchBot/Service/AI/LLM/TelegramTaskConsumer.cs around lines 31-36,
the ExecuteAsync loop in TelegramTaskConsumer currently calls the blocking
Redis command BRPOP via _redis.GetDatabase().ExecuteAsync, which can pin the
shared multiplexer; replace this with a non-blocking pop + polling pattern (use
ListRightPopAsync or the equivalent RPOP via the StackExchange.Redis API) and
add a short Task.Delay when the result is null to avoid busy-waiting; mirror
the approach used in GarnetRpcClient.WaitForTelegramResultAsync (polling with
ListRightPopAsync/StringGetAsync and Task.Delay) so other services sharing the
IConnectionMultiplexer are not serialized by a blocking BRPOP.
```
Split the LLM loop into dedicated LLMAgent/SubAgent processes and wire the main process through Redis/Garnet IPC.
Includes:
Summary by CodeRabbit
New Features
Documentation