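Background
The fix/agent-store branch implements the full migration from chrono-storage to Actor-Based State Management. This issue records the complete plan, the design decisions, and the known concerns.
Problem
The Aevatar platform already has a complete Orleans virtual actor (GAgents) + event-sourcing + Protobuf serialization infrastructure, yet 10+ kinds of business state are stored as JSON blobs in the external chrono-storage service through ChronoStorage*Store. This violates the architecture constraints in CLAUDE.md.
The chrono-storage service is unreliable and lacks per-user authorization (chrono-storage issue #1). The Orleans actor infrastructure is the platform kernel and is far more reliable than an external service dependency.
Architecture comparison
Before migration (chrono-storage)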
%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart LR
EP["Endpoints"] --> IF["IXxxStore"]
IF --> CS["ChronoStorageXxxStore"]
CS -->|"HTTP JSON"| BLOB["chrono-storage service\n(external, unreliable)"]
BLOB -->|"JSON blob"| S3["Object Storage"]
style BLOB fill:#f66,stroke:#333
style CS fill:#faa,stroke:#333
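Problem: every read and write is an HTTP round trip with JSON serialization, there is no per-user isolation, and the service is unreliable.
After migration (Actor-Backed + ReadModel Actor)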
%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart LR
EP["Endpoints"] --> IF["IXxxStore\n(unchanged)"]
IF --> AB["ActorBackedXxxStore\n(stateless)"]
AB -->|"write: command"| GA["XxxGAgent\n(event-sourced)"]
GA -->|"SendToAsync\nstate push"| RM["XxxReadModelGAgent\n(persistent actor)"]
AB -->|"read: temp subscribe\n-> snapshot -> unsub"| RM
GA -->|"persist"| ES["Event Store\n(Orleans, local)"]
style GA fill:#6f6,stroke:#333
style RM fill:#6af,stroke:#333
style AB fill:#afa,stroke:#333
style ES fill:#6af,stroke:#333
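Key point: the interfaces are unchanged, so callers need no changes. Write state is held by the Write GAgent, and the projected snapshot is held by the ReadModel GAgent (a persistent actor). ActorBackedStore is completely stateless.
Approach: Interface-Preserving Actor Backend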
%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart TB
subgraph "Application Layer (unchanged)"
EP["Endpoints\nCoordinators\nMiddleware"]
IF["IXxxStore\ninterface"]
end
subgraph "Infrastructure Layer"
AB["ActorBackedXxxStore\n(stateless, no fields)"]
end
subgraph "ReadModel Layer"
RM["XxxReadModelGAgent\n(persistent actor,\nholds projected snapshot)"]
end
subgraph "Write Actor Layer"
GA["XxxGAgent\n(event-sourced, Protobuf state)"]
ES["Event Store\n(Orleans grain state)"]
end
EP --> IF
IF -->|"DI: singleton"| AB
AB -->|"write:\nIActorRuntime\nHandleEventAsync"| GA
GA -->|"PersistDomainEventAsync"| ES
GA -->|"SendToAsync\n(state push after commit)"| RM
AB -->|"read:\ntemp subscribe\n-> snapshot -> unsub"| RM
style EP fill:#eef,stroke:#333
style IF fill:#eef,stroke:#333
style AB fill:#afa,stroke:#333
style RM fill:#6af,stroke:#333
style GA fill:#6f6,stroke:#333
style ES fill:#6af,stroke:#333
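No chrono-storage fallback is kept. Orleans actors are platform-kernel infrastructure and far more reliable than the external chrono-storage service. The ChronoStorage*Store implementation classes are to be deleted in this round of migration.
WorkflowStorageGAgent and ScriptStorageGAgent are write-only actors and need no paired ReadModel GAgent. Their reads are served by querying the committed state of the write actor directly, because the consumer and the writer are the same component.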
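The interface-preserving idea can be illustrated with a small sketch. It is written in Python purely for illustration (the real code is C# on Orleans), and every name in it (UserConfigStore, InMemoryActorBackend, describe_model) is hypothetical rather than taken from the branch. It shows a caller that depends only on the store interface, so the backend behind that interface can be swapped without touching the caller.

```python
from typing import Protocol


class UserConfigStore(Protocol):
    """Hypothetical stand-in for an IXxxStore interface."""

    def get(self, scope_id: str) -> dict: ...
    def save(self, scope_id: str, config: dict) -> None: ...


class InMemoryActorBackend:
    """Assumed stand-in for the actor runtime; it owns all state, keyed by scope."""

    def __init__(self) -> None:
        self._snapshots: dict = {}

    def dispatch_command(self, scope_id: str, config: dict) -> None:
        self._snapshots[scope_id] = dict(config)

    def read_snapshot(self, scope_id: str) -> dict:
        return dict(self._snapshots.get(scope_id, {}))


class ActorBackedUserConfigStore:
    """Keeps only a reference to its backend, never business data of its own."""

    def __init__(self, backend: InMemoryActorBackend) -> None:
        self._backend = backend

    def get(self, scope_id: str) -> dict:
        return self._backend.read_snapshot(scope_id)

    def save(self, scope_id: str, config: dict) -> None:
        self._backend.dispatch_command(scope_id, config)


def describe_model(store: UserConfigStore, scope_id: str) -> str:
    """Caller code: written against the interface only."""
    return store.get(scope_id).get("default_model", "unset")
```

Because describe_model only sees the interface, replacing the store implementation is a registration change rather than a caller change, which is the property the migration relies on.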
Migration checklist
| # | Interface | GAgent | Priority | Status |
|---|-----------|--------|----------|--------|
| 1 | IGAgentActorStore | GAgentRegistryGAgent (per-scope) | P0 | ✅ Implemented |
| 2 | IStreamingProxyParticipantStore | StreamingProxyParticipantGAgent (singleton) | P0 | ✅ Implemented |
| 3 | IUserConfigStore | UserConfigGAgent | P1 | ✅ Implemented |
| 4 | INyxIdUserLlmPreferencesStore | (merged into UserConfigGAgent) | P1 | ✅ Implemented |
| 5 | IUserMemoryStore | UserMemoryGAgent | P1 | ✅ Implemented |
| 6 | IConnectorCatalogStore | ConnectorCatalogGAgent | P1 | ✅ Implemented |
| 7 | IRoleCatalogStore | RoleCatalogGAgent | P1 | ✅ Implemented |
| 8 | IChatHistoryStore | ChatConversationGAgent + ChatHistoryIndexGAgent | P2 | ✅ Implemented |
| 9 | IWorkflowStoragePort | WorkflowStorageGAgent (write-only, no readmodel) | P2 | ✅ Implemented |
| 10 | IScriptStoragePort | ScriptStorageGAgent (write-only, no readmodel) | P2 | ✅ Implemented |
Write path
All writes send a command event to the Write GAgent through IActorRuntime + HandleEventAsync. After the Write GAgent commits the event, it pushes the state update to its paired ReadModel GAgent via SendToAsync.
%%{init: {"maxTextSize": 100000, "sequenceDiagram": {"useMaxWidth": false}}}%%
sequenceDiagram
participant C as "Caller"
participant S as "ActorBackedStore (stateless)"
participant R as "IActorRuntime"
participant A as "XxxGAgent (write)"
participant E as "Event Store"
participant RM as "XxxReadModelGAgent"
C->>S: "SaveAsync(data)"
S->>S: "Resolve scopeId"
S->>S: "Build XxxUpdatedEvent"
S->>R: "GetAsync(actorId)"
R-->>S: "IActor"
S->>A: "HandleEventAsync(EventEnvelope)"
A->>A: "[EventHandler] validates"
A->>E: "PersistDomainEventAsync"
A->>A: "TransitionState (pure fn)"
A->>RM: "SendToAsync(StateSnapshotEvent)"
RM->>RM: "Persist projected snapshot"
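The order of steps in the write-path diagram (validate, persist, transition, push) can be sketched as follows. This is a synchronous Python illustration under stated assumptions, not the C# implementation: ConfigUpdated, ConfigState, WriteActor and ReadModelActor are hypothetical stand-ins, the event store is a plain list, and the validation rule is invented for the example.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class ConfigUpdated:
    """Hypothetical domain event; stands in for XxxUpdatedEvent."""
    default_model: str


@dataclass(frozen=True)
class ConfigState:
    """Hypothetical write-actor state (Protobuf in the real system)."""
    default_model: str = ""
    version: int = 0


def transition_state(state: ConfigState, event: ConfigUpdated) -> ConfigState:
    """Pure function: the same state and event always give the same result."""
    return replace(state, default_model=event.default_model,
                   version=state.version + 1)


@dataclass
class ReadModelActor:
    """Stand-in for XxxReadModelGAgent: keeps the last pushed snapshot."""
    snapshot: ConfigState = field(default_factory=ConfigState)

    def receive_snapshot(self, snapshot: ConfigState) -> None:
        self.snapshot = snapshot


@dataclass
class WriteActor:
    """Stand-in for XxxGAgent: validate, persist, transition, push."""
    read_model: ReadModelActor
    event_log: list = field(default_factory=list)  # stand-in for the event store
    state: ConfigState = field(default_factory=ConfigState)

    def handle_event(self, event: ConfigUpdated) -> None:
        if not event.default_model:                       # assumed validation rule
            raise ValueError("default_model must not be empty")
        self.event_log.append(event)                      # persist the event first
        self.state = transition_state(self.state, event)  # then apply it
        self.read_model.receive_snapshot(self.state)      # then push the snapshot
```

A rejected command never reaches the log, so the read model only ever sees state that results from committed events.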
Read path (ReadModel Actor Subscription)
Reads obtain a snapshot through a per-request temporary subscription to the ReadModel GAgent. ActorBackedStore is completely stateless and holds no snapshot cache.
%%{init: {"maxTextSize": 100000, "sequenceDiagram": {"useMaxWidth": false}}}%%
sequenceDiagram
participant C as "Caller"
participant S as "ActorBackedStore (stateless)"
participant RM as "XxxReadModelGAgent\n(persistent actor)"
C->>S: "GetAsync()"
S->>S: "ResolveScopeId()"
S->>S: "Create method-local\nTaskCompletionSource<T>"
S->>RM: "SubscribeAsync(readModelActorId)"
Note over RM: "Actor activates if not\nalready active, replays\nfrom event store"
RM-->>S: "Publish current snapshot"
S->>S: "TCS.SetResult(snapshot)"
S->>RM: "UnsubscribeAsync()"
S-->>C: "Return snapshot from TCS"
Note over S: "No state retained between calls.\nNext GetAsync() repeats the\nsubscribe/unsubscribe cycle."
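Compliance: a method-local TaskCompletionSource satisfies the CLAUDE.md middle-layer state constraint ("method-local temporary collections are allowed, but must not be promoted to service-level/singleton-level fact-state fields").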
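The subscribe, wait, unsubscribe cycle can be sketched with asyncio, where a Future created inside the function plays the role of the method-local TaskCompletionSource. This is an illustrative Python analogue only: ReadModelActor, read_snapshot and the timeout parameter are assumptions for the example and do not describe the actual ReadModelSnapshotReader API.

```python
import asyncio


class ReadModelActor:
    """Stand-in for XxxReadModelGAgent: publishes its snapshot to new subscribers."""

    def __init__(self, snapshot):
        self._snapshot = snapshot
        self._subscribers = set()

    async def subscribe(self, callback):
        self._subscribers.add(callback)
        callback(self._snapshot)       # publish the current snapshot
        return callback                # the callback doubles as the handle

    async def unsubscribe(self, handle):
        self._subscribers.discard(handle)

    @property
    def subscriber_count(self):
        return len(self._subscribers)


async def read_snapshot(read_model, timeout=5.0):
    """One read: subscribe, wait for a snapshot, unsubscribe, return."""
    loop = asyncio.get_running_loop()
    pending = loop.create_future()     # method-local, like TaskCompletionSource<T>

    def on_snapshot(snapshot):
        if not pending.done():         # ignore any later pushes
            pending.set_result(snapshot)

    handle = await read_model.subscribe(on_snapshot)
    try:
        # The timeout is an assumption added for the sketch.
        return await asyncio.wait_for(pending, timeout)
    finally:
        await read_model.unsubscribe(handle)   # runs on success and on timeout
```

Nothing outlives the call: the Future and the callback are local variables, and the finally block removes the subscription, so the reader keeps no state between requests.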
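ReadModel Actor pattern
Each per-user store has a corresponding persistent ReadModel GAgent, which holds the projected snapshot as an actor. ActorBackedStore itself is completely stateless (no fields, no dictionaries, no snapshot cache).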
%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart TB
subgraph "ActorBackedStore (stateless, no fields)"
SR["IAppScopeResolver"]
NOTE["No state fields.\nEach read is a fresh\nsubscribe/unsubscribe cycle."]
end
subgraph "ReadModel GAgents (persistent, event-sourced)"
RM1["UserConfigReadModelGAgent\nuser-config-rm-alice\n(holds projected snapshot)"]
RM2["UserConfigReadModelGAgent\nuser-config-rm-bob\n(holds projected snapshot)"]
end
subgraph "Write GAgents (event-sourced, sole authority)"
A1["UserConfigGAgent\nuser-config-alice"]
A2["UserConfigGAgent\nuser-config-bob"]
end
SR -->|"alice: temp subscribe"| RM1
SR -->|"bob: temp subscribe"| RM2
A1 -->|"SendToAsync\nstate push"| RM1
A2 -->|"SendToAsync\nstate push"| RM2
style RM1 fill:#6af,stroke:#333
style RM2 fill:#6af,stroke:#333
style A1 fill:#afa,stroke:#333
style A2 fill:#ffa,stroke:#333
style NOTE fill:#fff,stroke:#ccc
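Architecture characteristics:
ReadModel is a persistent actor: the ReadModel GAgent is an Orleans persistent actor whose state is persisted through event sourcing. It is not an in-process cache.
ActorBackedStore is completely stateless: no _snapshot field, no ScopeState, no service-level or singleton-level state of any kind. Each read is an independent subscribe -> wait -> return -> unsubscribe cycle.
Write GAgent pushes state: after each committed state change, the Write GAgent actively pushes the snapshot to its paired ReadModel GAgent via SendToAsync. Before pushing, it ensures the ReadModel actor is activated.
User isolation: each scope has its own pair of Write + ReadModel GAgent instances (Orleans virtual actors are isolated by nature). Every store's actor ID includes the scope ({prefix}{scopeId}).
Method-local TaskCompletionSource: the temporary coordination object used during a read is a method-local variable, which satisfies the CLAUDE.md middle-layer state constraint.
Shared utility classes remove duplication: ActorCommandDispatcher (unified command dispatch), ReadModelSnapshotReader (unified per-request temporary-subscription reads), AppScopeResolverExtensions (unified scope resolution).
ChatHistory transaction coordination: after a committed event, ConversationGAgent triggers the IndexGAgent update internally via SendToAsync; the store layer does not write to two actors directly (business progression stays cohesive).
Strongly typed Protobuf: all actor state uses strongly typed proto sub-messages, including ConnectorCatalog's nested configuration structure. There are no JSON string fields.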
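The {prefix}{scopeId} naming that underpins user isolation can be shown in a few lines. The sketch below is Python for illustration only; the two prefixes are read off the diagram's example IDs (user-config-alice, user-config-rm-alice), while the helper names and the empty-scope check are assumptions.

```python
def actor_id(prefix: str, scope_id: str) -> str:
    """Build an actor ID in the {prefix}{scopeId} shape."""
    if not scope_id:
        # Assumption: an empty scope is rejected rather than mapped to a shared actor.
        raise ValueError("scope_id must not be empty")
    return f"{prefix}{scope_id}"


def user_config_actor_pair(scope_id: str) -> tuple:
    """Return (write actor ID, read-model actor ID) for one scope."""
    return (
        actor_id("user-config-", scope_id),
        actor_id("user-config-rm-", scope_id),
    )
```

Since the scope is part of every ID, two users can never resolve to the same actor pair, which is what lets the virtual-actor runtime provide the isolation.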
Design decisions
D1: ChatHistory, dual-actor architecture
%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart TB
AB["ActorBackedChatHistoryStore"]
subgraph "Per-User Index"
IDX["ChatHistoryIndexGAgent\nchat-index-{scopeId}"]
IDXS["State: conversation list\n+ metadata + updatedAt"]
end
subgraph "Per-Conversation"
C1["ChatConversationGAgent\nchat-{scopeId}-conv1"]
C2["ChatConversationGAgent\nchat-{scopeId}-conv2"]
C1S["State: messages[]\n(max 500)"]
C2S["State: messages[]"]
end
AB -->|"GetIndexAsync"| IDX
AB -->|"GetMessagesAsync"| C1
AB -->|"GetMessagesAsync"| C2
AB -->|"SaveMessagesAsync"| C1
AB -->|"SaveMessagesAsync"| IDX
IDX --- IDXS
C1 --- C1S
C2 --- C2S
style IDX fill:#6af,stroke:#333
style C1 fill:#6f6,stroke:#333
style C2 fill:#6f6,stroke:#333
ChatConversationGAgent: per-conversation actor, actorId = chat-{scopeId}-{conversationId}
ChatHistoryIndexGAgent: per-user index actor, actorId = chat-index-{scopeId}
Message cap: 500 per conversation
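A minimal sketch of the two actor ID shapes and the per-conversation cap follows, in Python for illustration. The ID formats and the limit of 500 come from the notes above; the function names are hypothetical, and dropping the oldest messages first is an assumption, since the issue does not say which messages are discarded when the cap is hit.

```python
MAX_MESSAGES_PER_CONVERSATION = 500  # cap stated in D1


def conversation_actor_id(scope_id: str, conversation_id: str) -> str:
    """actorId = chat-{scopeId}-{conversationId}"""
    return f"chat-{scope_id}-{conversation_id}"


def index_actor_id(scope_id: str) -> str:
    """actorId = chat-index-{scopeId}"""
    return f"chat-index-{scope_id}"


def append_messages(existing: list, incoming: list,
                    limit: int = MAX_MESSAGES_PER_CONVERSATION) -> list:
    """Pure transition: returns a new list and leaves both inputs untouched.

    Assumption: when the cap is exceeded, the oldest messages are dropped.
    """
    if limit <= 0:
        raise ValueError("limit must be positive")
    combined = existing + incoming
    return combined[-limit:]
```

Keeping the cap inside a pure transition function means replaying the same events always rebuilds the same trimmed message list.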
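D2: GAgentRegistryGAgent, per-scope registry
actorId = gagent-registry-{scopeId}
D3: Read path, ReadModel Actor projection
The original design proposed reading actor state directly.
Eng review changed this to require going through the readmodel projection.
Implementation: after each committed state change, the Write GAgent pushes the snapshot to its paired ReadModel GAgent (a persistent, event-sourced actor) via SendToAsync. When reading, ActorBackedStore uses a per-request temporary subscription to the ReadModel GAgent (subscribe -> wait for snapshot via method-local TaskCompletionSource -> return -> unsubscribe).
ActorBackedStore is completely stateless: it holds no field-level state. The method-local TaskCompletionSource complies with CLAUDE.md ("method-local temporary collections are allowed").
The ReadModel GAgent is a persistent actor, not an in-process cache. The projected snapshot is persisted by the actor's own event sourcing.
D4: ConnectorCatalog/RoleCatalog, hybrid mode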
%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart LR
AB["ActorBackedConnectorCatalogStore"]
subgraph "Remote (actor-backed)"
GA["ConnectorCatalogGAgent"]
ES["Event Store"]
end
subgraph "Local (file system)"
WS["IStudioWorkspaceStore"]
FS["Local files"]
end
AB -->|"GetCatalogAsync\nSaveCatalogAsync"| GA
GA --> ES
AB -->|"ImportLocalCatalogAsync\nDraft operations"| WS
WS --> FS
style GA fill:#6f6,stroke:#333
style WS fill:#ff6,stroke:#333
Remote persistence -> actor
Local workspace (draft, import) -> IStudioWorkspaceStore
D5: UserMemory, eviction inside the actor
The category-scoped max + global cap (50) logic runs inside TransitionState.
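How a category-scoped maximum and a global cap can be combined inside a pure transition is sketched below in Python. Only the global cap of 50 comes from the note above. The per-category value, the MemoryEntry shape, the add_entry name and the oldest-first eviction policy are all assumptions made for illustration; the actual rules live in the actor's TransitionState.

```python
from dataclasses import dataclass

GLOBAL_CAP = 50     # stated in D5
CATEGORY_CAP = 10   # hypothetical value; the issue does not give one


@dataclass(frozen=True)
class MemoryEntry:
    """Hypothetical shape of one stored memory."""
    category: str
    content: str
    created_at: int


def add_entry(entries: tuple, new_entry: MemoryEntry,
              category_cap: int = CATEGORY_CAP,
              global_cap: int = GLOBAL_CAP) -> tuple:
    """Pure transition: add one entry, then evict the oldest over either cap."""
    ordered = sorted([*entries, new_entry], key=lambda e: e.created_at)
    per_category = {}
    kept_newest_first = []
    # Walk from newest to oldest, keeping an entry only while both its
    # category and the global budget still have room.
    for entry in reversed(ordered):
        count = per_category.get(entry.category, 0)
        if count < category_cap and len(kept_newest_first) < global_cap:
            per_category[entry.category] = count + 1
            kept_newest_first.append(entry)
    return tuple(reversed(kept_newest_first))
```

Running eviction in the transition rather than in the store keeps the caps deterministic under event replay, because the state after any event depends only on the previous state and that event.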
D6: NyxIdUserLlmPreferences, merged
No separate GAgent is created; default_model and preferred_route are extracted from the UserConfigGAgent state.
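The merge amounts to projecting two fields out of a larger snapshot, which the following Python sketch illustrates. The field names default_model and preferred_route come from the note above; the class names, the extra theme field and the to_llm_preferences helper are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UserConfigSnapshot:
    """Hypothetical view of the UserConfigGAgent state."""
    default_model: str = ""
    preferred_route: str = ""
    theme: str = ""   # invented extra field, to show that only a subset is read


@dataclass(frozen=True)
class LlmPreferences:
    """Hypothetical result type for the preferences store."""
    default_model: str
    preferred_route: str


def to_llm_preferences(config: UserConfigSnapshot) -> LlmPreferences:
    """Project the two preference fields out of the user-config snapshot."""
    return LlmPreferences(
        default_model=config.default_model,
        preferred_route=config.preferred_route,
    )
```

With this shape the preferences store needs no actor of its own, since it is a read-only view over state that UserConfigGAgent already owns.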
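D7: WorkflowStorage / ScriptStorage, write-only
WorkflowStorageGAgent and ScriptStorageGAgent are write-only actors and need no paired ReadModel GAgent.
The consumer and the writer are the same component (the workflow/script engine itself), so reads are served by querying the write actor directly.
There is no cross-actor read that would call for a readmodel.
Data flow overview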
%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart TB
subgraph "HTTP Endpoints"
STUDIO["StudioEndpoints"]
SCOPE["ScopeEndpoints"]
SP["StreamingProxyEndpoints"]
CHAT["ChatEndpoints"]
end
subgraph "IXxxStore Interfaces"
UC["IUserConfigStore"]
UM["IUserMemoryStore"]
CC["IConnectorCatalogStore"]
RC["IRoleCatalogStore"]
GA["IGAgentActorStore"]
PP["IStreamingProxyParticipantStore"]
CH["IChatHistoryStore"]
WF["IWorkflowStoragePort"]
SC["IScriptStoragePort"]
NX["INyxIdUserLlmPreferencesStore"]
end
subgraph "ActorBackedStores (stateless)"
ABUC["ActorBackedUserConfigStore"]
ABUM["ActorBackedUserMemoryStore"]
ABCC["ActorBackedConnectorCatalogStore"]
ABRC["ActorBackedRoleCatalogStore"]
ABGA["ActorBackedGAgentActorStore"]
ABPP["ActorBackedStreamingProxyParticipantStore"]
ABCH["ActorBackedChatHistoryStore"]
ABWF["ActorBackedWorkflowStoragePort"]
ABSC["ActorBackedScriptStoragePort"]
end
subgraph "ReadModel GAgents (persistent actors)"
RMUC["UserConfigReadModelGAgent"]
RMUM["UserMemoryReadModelGAgent"]
RMCC["ConnectorCatalogReadModelGAgent"]
RMRC["RoleCatalogReadModelGAgent"]
RMGA["GAgentRegistryReadModelGAgent"]
RMPP["StreamingProxyParticipantReadModelGAgent"]
RMCH["ChatReadModelGAgents"]
end
subgraph "Write GAgents (event-sourced, sole authority)"
UCG["UserConfigGAgent\nuser-config-{scope}"]
UMG["UserMemoryGAgent\nuser-memory-{scope}"]
CCG["ConnectorCatalogGAgent"]
RCG["RoleCatalogGAgent"]
GAG["GAgentRegistryGAgent\ngagent-registry-{scope}"]
PPG["StreamingProxyParticipantGAgent"]
CHC["ChatConversationGAgent\nchat-{scope}-{conv}"]
CHI["ChatHistoryIndexGAgent\nchat-index-{scope}"]
WFG["WorkflowStorageGAgent\n(write-only, no readmodel)"]
SCG["ScriptStorageGAgent\n(write-only, no readmodel)"]
end
subgraph "Media Only (temporary)"
BLOB["ChronoStorageCatalogBlobClient\n(media files only, to be replaced)"]
end
STUDIO --> UC & UM & CC & RC & NX
SCOPE --> GA & WF & SC
SP --> PP
CHAT --> CH
UC --> ABUC
NX -.->|"reads from"| ABUC
UM --> ABUM
CC --> ABCC
RC --> ABRC
GA --> ABGA
PP --> ABPP
CH --> ABCH
WF --> ABWF
SC --> ABSC
ABUC -->|"read: temp sub"| RMUC
ABUM -->|"read: temp sub"| RMUM
ABCC -->|"read: temp sub"| RMCC
ABRC -->|"read: temp sub"| RMRC
ABGA -->|"read: temp sub"| RMGA
ABPP -->|"read: temp sub"| RMPP
ABCH -->|"read: temp sub"| RMCH
ABUC -->|"write: command"| UCG
ABUM -->|"write: command"| UMG
ABCC -->|"write: command"| CCG
ABRC -->|"write: command"| RCG
ABGA -->|"write: command"| GAG
ABPP -->|"write: command"| PPG
ABCH -->|"write: command"| CHC & CHI
ABWF -->|"write: command"| WFG
ABSC -->|"write: command"| SCG
UCG -->|"SendToAsync"| RMUC
UMG -->|"SendToAsync"| RMUM
CCG -->|"SendToAsync"| RMCC
RCG -->|"SendToAsync"| RMRC
GAG -->|"SendToAsync"| RMGA
PPG -->|"SendToAsync"| RMPP
CHC -->|"SendToAsync"| RMCH
CHI -->|"SendToAsync"| RMCH
SP -.->|"media upload only"| BLOB
style BLOB fill:#faa,stroke:#333
style ABUC fill:#afa,stroke:#333
style ABUM fill:#afa,stroke:#333
style ABCC fill:#afa,stroke:#333
style ABRC fill:#afa,stroke:#333
style ABGA fill:#afa,stroke:#333
style ABPP fill:#afa,stroke:#333
style ABCH fill:#afa,stroke:#333
style ABWF fill:#afa,stroke:#333
style ABSC fill:#afa,stroke:#333
style RMUC fill:#6af,stroke:#333
style RMUM fill:#6af,stroke:#333
style RMCC fill:#6af,stroke:#333
style RMRC fill:#6af,stroke:#333
style RMGA fill:#6af,stroke:#333
style RMPP fill:#6af,stroke:#333
style RMCH fill:#6af,stroke:#333
style UCG fill:#6f6,stroke:#333
style UMG fill:#6f6,stroke:#333
style CCG fill:#6f6,stroke:#333
style RCG fill:#6f6,stroke:#333
style GAG fill:#6f6,stroke:#333
style PPG fill:#6f6,stroke:#333
style CHC fill:#6f6,stroke:#333
style CHI fill:#6af,stroke:#333
style WFG fill:#6f6,stroke:#333
style SCG fill:#6f6,stroke:#333
Known concerns / To do
1. Media files are temporarily stored in chrono-storage
%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart LR
EXP["ExplorerEndpoints"]
subgraph "Migrated to Actor (chrono-storage dependency removed)"
BIZ["connectors.json\nroles.json\nconfig.json\nactors.json\nchat-histories"]
end
subgraph "Remaining on chrono-storage (to be replaced)"
MEDIA["chat-media/\nimages/\nattachments/\narbitrary binary files"]
end
EXP -.->|"structured data\n(no longer needed)"| BIZ
EXP -->|"media files\n(kept for now)"| MEDIA
MEDIA --> BLOB["ChronoStorageCatalogBlobClient"]
style BIZ fill:#6f6,stroke:#333
style MEDIA fill:#ffa,stroke:#333
style BLOB fill:#faa,stroke:#333
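ChronoStorageCatalogBlobClient is kept only for media file uploads. This is not a fallback: object storage semantics (binary files) are not a good fit for actor state. In the future this should move to a dedicated object storage service.
2. No data migration
The new system starts empty. Users' historical configuration, chat history, connector lists, and similar data are cleared after the cutover. This is an accepted change in product behavior.
3. Registry scalability
GAgentRegistryGAgent is per-scope. This is safe for low-frequency use; high-frequency use may require further sharding. See #154.
4. Eventual consistency
Reads obtain a snapshot through a per-request temporary subscription to the ReadModel GAgent. There is a millisecond-level delay between a write and the ReadModel GAgent receiving the SendToAsync push. For paths that affect real-time command execution, such as UserConfig/LLM preferences, this delay still needs to be validated. The ReadModel GAgent is a persistent actor, so snapshots are not lost on process restart.
5. ChronoStorage business-state implementations deleted
The 10 ChronoStorage*Store business-state implementation classes have been deleted, along with their 5 old tests.
Kept (media files only):
ChronoStorageCatalogBlobClient.cs
ConnectorCatalogStorageOptions.cs (blob client configuration)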
New files
9 GAgent projects (agents/Aevatar.GAgents.*), each containing:
Write GAgent + proto + csproj
ReadModel GAgent (8 in total; Workflow/Script are write-only and need none)
10 ActorBacked stores (src/Aevatar.Studio.Infrastructure/ActorBacked/)
3 shared utility classes:
ActorCommandDispatcher.cs: unified command dispatch (removes 9 duplicates)
ReadModelSnapshotReader.cs: unified per-request temporary-subscription reads (removes 7 duplicates)
AppScopeResolverExtensions.cs: unified scope resolution (removes 7 duplicates)
DI registration switch (ServiceCollectionExtensions.cs)
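Verification
dotnet build aevatar.slnx -> 0 errors
All 63 unit tests pass (state transition + store adapter)
All IXxxStore interfaces are unchanged; callers need no changes
Three rounds of Codex review produced 15 findings, all fixed: scope isolation, subscription ordering, readmodel actor existence, URL defaults
Codex CLAUDE.md compliance audit: 7/7 PASS (middle-layer state, read/write separation, Protobuf serialization, cohesive business progression, projection pipeline, actor lifecycle, method-local)
Zero ConcurrentDictionary/volatile/ScopeState violations in the middle layer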
Related
~/.gstack/projects/aevatarAI-aevatar/chronoai-fix/agent-store-design-20260408-164250.md
Follow-up work
Delete the 10 ChronoStorage*Store business-state implementation files: ✅ Done
Split ConnectorCatalog's JSON string fields into strongly typed proto: ✅ Done
Implement the ReadModel GAgent layer: ✅ Done (8 ReadModel GAgents)
Extract shared utility classes to remove duplication: ✅ Done (3 utility classes)
(docs/canon/, docs/decisions/)