From b50d05da82da54a9e57bbddf752bf9683a7ea7bc Mon Sep 17 00:00:00 2001 From: Jaime Still Date: Wed, 18 Feb 2026 17:53:08 -0500 Subject: [PATCH 1/2] kernel observer: root-level observability with OTel-aligned levels (#25) Promotes orchestrate/observability to a root-level package with OTel-compatible severity levels, integrates Observer into the kernel runtime loop replacing *slog.Logger, and migrates all orchestrate imports. --- .claude/CLAUDE.md | 11 +- .../guides/.archive/25-kernel-observer.md | 988 ++++++++++++++++++ .../context/sessions/25-kernel-observer.md | 61 ++ .claude/plans/gentle-growing-leaf.md | 157 +++ .claude/skills/kernel-dev/SKILL.md | 24 +- CHANGELOG.md | 23 + README.md | 3 +- _project/README.md | 11 +- _project/objective.md | 6 +- cmd/kernel/main.go | 9 +- kernel/kernel.go | 81 +- kernel/kernel_test.go | 13 +- kernel/observer.go | 14 + observability/multi.go | 26 + observability/noop.go | 8 + observability/observer.go | 72 ++ observability/observer_test.go | 255 +++++ observability/registry.go | 36 + observability/slog.go | 28 + .../examples/darpa-procurement/main.go | 2 +- .../examples/phase-02-03-state-graphs/main.go | 2 +- .../phase-04-sequential-chains/main.go | 2 +- .../phase-05-parallel-execution/main.go | 2 +- .../examples/phase-06-checkpointing/main.go | 2 +- orchestrate/observability/doc.go | 40 - orchestrate/observability/multi.go | 33 - orchestrate/observability/multi_test.go | 219 ---- orchestrate/observability/noop.go | 17 - orchestrate/observability/observer.go | 84 -- orchestrate/observability/observer_test.go | 125 --- orchestrate/observability/registry.go | 66 -- orchestrate/observability/slog.go | 69 -- orchestrate/observability/slog_test.go | 277 ----- orchestrate/state/checkpoint_test.go | 2 +- orchestrate/state/edge_test.go | 2 +- orchestrate/state/events.go | 26 + orchestrate/state/graph.go | 46 +- orchestrate/state/graph_test.go | 22 +- orchestrate/state/node_test.go | 2 +- orchestrate/state/state.go | 14 +- 
orchestrate/state/state_test.go | 14 +- orchestrate/workflows/chain.go | 26 +- orchestrate/workflows/chain_test.go | 26 +- orchestrate/workflows/conditional.go | 11 +- orchestrate/workflows/conditional_test.go | 8 +- orchestrate/workflows/events.go | 22 + orchestrate/workflows/integration_test.go | 2 +- orchestrate/workflows/parallel.go | 47 +- 48 files changed, 1972 insertions(+), 1064 deletions(-) create mode 100644 .claude/context/guides/.archive/25-kernel-observer.md create mode 100644 .claude/context/sessions/25-kernel-observer.md create mode 100644 .claude/plans/gentle-growing-leaf.md create mode 100644 kernel/observer.go create mode 100644 observability/multi.go create mode 100644 observability/noop.go create mode 100644 observability/observer.go create mode 100644 observability/observer_test.go create mode 100644 observability/registry.go create mode 100644 observability/slog.go delete mode 100644 orchestrate/observability/doc.go delete mode 100644 orchestrate/observability/multi.go delete mode 100644 orchestrate/observability/multi_test.go delete mode 100644 orchestrate/observability/noop.go delete mode 100644 orchestrate/observability/observer.go delete mode 100644 orchestrate/observability/observer_test.go delete mode 100644 orchestrate/observability/registry.go delete mode 100644 orchestrate/observability/slog.go delete mode 100644 orchestrate/observability/slog_test.go create mode 100644 orchestrate/state/events.go create mode 100644 orchestrate/workflows/events.go diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index da37f1f..4c5b4d6 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -33,7 +33,8 @@ kernel/ ├── _project/ # Project identity, phase, and objective context ├── core/ # Foundational types: protocol, response, config, model ├── agent/ # LLM communication: agent interface, client, providers, request, mock, registry -├── orchestrate/ # Multi-agent coordination: hub, messaging, state, workflows, observability +├── observability/ # 
Event-based observability: Observer, Event, Level (OTel-aligned), SlogObserver, registry +├── orchestrate/ # Multi-agent coordination: hub, messaging, state, workflows ├── memory/ # Unified context composition: Store, FileStore, Cache. Namespaces: memory/, skills/, agents/ ├── tools/ # Tool execution: global registry with Register, Execute, List ├── session/ # Conversation management: Session interface, in-memory implementation @@ -54,14 +55,14 @@ Level 1: core/response, core/model Level 2: agent/providers, agent/request, agent/client Level 3: agent (root) Level 4: agent/mock -Level 5: orchestrate/observability, orchestrate/messaging, orchestrate/config +Level 5: orchestrate/messaging, orchestrate/config Level 6: orchestrate/hub, orchestrate/state Level 7: orchestrate/workflows -Foundation (Level 0 — depend only on core/protocol): - memory, tools, session +Foundation (Level 0 — no internal dependencies): + observability, memory, tools, session -Level 8: kernel (depends on agent, session, memory, tools, core) +Level 8: kernel (depends on agent, session, memory, tools, core, observability) ``` ## Design Principles diff --git a/.claude/context/guides/.archive/25-kernel-observer.md b/.claude/context/guides/.archive/25-kernel-observer.md new file mode 100644 index 0000000..c888de7 --- /dev/null +++ b/.claude/context/guides/.archive/25-kernel-observer.md @@ -0,0 +1,988 @@ +# 25 - Kernel Observer + +## Problem Context + +The kernel's ad-hoc `*slog.Logger` and the orchestrate Observer pattern share the same integration points. The observability package is tightly coupled to `orchestrate/`, but both kernel and orchestrate need it. This promotes observability to a root-level package with OTel-compatible severity levels, integrates it into the kernel runtime loop, and migrates all orchestrate imports. 
+ +## Architecture Approach + +- **Root-level `observability/` package** — foundation-level (Level 0), no internal dependencies +- **OTel-aligned severity levels** — Level values ARE OTel SeverityNumbers (5=DEBUG, 9=INFO, 13=WARN, 17=ERROR) +- **Level-aware SlogObserver** — maps event levels to slog levels, uses event type as log message +- **Separate `node.state` event** — state snapshots split from `node.complete` into dedicated event type +- **Decentralized event types** — each package defines its own event type constants using `observability.EventType` +- **SlogObserver default** — observable out of the box via `slog.Default()` + +## Implementation + +### Step 1: Create `observability/` package + +```bash +mkdir -p observability +``` + +#### observability/observer.go + +```go +package observability + +import ( + "context" + "log/slog" + "time" +) + +type Level int + +const ( + LevelVerbose Level = 5 + LevelInfo Level = 9 + LevelWarning Level = 13 + LevelError Level = 17 +) + +func (l Level) String() string { + switch { + case l <= 4: + return "TRACE" + case l <= 8: + return "DEBUG" + case l <= 12: + return "INFO" + case l <= 16: + return "WARN" + case l <= 20: + return "ERROR" + default: + return "FATAL" + } +} + +func (l Level) SlogLevel() slog.Level { + switch { + case l <= 8: + return slog.LevelDebug + case l <= 12: + return slog.LevelInfo + case l <= 16: + return slog.LevelWarn + default: + return slog.LevelError + } +} + +type EventType string + +type Event struct { + Type EventType + Level Level + Timestamp time.Time + Source string + Data map[string]any +} + +type Observer interface { + OnEvent(ctx context.Context, event Event) +} +``` + +#### observability/noop.go + +```go +package observability + +import "context" + +type NoOpObserver struct{} + +func (NoOpObserver) OnEvent(ctx context.Context, event Event) {} +``` + +#### observability/multi.go + +```go +package observability + +import "context" + +type MultiObserver struct { + observers []Observer +} 
+ +func NewMultiObserver(observers ...Observer) *MultiObserver { + filtered := make([]Observer, 0, len(observers)) + for _, obs := range observers { + if obs != nil { + filtered = append(filtered, obs) + } + } + return &MultiObserver{observers: filtered} +} + +func (m *MultiObserver) OnEvent(ctx context.Context, event Event) { + for _, obs := range m.observers { + obs.OnEvent(ctx, event) + } +} +``` + +#### observability/slog.go + +```go +package observability + +import ( + "context" + "log/slog" +) + +type SlogObserver struct { + logger *slog.Logger +} + +func NewSlogObserver(logger *slog.Logger) *SlogObserver { + return &SlogObserver{logger: logger} +} + +func (o *SlogObserver) OnEvent(ctx context.Context, event Event) { + attrs := make([]slog.Attr, 0, len(event.Data)+1) + attrs = append(attrs, slog.String("source", event.Source)) + for k, v := range event.Data { + attrs = append(attrs, slog.Any(k, v)) + } + + o.logger.LogAttrs(ctx, event.Level.SlogLevel(), string(event.Type), attrs...) +} +``` + +#### observability/registry.go + +```go +package observability + +import ( + "fmt" + "log/slog" + "sync" +) + +var ( + observers = map[string]Observer{ + "noop": NoOpObserver{}, + "slog": NewSlogObserver(slog.Default()), + } + mutex sync.RWMutex +) + +func GetObserver(name string) (Observer, error) { + mutex.RLock() + defer mutex.RUnlock() + + obs, exists := observers[name] + if !exists { + return nil, fmt.Errorf("unknown observer: %s", name) + } + return obs, nil +} + +func RegisterObserver(name string, observer Observer) { + mutex.Lock() + defer mutex.Unlock() + + observers[name] = observer +} +``` + +### Step 2: Migrate orchestrate imports + +Every orchestrate file changes its import path, event type constants move from `observability/` to the emitting package, and every `observability.Event{}` literal gains a `Level` field. + +#### orchestrate/state/events.go + +New file. 
Event type constants for state, graph, checkpoint, and node operations: + +```go +package state + +import "github.com/tailored-agentic-units/kernel/observability" + +const ( + EventStateCreate EventType = "state.create" + EventStateClone EventType = "state.clone" + EventStateSet EventType = "state.set" + EventStateMerge EventType = "state.merge" + + EventGraphStart EventType = "graph.start" + EventGraphComplete EventType = "graph.complete" + EventNodeStart EventType = "node.start" + EventNodeComplete EventType = "node.complete" + EventNodeState EventType = "node.state" + EventEdgeEvaluate EventType = "edge.evaluate" + EventEdgeTransition EventType = "edge.transition" + EventCycleDetected EventType = "cycle.detected" + + EventCheckpointSave EventType = "checkpoint.save" + EventCheckpointLoad EventType = "checkpoint.load" + EventCheckpointResume EventType = "checkpoint.resume" +) + +type EventType = observability.EventType +``` + +The type alias `EventType = observability.EventType` lets the constants use the unqualified name while remaining the same type. + +#### orchestrate/workflows/events.go + +New file. 
Event type constants for chains, parallel, and conditional routing: + +```go +package workflows + +import "github.com/tailored-agentic-units/kernel/observability" + +const ( + EventChainStart EventType = "chain.start" + EventChainComplete EventType = "chain.complete" + EventStepStart EventType = "step.start" + EventStepComplete EventType = "step.complete" + + EventParallelStart EventType = "parallel.start" + EventParallelComplete EventType = "parallel.complete" + EventWorkerStart EventType = "worker.start" + EventWorkerComplete EventType = "worker.complete" + + EventRouteEvaluate EventType = "route.evaluate" + EventRouteSelect EventType = "route.select" + EventRouteExecute EventType = "route.execute" +) + +type EventType = observability.EventType +``` + +#### orchestrate/state/state.go + +Import change: + +```go + "github.com/tailored-agentic-units/kernel/observability" +``` + +`EventStateCreate` in `New()`: + +```go + observer.OnEvent(context.Background(), observability.Event{ + Type: EventStateCreate, + Level: observability.LevelVerbose, + Timestamp: s.Timestamp, + Source: "state", + Data: map[string]any{}, + }) +``` + +`EventStateClone` in `Clone()`: + +```go + s.Observer.OnEvent(context.Background(), observability.Event{ + Type: EventStateClone, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "state", + Data: map[string]any{"keys": len(newState.Data)}, + }) +``` + +`EventStateSet` in `Set()`: + +```go + s.Observer.OnEvent(context.Background(), observability.Event{ + Type: EventStateSet, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "state", + Data: map[string]any{"key": key}, + }) +``` + +`EventStateMerge` in `Merge()`: + +```go + s.Observer.OnEvent(context.Background(), observability.Event{ + Type: EventStateMerge, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "state", + Data: map[string]any{"keys": len(other.Data)}, + }) +``` + +#### orchestrate/state/graph.go + +Import change: + 
+```go + "github.com/tailored-agentic-units/kernel/observability" +``` + +`EventCheckpointLoad` in `Resume()`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventCheckpointLoad, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "node": state.CheckpointNode, + "run_id": runID, + }, + }) +``` + +`EventCheckpointResume` in `Resume()`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventCheckpointResume, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "checkpoint_node": state.CheckpointNode, + "resume_node": nextNode, + "run_id": runID, + }, + }) +``` + +`EventGraphStart` in `execute()`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventGraphStart, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "entry_point": g.entryPoint, + "run_id": initialState.RunID, + "exit_points": len(g.exitPoints), + }, + }) +``` + +`EventCycleDetected` in `execute()`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventCycleDetected, + Level: observability.LevelWarning, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "node": current, + "visit_count": visited[current], + "iteration": iterations, + "path_length": len(path), + }, + }) +``` + +`EventNodeStart` in `execute()` — remove `input_snapshot`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventNodeStart, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "node": current, + "iteration": iterations, + }, + }) +``` + +`EventNodeComplete` in `execute()` — remove `output_snapshot`, add new `EventNodeState` emission immediately after: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventNodeComplete, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + 
"node": current, + "iteration": iterations, + "error": err != nil, + }, + }) + + g.observer.OnEvent(ctx, observability.Event{ + Type: EventNodeState, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "node": current, + "iteration": iterations, + "input_snapshot": maps.Clone(state.Data), + "output_snapshot": maps.Clone(newState.Data), + }, + }) +``` + +`EventCheckpointSave` in `execute()`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventCheckpointSave, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "node": current, + "run_id": state.RunID, + }, + }) +``` + +`EventGraphComplete` in `execute()`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventGraphComplete, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "exit_point": current, + "iterations": iterations, + "path_length": len(path), + }, + }) +``` + +`EventEdgeEvaluate` in `execute()`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventEdgeEvaluate, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "from": edge.From, + "to": edge.To, + "edge_index": i, + "has_predicate": edge.Predicate != nil, + }, + }) +``` + +`EventEdgeTransition` in `execute()`: + +```go + g.observer.OnEvent(ctx, observability.Event{ + Type: EventEdgeTransition, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "from": edge.From, + "to": edge.To, + "edge_index": i, + "predicate_name": edge.Name, + "predicate_result": true, + }, + }) +``` + +#### orchestrate/workflows/chain.go + +Import change: + +```go + "github.com/tailored-agentic-units/kernel/observability" +``` + +`EventChainStart` in `ProcessChain()`: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventChainStart, + Level: observability.LevelInfo, 
+ Timestamp: time.Now(), + Source: "workflows.ProcessChain", + Data: map[string]any{ + "item_count": len(items), + "has_progress_callback": progress != nil, + "capture_intermediate": cfg.CaptureIntermediateStates, + }, + }) +``` + +`EventChainComplete` — empty items path: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventChainComplete, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "workflows.ProcessChain", + Data: map[string]any{ + "steps_completed": 0, + "error": false, + }, + }) +``` + +`EventChainComplete` — cancellation path: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventChainComplete, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "workflows.ProcessChain", + Data: map[string]any{ + "steps_completed": i, + "error": true, + "error_type": "cancellation", + }, + }) +``` + +`EventStepStart`: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventStepStart, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "workflows.ProcessChain", + Data: map[string]any{ + "step_index": i, + "total_steps": len(items), + }, + }) +``` + +`EventStepComplete` — error path: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventStepComplete, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "workflows.ProcessChain", + Data: map[string]any{ + "step_index": i, + "total_steps": len(items), + "error": true, + }, + }) +``` + +`EventChainComplete` — processor error path: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventChainComplete, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "workflows.ProcessChain", + Data: map[string]any{ + "steps_completed": i, + "error": true, + "error_type": "processor", + }, + }) +``` + +`EventStepComplete` — success path: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventStepComplete, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: 
"workflows.ProcessChain", + Data: map[string]any{ + "step_index": i, + "total_steps": len(items), + "error": false, + }, + }) +``` + +`EventChainComplete` — success path: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventChainComplete, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "workflows.ProcessChain", + Data: map[string]any{ + "steps_completed": len(items), + "error": false, + }, + }) +``` + +#### orchestrate/workflows/conditional.go + +Import change: + +```go + "github.com/tailored-agentic-units/kernel/observability" +``` + +`EventRouteEvaluate`: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventRouteEvaluate, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "conditional", + Data: map[string]any{ + "route_count": len(routes.Handlers), + }, + }) +``` + +`EventRouteSelect`: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventRouteSelect, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "conditional", + Data: map[string]any{ + "route": route, + "has_default": routes.Default != nil, + }, + }) +``` + +`EventRouteExecute`: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventRouteExecute, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "conditional", + Data: map[string]any{ + "route": route, + "error": false, + }, + }) +``` + +#### orchestrate/workflows/parallel.go + +Import change: + +```go + "github.com/tailored-agentic-units/kernel/observability" +``` + +`EventParallelStart` — empty items path: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventParallelStart, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "workflows.ProcessParallel", + Data: map[string]any{ + "item_count": 0, + "worker_count": 0, + "fail_fast": cfg.FailFast(), + "has_progress_callback": progress != nil, + }, + }) +``` + +`EventParallelComplete` — empty items path: + +```go + observer.OnEvent(ctx, 
observability.Event{ + Type: EventParallelComplete, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "workflows.ProcessParallel", + Data: map[string]any{ + "items_processed": 0, + "items_failed": 0, + "error": false, + }, + }) +``` + +`EventParallelStart` — normal path: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventParallelStart, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "workflows.ProcessParallel", + Data: map[string]any{ + "item_count": len(items), + "worker_count": workerCount, + "fail_fast": cfg.FailFast(), + "has_progress_callback": progress != nil, + }, + }) +``` + +All 4 `EventParallelComplete` emissions (collector error, context cancelled, fail-fast/all-failed, success) follow the same pattern — add `Level: observability.LevelInfo,`: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventParallelComplete, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "workflows.ProcessParallel", + Data: map[string]any{ + "items_processed": len(results), + "items_failed": len(errors), + "error": ..., // true or false depending on path + }, + }) +``` + +`EventWorkerStart` in `processWorker()`: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventWorkerStart, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "workflows.ProcessParallel", + Data: map[string]any{ + "worker_id": workerID, + "item_index": work.index, + "total_items": total, + }, + }) +``` + +`EventWorkerComplete` in `processWorker()`: + +```go + observer.OnEvent(ctx, observability.Event{ + Type: EventWorkerComplete, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "workflows.ProcessParallel", + Data: map[string]any{ + "worker_id": workerID, + "item_index": work.index, + "total_items": total, + "error": err != nil, + }, + }) +``` + +#### orchestrate/examples/ + +All 5 example programs need only the import path change. 
Replace: +```go + "github.com/tailored-agentic-units/kernel/orchestrate/observability" +``` + +with: + +```go + "github.com/tailored-agentic-units/kernel/observability" +``` + +Files: +- `orchestrate/examples/phase-02-03-state-graphs/main.go` +- `orchestrate/examples/phase-04-sequential-chains/main.go` +- `orchestrate/examples/phase-05-parallel-execution/main.go` +- `orchestrate/examples/phase-06-checkpointing/main.go` +- `orchestrate/examples/darpa-procurement/main.go` + +### Step 3: Delete `orchestrate/observability/` + +```bash +rm -rf orchestrate/observability +``` + +### Step 4: Create `kernel/observer.go` + +```go +package kernel + +import "github.com/tailored-agentic-units/kernel/observability" + +const ( + EventRunStart observability.EventType = "kernel.run.start" + EventRunComplete observability.EventType = "kernel.run.complete" + EventIterationStart observability.EventType = "kernel.iteration.start" + EventToolCall observability.EventType = "kernel.tool.call" + EventToolComplete observability.EventType = "kernel.tool.complete" + EventResponse observability.EventType = "kernel.response" + EventError observability.EventType = "kernel.error" +) +``` + +### Step 5: Modify `kernel/kernel.go` + +**Imports**: Remove `"io"`; keep `"log/slog"`, which the default observer in `New` still needs for `slog.Default()`. Add `"github.com/tailored-agentic-units/kernel/observability"`. + +**Struct field**: Replace `log *slog.Logger` with `observer observability.Observer`. + +**Option**: Replace `WithLogger` with: + +```go +func WithObserver(o observability.Observer) Option { + return func(k *Kernel) { k.observer = o } +} +``` + +**Default in `New`**: Replace `slog.New(slog.NewTextHandler(io.Discard, nil))` with `observability.NewSlogObserver(slog.Default())`. Keep the `"log/slog"` import. 
+ +**Event emissions** — replace all `k.log.*` calls: + +`Run` method — run started (line 168): +```go + k.observer.OnEvent(ctx, observability.Event{ + Type: EventRunStart, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "prompt_length": len(prompt), + "max_iterations": k.maxIterations, + "tools": len(k.tools.List()), + }, + }) +``` + +`Run` method — iteration started (line 175): +```go + k.observer.OnEvent(ctx, observability.Event{ + Type: EventIterationStart, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{"iteration": iteration + 1}, + }) +``` + +`Run` method — final response (line 198, replace `k.log.Info("run complete", ...)`): +```go + k.observer.OnEvent(ctx, observability.Event{ + Type: EventResponse, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "iteration": iteration + 1, + "response_length": len(result.Response), + }, + }) +``` + +`Run` method — tool call (line 210): +```go + k.observer.OnEvent(ctx, observability.Event{ + Type: EventToolCall, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "iteration": iteration + 1, + "name": tc.Function.Name, + }, + }) +``` + +After tool execution (after the if/else block that handles toolErr, around line 242), add tool complete event: +```go + k.observer.OnEvent(ctx, observability.Event{ + Type: EventToolComplete, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "iteration": iteration + 1, + "name": tc.Function.Name, + "error": record.IsError, + }, + }) +``` + +`Run` method — max iterations (line 248): +```go + k.observer.OnEvent(ctx, observability.Event{ + Type: EventError, + Level: observability.LevelWarning, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "error": "max iterations 
reached", + "iterations": k.maxIterations, + }, + }) +``` + +`buildSystemContent` — memory loaded (line 287): Remove the `k.log.Debug("memory loaded", ...)` call. Memory loading is an internal implementation detail not covered by the kernel event types. + +Add `"time"` to the import list. + +### Step 6: Modify `cmd/kernel/main.go` + +Replace logger creation and `WithLogger` usage. Change: + +```go +import ( + ... + "log/slog" + ... + "github.com/tailored-agentic-units/kernel/kernel" +) +``` + +to: + +```go +import ( + ... + "log/slog" + ... + "github.com/tailored-agentic-units/kernel/kernel" + "github.com/tailored-agentic-units/kernel/observability" +) +``` + +Replace: +```go + runtime, err := kernel.New(cfg, kernel.WithLogger(logger)) +``` + +with: +```go + runtime, err := kernel.New(cfg, kernel.WithObserver(observability.NewSlogObserver(logger))) +``` + +## Validation Criteria + +- [ ] `go vet ./...` passes +- [ ] `go test ./...` passes +- [ ] `go mod tidy` produces no changes +- [ ] No `orchestrate/observability` imports remain in codebase +- [ ] Kernel event types cover all 7 loop integration points +- [ ] `WithObserver` option replaces `WithLogger` +- [ ] Default `SlogObserver` uses `slog.Default()` — observable out of the box +- [ ] SlogObserver respects event levels (Info vs Debug filtering) +- [ ] Level values match OTel SeverityNumber ranges +- [ ] `node.state` events separate from `node.complete` diff --git a/.claude/context/sessions/25-kernel-observer.md b/.claude/context/sessions/25-kernel-observer.md new file mode 100644 index 0000000..b1b550b --- /dev/null +++ b/.claude/context/sessions/25-kernel-observer.md @@ -0,0 +1,61 @@ +# 25 - Kernel Observer + +## Summary + +Promoted `orchestrate/observability/` to a root-level `observability/` package with OTel-aligned severity levels, integrated it into the kernel runtime loop replacing the ad-hoc `*slog.Logger`, and migrated all orchestrate imports. 
Event types are decentralized — each package defines its own constants. + +## Key Decisions + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Package location | Root-level `observability/` | Foundation package needed by both kernel and orchestrate — can't be nested under either | +| Severity levels | OTel SeverityNumbers (5, 9, 13, 17) | Zero translation for OTel collectors; maps cleanly to slog levels | +| Event type ownership | Decentralized — each package defines its own | Consistent with kernel event types in `kernel/observer.go`; each package owns its domain | +| Default observer | `SlogObserver(slog.Default())` | Observable out of the box; `WithObserver` allows override | +| State snapshots | Separate `node.state` event | Keeps `node.complete` clean; event type is discriminator for snapshot filtering | +| Level filtering | slog handler responsibility | Observer translates levels; handler filters — single responsibility | + +## Files Modified + +- `observability/observer.go` — Core types: Observer, Event, EventType, Level (OTel-aligned) +- `observability/noop.go` — NoOpObserver +- `observability/multi.go` — MultiObserver (fan-out) +- `observability/slog.go` — Level-aware SlogObserver +- `observability/registry.go` — Global observer registry +- `observability/observer_test.go` — Full test suite (100% coverage) +- `orchestrate/state/events.go` — State/graph/checkpoint event constants +- `orchestrate/workflows/events.go` — Chain/parallel/conditional event constants +- `orchestrate/state/state.go` — Import migration + Level field +- `orchestrate/state/graph.go` — Import migration + Level field + node.state event +- `orchestrate/workflows/chain.go` — Import migration + Level field +- `orchestrate/workflows/conditional.go` — Import migration + Level field +- `orchestrate/workflows/parallel.go` — Import migration + Level field +- `orchestrate/examples/*/main.go` — Import migration (5 files) +- `orchestrate/state/*_test.go` — Import + event 
type reference updates (5 files) +- `orchestrate/workflows/*_test.go` — Import + event type reference updates (3 files) +- `kernel/observer.go` — Kernel event type constants +- `kernel/kernel.go` — Observer replaces slog logger +- `kernel/kernel_test.go` — TestWithObserver replaces TestWithLogger +- `cmd/kernel/main.go` — WithObserver replaces WithLogger +- `_project/README.md` — Architecture, topology, hierarchy updates +- `_project/objective.md` — Status update, known gaps section +- `.claude/CLAUDE.md` — Structure and hierarchy updates +- `.claude/skills/kernel-dev/SKILL.md` — Package responsibilities and hierarchy updates +- `README.md` — Subsystem descriptions +- Deleted: `orchestrate/observability/` (entire directory) + +## Patterns Established + +- **OTel-aligned severity**: Level values ARE OTel SeverityNumbers — no translation layer needed +- **Decentralized event types**: Each package defines `observability.EventType` constants for its domain +- **Type alias for event constants**: `type EventType = observability.EventType` in event files allows unqualified constant names +- **Level-based log filtering**: Info for lifecycle boundaries, Verbose for execution details, Warning for anomalies +- **Separate state snapshot events**: `node.state` splits from `node.complete` for clean observer filtering + +## Validation Results + +- `go vet ./...` — passes +- `go test ./...` — all 18 test packages pass +- `go mod tidy` — no changes +- `observability/` coverage: 100% +- No `orchestrate/observability` imports remain in `.go` files diff --git a/.claude/plans/gentle-growing-leaf.md b/.claude/plans/gentle-growing-leaf.md new file mode 100644 index 0000000..5ec05f2 --- /dev/null +++ b/.claude/plans/gentle-growing-leaf.md @@ -0,0 +1,157 @@ +# Plan: Issue #25 — Kernel Observer + +## Context + +The kernel's `*slog.Logger` and the orchestrate Observer pattern share the same integration points. 
The observability package is currently tightly coupled to `orchestrate/`, but both kernel and orchestrate need it. This issue promotes observability to a root-level package with OTel-compatible severity levels, integrates it into the kernel, and migrates orchestrate imports. + +## Architecture: Root-Level `observability/` + +### Event with OTel-aligned severity + +```go +type Level int + +const ( + LevelVerbose Level = 5 // OTel DEBUG (5-8), slog.Debug + LevelInfo Level = 9 // OTel INFO (9-12), slog.Info + LevelWarning Level = 13 // OTel WARN (13-16), slog.Warn + LevelError Level = 17 // OTel ERROR (17-20), slog.Error +) +``` + +Level values ARE OTel SeverityNumbers — zero translation for OTel collectors. `Level.SlogLevel()` maps to Go's slog levels. `Level.String()` returns OTel severity text ("DEBUG", "INFO", "WARN", "ERROR"). + +```go +type Event struct { + Type EventType // OTel EventName + Level Level // OTel SeverityNumber + Timestamp time.Time // OTel Timestamp + Source string // OTel InstrumentationScope + Data map[string]any // OTel Attributes +} +``` + +### SlogObserver (level-aware) + +Uses `event.Level.SlogLevel()` to emit at the correct slog level. Flattens `Data` keys as top-level slog attributes. Uses `string(event.Type)` as the log message. + +``` +time=... level=INFO msg=kernel.run.start source=kernel.Run prompt_length=42 tools=3 +time=... 
level=DEBUG msg=kernel.tool.call source=kernel.Run name=greet iteration=1 +``` + +### Log readability via levels + +The primary readability improvement is level-based filtering: + +- **Info** consumers see lifecycle boundaries only (run start/complete, graph start/complete, chain start/complete) +- **Verbose** consumers see execution details (iterations, tool calls, node/edge/step events) +- State snapshots are separated into dedicated `node.state` events at Verbose — observers can filter by type within the Verbose tier + +### Kernel event types (defined in `kernel/observer.go`) + +| Constant | Value | Level | +|----------|-------|-------| +| `EventRunStart` | `kernel.run.start` | Info | +| `EventRunComplete` | `kernel.run.complete` | Info | +| `EventIterationStart` | `kernel.iteration.start` | Verbose | +| `EventToolCall` | `kernel.tool.call` | Verbose | +| `EventToolComplete` | `kernel.tool.complete` | Verbose | +| `EventResponse` | `kernel.response` | Info | +| `EventError` | `kernel.error` | Warning | + +### Orchestrate event level assignments + +| Category | Events | Level | +|----------|--------|-------| +| Lifecycle boundaries | graph.start/complete, chain.start/complete, parallel.start/complete, checkpoint.* | Info | +| Internal execution | state.*, node.start/complete, node.state, edge.*, step.*, worker.*, route.* | Verbose | +| Anomalies | cycle.detected | Warning | + +### New event type: `node.state` + +Separates state snapshots from `node.complete`: +- **`node.complete`** — clean metadata: node name, iteration, error status (no snapshots) +- **`node.state`** — full state snapshot: emitted after `node.complete`, carries `input_snapshot` and `output_snapshot` + +Both at LevelVerbose. The event type is the discriminator for observers that want to filter snapshots. 
+ +## Implementation Steps + +### Step 1: Create `observability/` package + +New root-level package with these files: + +| File | Contents | +|------|----------| +| `observability/observer.go` | Observer interface, Event struct (with Level), EventType, Level type (OTel-aligned constants, String(), SlogLevel()) | +| `observability/events.go` | All orchestrate event type constants (moved from orchestrate/observability/observer.go) + new `EventNodeState` | +| `observability/noop.go` | NoOpObserver (zero-cost discard) | +| `observability/multi.go` | MultiObserver (fan-out to multiple observers) | +| `observability/slog.go` | SlogObserver (level-aware, flattened attrs) | +| `observability/registry.go` | Global observer registry (GetObserver, RegisterObserver) | + +### Step 2: Migrate orchestrate imports + +Update all orchestrate source and test files: + +**Source** (import path change + add Level to event emissions): +- `orchestrate/state/state.go` — 4 events, all LevelVerbose +- `orchestrate/state/graph.go` — 10 existing events + 1 new `node.state` event. Remove `input_snapshot`/`output_snapshot` from `node.start`/`node.complete`, add new `node.state` emission after `node.complete` +- `orchestrate/workflows/chain.go` — 5 events, Info for lifecycle, Verbose for steps +- `orchestrate/workflows/conditional.go` — 3 events, all LevelVerbose +- `orchestrate/workflows/parallel.go` — 4 events, Info for lifecycle, Verbose for workers + +**Tests** (import path change + add Level to expected events): +- `orchestrate/state/state_test.go`, `graph_test.go`, `edge_test.go`, `node_test.go`, `checkpoint_test.go` +- `orchestrate/workflows/chain_test.go`, `conditional_test.go`, `integration_test.go` + +**Examples** (import path change): +- `orchestrate/examples/*/main.go` (5 example programs) + +### Step 3: Delete `orchestrate/observability/` + +Remove the entire `orchestrate/observability/` directory. 
+ +### Step 4: Create `kernel/observer.go` + +Kernel event type constants using `observability.EventType`. No kernel-specific SlogObserver needed — the root-level one handles levels correctly. + +### Step 5: Modify `kernel/kernel.go` + +- Replace `log *slog.Logger` → `observer observability.Observer` +- Replace `WithLogger` → `WithObserver` +- Default: `observability.NewSlogObserver(slog.Default())` (observable out of the box) +- Replace all `k.log.*` calls → `k.observer.OnEvent(ctx, ...)` +- Remove the `"io"` import (keep `"log/slog"` — the default observer wraps `slog.Default()`); add `"time"` and `"github.com/tailored-agentic-units/kernel/observability"` + +### Step 6: Modify `cmd/kernel/main.go` + +- Replace `kernel.WithLogger(logger)` → `kernel.WithObserver(observability.NewSlogObserver(logger))` + +## Dependency Hierarchy Change + +``` +Before: + orchestrate/observability → (none) + orchestrate/state → orchestrate/observability + kernel → agent, session, memory, tools, core + +After: + observability → (none) ← new Level 0 foundation package + orchestrate/state → observability ← import path change + kernel → agent, session, memory, tools, core, observability ← new dependency +``` + +## Validation + +- [ ] `go vet ./...` passes +- [ ] `go test ./...` passes +- [ ] `go mod tidy` produces no changes +- [ ] No `orchestrate/observability` imports remain +- [ ] Kernel event types cover all 7 loop integration points +- [ ] `WithObserver` option works, `WithLogger` removed +- [ ] Default `SlogObserver` uses `slog.Default()` +- [ ] SlogObserver respects event levels (Info vs Debug filtering works) +- [ ] Level values match OTel SeverityNumber ranges +- [ ] `node.state` events separate from `node.complete` (no inline snapshots) diff --git a/.claude/skills/kernel-dev/SKILL.md b/.claude/skills/kernel-dev/SKILL.md index df90af4..7eb445f 100644 --- a/.claude/skills/kernel-dev/SKILL.md +++ b/.claude/skills/kernel-dev/SKILL.md @@ -31,16 +31,15 @@ Level 1: core/response, core/model (depends on Level 0) Level 2: agent/providers, 
agent/request (depends on Level 0-1) Level 3: agent (root), agent/client (depends on Level 0-2) Level 4: agent/mock (depends on Level 0-3) -Level 5: orchestrate/observability (zero internal deps) -Level 6: orchestrate/messaging, orchestrate/config (Level 5) -Level 7: orchestrate/hub (depends on Level 3-6) -Level 8: orchestrate/state (depends on Level 5-6) -Level 9: orchestrate/workflows (depends on Level 5-8) +Level 5: orchestrate/messaging, orchestrate/config +Level 6: orchestrate/hub (depends on Level 3-5) +Level 7: orchestrate/state (depends on observability, Level 5) +Level 8: orchestrate/workflows (depends on observability, Level 5-7) -Foundation (Level 0 — depend only on core/protocol): - memory, tools, session +Foundation (Level 0 — observability and memory have no internal dependencies; tools and session depend only on core): + observability, memory, tools, session -Level 10: kernel (depends on agent, session, memory, tools, core) +Level 9: kernel (depends on agent, session, memory, tools, core, observability) ``` Dependencies only flow downward. Never import a higher-level package from a lower-level one. @@ -61,13 +60,13 @@ Dependencies only flow downward. 
Never import a higher-level package from a lowe | `orchestrate/config` | Orchestration config | `HubConfig`, `GraphConfig` | | `orchestrate/hub` | Agent coordination | `Hub` | | `orchestrate/messaging` | Message structures | `Message`, builders | -| `orchestrate/observability` | Execution tracing | `Observer` | +| `observability` | Event-based observability | `Observer`, `Event`, `Level`, `SlogObserver` | | `orchestrate/state` | State graphs, checkpoints | `State`, `Graph`, `CheckpointStore` | | `orchestrate/workflows` | Workflow patterns | `ProcessChain`, `ProcessParallel`, `ProcessConditional` | | `memory` | Context composition pipeline | `Store`, `Cache`, `Entry`, `NewFileStore`, `NewCache` | | `tools` | Tool execution and registry | `Handler`, `Result`, `Register`, `Execute`, `List` | | `session` | Conversation management | `Session`, `NewMemorySession` | -| `kernel` | Agent runtime loop | `Kernel`, `Config`, `Result`, `ToolExecutor`, `WithLogger` | +| `kernel` | Agent runtime loop | `Kernel`, `Config`, `Result`, `ToolExecutor`, `WithObserver` | ## Extension Patterns @@ -80,9 +79,10 @@ Dependencies only flow downward. Never import a higher-level package from a lowe ### Adding an Observer -1. Create new file in `orchestrate/observability/` +1. Create new file in `observability/` 2. Implement `Observer` interface -3. Add tests alongside implementation +3. Register in `observability/registry.go` +4. 
Add tests alongside implementation ### Adding a Workflow Pattern diff --git a/CHANGELOG.md b/CHANGELOG.md index c8ea0c3..599b437 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,28 @@ # Changelog +## v0.1.0-dev.2.25 + +### observability + +- Add root-level `observability` package with OTel-aligned severity levels (#25) +- Add `Observer` interface, `Event` struct, and `Level` type with OTel SeverityNumber values (#25) +- Add `SlogObserver` with level-aware log emission and flattened attributes (#25) +- Add `NoOpObserver`, `MultiObserver`, and global observer registry (#25) + +### orchestrate + +- Migrate `orchestrate/observability` to root-level `observability` package (#25) +- Add `Level` field to all event emissions with Info/Verbose/Warning assignments (#25) +- Add `node.state` event type separating state snapshots from `node.complete` (#25) +- Decentralize event type constants into `orchestrate/state/events.go` and `orchestrate/workflows/events.go` (#25) + +### kernel + +- Replace `*slog.Logger` with `observability.Observer` in kernel runtime loop (#25) +- Add `WithObserver` option replacing `WithLogger` (#25) +- Add kernel event type constants: run, iteration, tool call, response, error (#25) +- Default to `SlogObserver(slog.Default())` for out-of-the-box observability (#25) + ## v0.1.0-dev.2.24 ### agent diff --git a/README.md b/README.md index 99029d9..55f58a7 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ github.com/tailored-agentic-units/kernel |---------|-------------| | `core/` | Foundational type vocabulary: protocol constants, response types, configuration, model | | `agent/` | LLM communication: agent interface, HTTP client, providers (Ollama, Azure), request construction, named agent registry | -| `orchestrate/` | Multi-agent coordination: hubs, messaging, state graphs, workflow patterns, observability | +| `observability/` | Event-based observability: Observer, Event, Level (OTel-aligned), SlogObserver, registry | +| `orchestrate/` | 
Multi-agent coordination: hubs, messaging, state graphs, workflow patterns | | `memory/` | Unified context composition: Store interface, FileStore, Cache. Namespaces: `memory/`, `skills/`, `agents/` | | `tools/` | Tool execution: global registry with Register, Execute, List | | `session/` | Conversation management: Session interface, in-memory implementation | diff --git a/_project/README.md b/_project/README.md index 263d37c..520bcf1 100644 --- a/_project/README.md +++ b/_project/README.md @@ -16,6 +16,7 @@ The kernel consolidates all TAU subsystems into a single Go module: - **session** — Conversation history and context management - **memory** — Unified context composition: persistent memory, skills, agent profiles - **mcp** — MCP client with transport abstraction +- **observability** — Event-based observability with OTel-aligned severity levels - **orchestrate** — Multi-agent coordination and workflow patterns - **kernel** — Agent runtime: closed-loop processing with ConnectRPC interface @@ -40,20 +41,22 @@ Extension ecosystem (external services connecting through the interface): |-----------|--------|------------|--------| | **core** | Foundational types: Protocol, Message, Response, Config, Model | uuid | Complete | | **agent** | LLM client: Agent, Client, Provider, Request, Mock, Registry | core | Complete | -| **orchestrate** | Coordination: Hub, State, Workflows, Observability, Checkpoint | agent | Complete | +| **observability** | Event-based observability: Observer, Event, Level (OTel-aligned), SlogObserver, registry | *(none)* | Complete | +| **orchestrate** | Coordination: Hub, State, Workflows, Checkpoint | agent, observability | Complete | | **memory** | Unified context composition: Store interface, FileStore, Cache. 
Namespaces: `memory/`, `skills/`, `agents/` | *(none)* | Complete | | **tools** | Tool system: global registry with Register, Execute, List | core | Complete | | **session** | Conversation management: Session interface, in-memory implementation | core | Complete | | **mcp** | MCP client: transport abstraction, tool discovery, stdio/SSE | tools | Skeleton | -| **kernel** | Agent runtime: agentic loop, config-driven initialization, CLI entry point | all above | Complete | +| **kernel** | Agent runtime: agentic loop, config-driven initialization, observer integration | all above | Complete | ## Dependency Hierarchy - **kernel** — all subsystems below - - **orchestrate** — agent + - **orchestrate** — agent, observability - **agent** — core - **mcp** — tools - **tools** — core + - **observability** — *(no internal dependencies)* - **memory** — *(no internal dependencies)* - **session** — core - **core** — *(external: uuid)* @@ -62,7 +65,7 @@ Key properties: - **Acyclic**: No circular dependencies at any level - **Shallow**: Maximum depth of 3 (kernel → mcp → tools → core) -- **Independent foundations**: memory has zero internal dependencies; tools and session depend only on core types +- **Independent foundations**: observability and memory have zero internal dependencies; tools and session depend only on core types - **Clean separation**: Each subsystem owns a single domain with no overlap - **Enforced by Go**: Import rules and the type system enforce boundaries within the single module diff --git a/_project/objective.md b/_project/objective.md index 0d81370..e0057e4 100644 --- a/_project/objective.md +++ b/_project/objective.md @@ -13,7 +13,7 @@ Establish the kernel's HTTP interface — the sole extensibility boundary throug |---|-------|--------| | 23 | Streaming tools protocol | PR #30 | | 24 | Agent registry | PR #31 | -| 25 | Kernel observer | Open | +| 25 | Kernel observer | In Progress | | 26 | Multi-session kernel | Open | | 27 | HTTP API with SSE streaming | 
Open | | 28 | Server entry point | Open | @@ -33,6 +33,10 @@ Establish the kernel's HTTP interface — the sole extensibility boundary throug [#28: Server Entry Point] ``` +## Known Gaps + +- **Subsystem observability** — foundation packages (`memory`, `tools`, `session`) should accept an Observer and define their own event types, similar to `orchestrate`. The kernel would pass its observer down during initialization. The kernel's `memory loaded` log was removed in #25 because the kernel shouldn't log on behalf of a subsystem — when `memory` gets its own Observer, it would emit a `memory.load` event at the appropriate level. Follow-up to #25. + ## Architecture Decisions - **Agent registry is kernel infrastructure** — named agents (model-aligned: qwen3-8b, llava-13b, gpt-5), capability querying. Instance-owned, not global. The `memory/agents/` namespace is reserved for subagent profile content. diff --git a/cmd/kernel/main.go b/cmd/kernel/main.go index 609d6cb..7c9ed75 100644 --- a/cmd/kernel/main.go +++ b/cmd/kernel/main.go @@ -10,6 +10,7 @@ import ( "os/signal" "github.com/tailored-agentic-units/kernel/kernel" + "github.com/tailored-agentic-units/kernel/observability" ) func main() { @@ -57,7 +58,13 @@ func main() { registerBuiltinTools() - runtime, err := kernel.New(cfg, kernel.WithLogger(logger)) + runtime, err := kernel.New( + cfg, + kernel.WithObserver( + observability.NewSlogObserver(logger), + ), + ) + if err != nil { log.Fatalf("Failed to create kernel runtime: %v", err) } diff --git a/kernel/kernel.go b/kernel/kernel.go index 5922ac4..92b57c4 100644 --- a/kernel/kernel.go +++ b/kernel/kernel.go @@ -12,12 +12,13 @@ import ( "context" "encoding/json" "fmt" - "io" "log/slog" + "time" "github.com/tailored-agentic-units/kernel/agent" "github.com/tailored-agentic-units/kernel/core/protocol" "github.com/tailored-agentic-units/kernel/memory" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/session" 
"github.com/tailored-agentic-units/kernel/tools" ) @@ -82,9 +83,9 @@ func WithMemoryStore(s memory.Store) Option { return func(k *Kernel) { k.store = s } } -// WithLogger overrides the default discard logger for runtime observability. -func WithLogger(l *slog.Logger) Option { - return func(k *Kernel) { k.log = l } +// WithObserver overrides the default SlogObserver. +func WithObserver(o observability.Observer) Option { + return func(k *Kernel) { k.observer = o } } // Kernel is the single-agent runtime that executes the agentic loop. @@ -94,7 +95,7 @@ type Kernel struct { session session.Session store memory.Store tools ToolExecutor - log *slog.Logger + observer observability.Observer maxIterations int systemPrompt string } @@ -125,13 +126,15 @@ func New(cfg *Config, opts ...Option) (*Kernel, error) { } } + observer := observability.NewSlogObserver(slog.Default()) + k := &Kernel{ agent: a, registry: reg, session: sesh, store: store, + observer: observer, tools: globalToolExecutor{}, - log: slog.New(slog.NewTextHandler(io.Discard, nil)), maxIterations: cfg.MaxIterations, systemPrompt: cfg.SystemPrompt, } @@ -165,14 +168,30 @@ func (k *Kernel) Run(ctx context.Context, prompt string) (*Result, error) { return result, err } - k.log.Info("run started", "prompt_length", len(prompt), "max_iterations", k.maxIterations, "tools", len(k.tools.List())) + k.observer.OnEvent(ctx, observability.Event{ + Type: EventRunStart, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "prompt_length": len(prompt), + "max_iterations": k.maxIterations, + "tools": len(k.tools.List()), + }, + }) for iteration := 0; k.maxIterations == 0 || iteration < k.maxIterations; iteration++ { if err := ctx.Err(); err != nil { return result, err } - k.log.Debug("iteration started", "iteration", iteration+1) + k.observer.OnEvent(ctx, observability.Event{ + Type: EventIterationStart, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + 
Source: "kernel.Run", + Data: map[string]any{"iteration": iteration + 1}, + }) messages := k.buildMessages(systemContent) @@ -195,7 +214,16 @@ func (k *Kernel) Run(ctx context.Context, prompt string) (*Result, error) { result.Response = choice.Message.Content result.Iterations = iteration + 1 - k.log.Info("run complete", "iterations", iteration+1, "response_length", len(result.Response)) + k.observer.OnEvent(ctx, observability.Event{ + Type: EventResponse, + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "iteration": iteration + 1, + "response_length": len(result.Response), + }, + }) return result, nil } @@ -207,7 +235,16 @@ func (k *Kernel) Run(ctx context.Context, prompt string) (*Result, error) { }) for _, tc := range choice.Message.ToolCalls { - k.log.Debug("tool call", "iteration", iteration+1, "name", tc.Function.Name) + k.observer.OnEvent(ctx, observability.Event{ + Type: EventToolCall, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "iteration": iteration + 1, + "name": tc.Function.Name, + }, + }) record := ToolCallRecord{ ToolCall: tc, @@ -239,13 +276,34 @@ func (k *Kernel) Run(ctx context.Context, prompt string) (*Result, error) { record.IsError = toolResult.IsError } + k.observer.OnEvent(ctx, observability.Event{ + Type: EventToolComplete, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "iteration": iteration + 1, + "name": tc.Function.Name, + "error": record.IsError, + }, + }) + result.ToolCalls = append(result.ToolCalls, record) } result.Iterations = iteration + 1 } - k.log.Warn("max iterations reached", "iterations", k.maxIterations) + k.observer.OnEvent(ctx, observability.Event{ + Type: EventError, + Level: observability.LevelWarning, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "error": "max iterations reached", + "iterations": 
k.maxIterations, + }, + }) return result, ErrMaxIterations } @@ -284,7 +342,6 @@ func (k *Kernel) buildSystemContent(ctx context.Context) (string, error) { } for _, entry := range entries { - k.log.Debug("memory loaded", "key", entry.Key, "bytes", len(entry.Value)) content += "\n\n" + string(entry.Value) } diff --git a/kernel/kernel_test.go b/kernel/kernel_test.go index fa84c42..e612f88 100644 --- a/kernel/kernel_test.go +++ b/kernel/kernel_test.go @@ -17,6 +17,7 @@ import ( "github.com/tailored-agentic-units/kernel/core/response" "github.com/tailored-agentic-units/kernel/kernel" "github.com/tailored-agentic-units/kernel/memory" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/tools" ) @@ -745,7 +746,7 @@ func TestRun_UnlimitedIterations(t *testing.T) { } } -func TestWithLogger(t *testing.T) { +func TestWithObserver(t *testing.T) { agent := newSequentialAgent( []*response.ToolsResponse{makeFinalResponse("ok")}, nil, @@ -760,7 +761,7 @@ func TestWithLogger(t *testing.T) { kernel.WithAgent(agent), kernel.WithSession(newTestSession()), kernel.WithToolExecutor(&mockToolExecutor{}), - kernel.WithLogger(logger), + kernel.WithObserver(observability.NewSlogObserver(logger)), ) if err != nil { t.Fatalf("New failed: %v", err) @@ -772,11 +773,11 @@ func TestWithLogger(t *testing.T) { } output := buf.String() - if !strings.Contains(output, "run started") { - t.Error("expected 'run started' log entry") + if !strings.Contains(output, "kernel.run.start") { + t.Error("expected 'kernel.run.start' log entry") } - if !strings.Contains(output, "run complete") { - t.Error("expected 'run complete' log entry") + if !strings.Contains(output, "kernel.response") { + t.Error("expected 'kernel.response' log entry") } } diff --git a/kernel/observer.go b/kernel/observer.go new file mode 100644 index 0000000..200ce9f --- /dev/null +++ b/kernel/observer.go @@ -0,0 +1,14 @@ +package kernel + +import 
"github.com/tailored-agentic-units/kernel/observability" + +// Kernel event types emitted during the agentic loop. +const ( + EventRunStart observability.EventType = "kernel.run.start" + EventRunComplete observability.EventType = "kernel.run.complete" + EventIterationStart observability.EventType = "kernel.iteration.start" + EventToolCall observability.EventType = "kernel.tool.call" + EventToolComplete observability.EventType = "kernel.tool.complete" + EventResponse observability.EventType = "kernel.response" + EventError observability.EventType = "kernel.error" +) diff --git a/observability/multi.go b/observability/multi.go new file mode 100644 index 0000000..e99b8d6 --- /dev/null +++ b/observability/multi.go @@ -0,0 +1,26 @@ +package observability + +import "context" + +// MultiObserver fans out events to multiple observers. +type MultiObserver struct { + observers []Observer +} + +// NewMultiObserver creates a MultiObserver that forwards events to all +// non-nil observers. +func NewMultiObserver(observers ...Observer) *MultiObserver { + filtered := make([]Observer, 0, len(observers)) + for _, obs := range observers { + if obs != nil { + filtered = append(filtered, obs) + } + } + return &MultiObserver{observers: filtered} +} + +func (m *MultiObserver) OnEvent(ctx context.Context, event Event) { + for _, obs := range m.observers { + obs.OnEvent(ctx, event) + } +} diff --git a/observability/noop.go b/observability/noop.go new file mode 100644 index 0000000..fb7ea89 --- /dev/null +++ b/observability/noop.go @@ -0,0 +1,8 @@ +package observability + +import "context" + +// NoOpObserver discards all events with zero overhead. 
+type NoOpObserver struct{} + +func (NoOpObserver) OnEvent(ctx context.Context, event Event) {} diff --git a/observability/observer.go b/observability/observer.go new file mode 100644 index 0000000..42d4aa4 --- /dev/null +++ b/observability/observer.go @@ -0,0 +1,72 @@ +// Package observability provides event-based observability for kernel and +// orchestrate subsystems. Level values align with OpenTelemetry SeverityNumbers +// for zero-translation compatibility with OTel collectors. +package observability + +import ( + "context" + "log/slog" + "time" +) + +// Level represents event severity aligned with OTel SeverityNumber ranges. +type Level int + +const ( + LevelVerbose Level = 5 // OTel DEBUG (5-8), maps to slog.LevelDebug + LevelInfo Level = 9 // OTel INFO (9-12), maps to slog.LevelInfo + LevelWarning Level = 13 // OTel WARN (13-16), maps to slog.LevelWarn + LevelError Level = 17 // OTel ERROR (17-20), maps to slog.LevelError +) + +// String returns the OTel severity text for the level. +func (l Level) String() string { + switch { + case l <= 4: + return "TRACE" + case l <= 8: + return "DEBUG" + case l <= 12: + return "INFO" + case l <= 16: + return "WARN" + case l <= 20: + return "ERROR" + default: + return "FATAL" + } +} + +// SlogLevel maps this level to the corresponding slog.Level for log emission. +func (l Level) SlogLevel() slog.Level { + switch { + case l <= 8: + return slog.LevelDebug + case l <= 12: + return slog.LevelInfo + case l <= 16: + return slog.LevelWarn + default: + return slog.LevelError + } +} + +// EventType identifies the kind of event. Each subsystem defines its own +// constants using this type (e.g., "kernel.run.start", "graph.complete"). +type EventType string + +// Event is an observability event emitted by subsystems. Fields map to +// OTel LogRecord fields: Type→EventName, Level→SeverityNumber, +// Timestamp→Timestamp, Source→InstrumentationScope, Data→Attributes. 
+type Event struct { + Type EventType + Level Level + Timestamp time.Time + Source string + Data map[string]any +} + +// Observer receives events from subsystems for logging, tracing, or metrics. +type Observer interface { + OnEvent(ctx context.Context, event Event) +} diff --git a/observability/observer_test.go b/observability/observer_test.go new file mode 100644 index 0000000..0cb4889 --- /dev/null +++ b/observability/observer_test.go @@ -0,0 +1,255 @@ +package observability_test + +import ( + "bytes" + "context" + "log/slog" + "testing" + "time" + + "github.com/tailored-agentic-units/kernel/observability" +) + +func TestLevel_String(t *testing.T) { + tests := []struct { + name string + level observability.Level + want string + }{ + {name: "trace range", level: 1, want: "TRACE"}, + {name: "verbose maps to DEBUG", level: observability.LevelVerbose, want: "DEBUG"}, + {name: "info maps to INFO", level: observability.LevelInfo, want: "INFO"}, + {name: "warning maps to WARN", level: observability.LevelWarning, want: "WARN"}, + {name: "error maps to ERROR", level: observability.LevelError, want: "ERROR"}, + {name: "fatal range", level: 21, want: "FATAL"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.level.String(); got != tt.want { + t.Errorf("Level(%d).String() = %q, want %q", tt.level, got, tt.want) + } + }) + } +} + +func TestLevel_SlogLevel(t *testing.T) { + tests := []struct { + name string + level observability.Level + want slog.Level + }{ + {name: "verbose maps to Debug", level: observability.LevelVerbose, want: slog.LevelDebug}, + {name: "info maps to Info", level: observability.LevelInfo, want: slog.LevelInfo}, + {name: "warning maps to Warn", level: observability.LevelWarning, want: slog.LevelWarn}, + {name: "error maps to Error", level: observability.LevelError, want: slog.LevelError}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.level.SlogLevel(); got != tt.want { + 
t.Errorf("Level(%d).SlogLevel() = %v, want %v", tt.level, got, tt.want) + } + }) + } +} + +func TestLevel_OTelAlignment(t *testing.T) { + if observability.LevelVerbose != 5 { + t.Errorf("LevelVerbose = %d, want 5 (OTel DEBUG range)", observability.LevelVerbose) + } + if observability.LevelInfo != 9 { + t.Errorf("LevelInfo = %d, want 9 (OTel INFO range)", observability.LevelInfo) + } + if observability.LevelWarning != 13 { + t.Errorf("LevelWarning = %d, want 13 (OTel WARN range)", observability.LevelWarning) + } + if observability.LevelError != 17 { + t.Errorf("LevelError = %d, want 17 (OTel ERROR range)", observability.LevelError) + } +} + +func TestNoOpObserver(t *testing.T) { + obs := observability.NoOpObserver{} + obs.OnEvent(context.Background(), observability.Event{ + Type: "test.event", + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "test", + Data: map[string]any{"key": "value"}, + }) +} + +func TestMultiObserver(t *testing.T) { + var events1, events2 []observability.Event + + obs1 := &captureObserver{events: &events1} + obs2 := &captureObserver{events: &events2} + + multi := observability.NewMultiObserver(obs1, obs2) + + event := observability.Event{ + Type: "test.event", + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "test", + Data: map[string]any{"key": "value"}, + } + + multi.OnEvent(context.Background(), event) + + if len(events1) != 1 { + t.Errorf("observer 1 received %d events, want 1", len(events1)) + } + if len(events2) != 1 { + t.Errorf("observer 2 received %d events, want 1", len(events2)) + } + if events1[0].Type != "test.event" { + t.Errorf("observer 1 event type = %q, want %q", events1[0].Type, "test.event") + } +} + +func TestMultiObserver_NilFiltering(t *testing.T) { + var events []observability.Event + obs := &captureObserver{events: &events} + + multi := observability.NewMultiObserver(nil, obs, nil) + + multi.OnEvent(context.Background(), observability.Event{ + Type: "test.event", + Level: 
observability.LevelInfo, + }) + + if len(events) != 1 { + t.Errorf("received %d events, want 1 (nil observers should be filtered)", len(events)) + } +} + +func TestSlogObserver_LevelMapping(t *testing.T) { + tests := []struct { + name string + level observability.Level + minLevel slog.Level + expectLog bool + }{ + {name: "verbose at debug handler", level: observability.LevelVerbose, minLevel: slog.LevelDebug, expectLog: true}, + {name: "verbose at info handler", level: observability.LevelVerbose, minLevel: slog.LevelInfo, expectLog: false}, + {name: "info at info handler", level: observability.LevelInfo, minLevel: slog.LevelInfo, expectLog: true}, + {name: "info at warn handler", level: observability.LevelInfo, minLevel: slog.LevelWarn, expectLog: false}, + {name: "warning at warn handler", level: observability.LevelWarning, minLevel: slog.LevelWarn, expectLog: true}, + {name: "error at error handler", level: observability.LevelError, minLevel: slog.LevelError, expectLog: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ + Level: tt.minLevel, + })) + + obs := observability.NewSlogObserver(logger) + obs.OnEvent(context.Background(), observability.Event{ + Type: "test.event", + Level: tt.level, + Timestamp: time.Now(), + Source: "test", + }) + + hasOutput := buf.Len() > 0 + if hasOutput != tt.expectLog { + t.Errorf("log output = %v, want %v (buf: %q)", hasOutput, tt.expectLog, buf.String()) + } + }) + } +} + +func TestSlogObserver_EventTypeAsMessage(t *testing.T) { + var buf bytes.Buffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + + obs := observability.NewSlogObserver(logger) + obs.OnEvent(context.Background(), observability.Event{ + Type: "kernel.run.start", + Level: observability.LevelInfo, + Timestamp: time.Now(), + Source: "kernel.Run", + Data: map[string]any{ + "prompt_length": 42, + 
}, + }) + + output := buf.String() + if !contains(output, "kernel.run.start") { + t.Errorf("expected event type as log message, got: %s", output) + } + if !contains(output, "source=kernel.Run") { + t.Errorf("expected source attribute, got: %s", output) + } + if !contains(output, "prompt_length=42") { + t.Errorf("expected data attributes, got: %s", output) + } +} + +func TestRegistry_GetObserver(t *testing.T) { + tests := []struct { + name string + key string + wantErr bool + }{ + {name: "noop exists", key: "noop", wantErr: false}, + {name: "slog exists", key: "slog", wantErr: false}, + {name: "unknown fails", key: "nonexistent", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + obs, err := observability.GetObserver(tt.key) + if (err != nil) != tt.wantErr { + t.Errorf("GetObserver(%q) error = %v, wantErr %v", tt.key, err, tt.wantErr) + } + if !tt.wantErr && obs == nil { + t.Errorf("GetObserver(%q) returned nil observer", tt.key) + } + }) + } +} + +func TestRegistry_RegisterAndGet(t *testing.T) { + var events []observability.Event + custom := &captureObserver{events: &events} + + observability.RegisterObserver("test-custom", custom) + + obs, err := observability.GetObserver("test-custom") + if err != nil { + t.Fatalf("GetObserver failed: %v", err) + } + + obs.OnEvent(context.Background(), observability.Event{ + Type: "test.event", + Level: observability.LevelInfo, + }) + + if len(events) != 1 { + t.Errorf("received %d events, want 1", len(events)) + } +} + +type captureObserver struct { + events *[]observability.Event +} + +func (c *captureObserver) OnEvent(ctx context.Context, event observability.Event) { + *c.events = append(*c.events, event) +} + +func contains(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} diff --git a/observability/registry.go b/observability/registry.go new file mode 100644 index 0000000..7078278 --- 
/dev/null +++ b/observability/registry.go @@ -0,0 +1,36 @@ +package observability + +import ( + "fmt" + "log/slog" + "sync" +) + +var ( + observers = map[string]Observer{ + "noop": NoOpObserver{}, + "slog": NewSlogObserver(slog.Default()), + } + mutex sync.RWMutex +) + +// GetObserver returns a registered observer by name. +// Pre-registered observers: "noop" (NoOpObserver) and "slog" (default logger). +func GetObserver(name string) (Observer, error) { + mutex.RLock() + defer mutex.RUnlock() + + obs, exists := observers[name] + if !exists { + return nil, fmt.Errorf("unknown observer: %s", name) + } + return obs, nil +} + +// RegisterObserver adds or replaces a named observer in the global registry. +func RegisterObserver(name string, observer Observer) { + mutex.Lock() + defer mutex.Unlock() + + observers[name] = observer +} diff --git a/observability/slog.go b/observability/slog.go new file mode 100644 index 0000000..10f3d10 --- /dev/null +++ b/observability/slog.go @@ -0,0 +1,28 @@ +package observability + +import ( + "context" + "log/slog" +) + +// SlogObserver emits events to a slog.Logger. Event levels are mapped via +// SlogLevel, the event type becomes the log message, and Data keys are +// flattened as top-level slog attributes. +type SlogObserver struct { + logger *slog.Logger +} + +// NewSlogObserver creates a SlogObserver that emits to the given logger. +func NewSlogObserver(logger *slog.Logger) *SlogObserver { + return &SlogObserver{logger: logger} +} + +func (o *SlogObserver) OnEvent(ctx context.Context, event Event) { + attrs := make([]slog.Attr, 0, len(event.Data)+1) + attrs = append(attrs, slog.String("source", event.Source)) + for k, v := range event.Data { + attrs = append(attrs, slog.Any(k, v)) + } + + o.logger.LogAttrs(ctx, event.Level.SlogLevel(), string(event.Type), attrs...) 
+} diff --git a/orchestrate/examples/darpa-procurement/main.go b/orchestrate/examples/darpa-procurement/main.go index ede8f08..6575107 100644 --- a/orchestrate/examples/darpa-procurement/main.go +++ b/orchestrate/examples/darpa-procurement/main.go @@ -8,7 +8,7 @@ import ( "os" "time" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" ) diff --git a/orchestrate/examples/phase-02-03-state-graphs/main.go b/orchestrate/examples/phase-02-03-state-graphs/main.go index 7f5acb4..f0fb519 100644 --- a/orchestrate/examples/phase-02-03-state-graphs/main.go +++ b/orchestrate/examples/phase-02-03-state-graphs/main.go @@ -11,8 +11,8 @@ import ( "github.com/tailored-agentic-units/kernel/agent" agentconfig "github.com/tailored-agentic-units/kernel/core/config" "github.com/tailored-agentic-units/kernel/core/protocol" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" ) diff --git a/orchestrate/examples/phase-04-sequential-chains/main.go b/orchestrate/examples/phase-04-sequential-chains/main.go index c522272..1f3e0ef 100644 --- a/orchestrate/examples/phase-04-sequential-chains/main.go +++ b/orchestrate/examples/phase-04-sequential-chains/main.go @@ -11,8 +11,8 @@ import ( "github.com/tailored-agentic-units/kernel/agent" agentconfig "github.com/tailored-agentic-units/kernel/core/config" "github.com/tailored-agentic-units/kernel/core/protocol" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" 
"github.com/tailored-agentic-units/kernel/orchestrate/workflows" ) diff --git a/orchestrate/examples/phase-05-parallel-execution/main.go b/orchestrate/examples/phase-05-parallel-execution/main.go index 533d56a..548328b 100644 --- a/orchestrate/examples/phase-05-parallel-execution/main.go +++ b/orchestrate/examples/phase-05-parallel-execution/main.go @@ -11,8 +11,8 @@ import ( "github.com/tailored-agentic-units/kernel/agent" agentconfig "github.com/tailored-agentic-units/kernel/core/config" "github.com/tailored-agentic-units/kernel/core/protocol" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" "github.com/tailored-agentic-units/kernel/orchestrate/workflows" ) diff --git a/orchestrate/examples/phase-06-checkpointing/main.go b/orchestrate/examples/phase-06-checkpointing/main.go index 798eed5..01a3b86 100644 --- a/orchestrate/examples/phase-06-checkpointing/main.go +++ b/orchestrate/examples/phase-06-checkpointing/main.go @@ -11,8 +11,8 @@ import ( "github.com/tailored-agentic-units/kernel/agent" agentconfig "github.com/tailored-agentic-units/kernel/core/config" "github.com/tailored-agentic-units/kernel/core/protocol" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" ) diff --git a/orchestrate/observability/doc.go b/orchestrate/observability/doc.go deleted file mode 100644 index ccf46a2..0000000 --- a/orchestrate/observability/doc.go +++ /dev/null @@ -1,40 +0,0 @@ -// Package observability provides minimal observability primitives for orchestration workflows. -// -// This package establishes the foundation for production-grade observability without -// impacting performance when observability is not needed. 
It defines interfaces and types -// for capturing execution events across state management, graph execution, and workflow patterns. -// -// # Core Components -// -// Observer - Interface for receiving execution events -// -// Event - Structure containing event metadata (type, timestamp, source, data) -// -// EventType - Constants for all observable events across orchestration primitives -// -// NoOpObserver - Zero-cost observer implementation when observability not needed -// -// # Observer Registry -// -// The package provides a registry pattern enabling configuration-driven observer selection: -// -// observability.RegisterObserver("slog", NewSlogObserver()) -// observer, err := observability.GetObserver("slog") -// -// This enables JSON configuration: -// -// {"name": "my-graph", "observer": "slog", "max_iterations": 100} -// -// # Usage -// -// Observer hooks are integrated into all orchestration primitives from Phase 2 onwards. -// Events are emitted at key execution points without affecting normal execution flow. -// -// When observability is not needed, use NoOpObserver for zero performance overhead: -// -// observer := observability.NoOpObserver{} -// state := state.New(observer) // No events emitted -// -// Phase 8 will provide production Observer implementations (structured logging, metrics, -// trace correlation). The minimal interface established here prevents retrofit friction. -package observability diff --git a/orchestrate/observability/multi.go b/orchestrate/observability/multi.go deleted file mode 100644 index 80cc970..0000000 --- a/orchestrate/observability/multi.go +++ /dev/null @@ -1,33 +0,0 @@ -package observability - -import "context" - -// MultiObserver broadcasts events to multiple wrapped observers. -// -// Use MultiObserver when events need to reach multiple destinations simultaneously, -// such as persisting to a database while also streaming to clients. 
The implementation -// is not thread-safe for modification after construction; all observers should be -// provided at creation time. -type MultiObserver struct { - observers []Observer -} - -// NewMultiObserver creates a MultiObserver that broadcasts to all provided observers. -// Nil observers are filtered out during construction to prevent nil pointer panics. -func NewMultiObserver(observers ...Observer) *MultiObserver { - filtered := make([]Observer, 0, len(observers)) - for _, obs := range observers { - if obs != nil { - filtered = append(filtered, obs) - } - } - return &MultiObserver{observers: filtered} -} - -// OnEvent forwards the event to all wrapped observers sequentially. -// The context is propagated to each observer for cancellation support. -func (m *MultiObserver) OnEvent(ctx context.Context, event Event) { - for _, obs := range m.observers { - obs.OnEvent(ctx, event) - } -} diff --git a/orchestrate/observability/multi_test.go b/orchestrate/observability/multi_test.go deleted file mode 100644 index cdd1267..0000000 --- a/orchestrate/observability/multi_test.go +++ /dev/null @@ -1,219 +0,0 @@ -package observability_test - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/tailored-agentic-units/kernel/orchestrate/observability" -) - -type captureObserver struct { - mu sync.Mutex - events []observability.Event -} - -func (o *captureObserver) OnEvent(ctx context.Context, event observability.Event) { - o.mu.Lock() - defer o.mu.Unlock() - o.events = append(o.events, event) -} - -func (o *captureObserver) getEvents() []observability.Event { - o.mu.Lock() - defer o.mu.Unlock() - return o.events -} - -func TestMultiObserver_BroadcastsToAllObservers(t *testing.T) { - obs1 := &captureObserver{} - obs2 := &captureObserver{} - obs3 := &captureObserver{} - - multi := observability.NewMultiObserver(obs1, obs2, obs3) - - event := observability.Event{ - Type: observability.EventNodeStart, - Timestamp: time.Now(), - Source: "test", - Data: 
map[string]any{"key": "value"}, - } - - multi.OnEvent(context.Background(), event) - - observers := []*captureObserver{obs1, obs2, obs3} - for i, obs := range observers { - events := obs.getEvents() - if len(events) != 1 { - t.Errorf("Observer %d: got %d events, want 1", i, len(events)) - } - if events[0].Type != observability.EventNodeStart { - t.Errorf("Observer %d: got type %v, want %v", i, events[0].Type, observability.EventNodeStart) - } - } -} - -func TestMultiObserver_EmptyObservers(t *testing.T) { - multi := observability.NewMultiObserver() - - event := observability.Event{ - Type: observability.EventNodeStart, - Timestamp: time.Now(), - Source: "test", - } - - multi.OnEvent(context.Background(), event) -} - -func TestMultiObserver_FiltersNilObservers(t *testing.T) { - obs1 := &captureObserver{} - obs2 := &captureObserver{} - - multi := observability.NewMultiObserver(obs1, nil, obs2, nil) - - event := observability.Event{ - Type: observability.EventNodeComplete, - Timestamp: time.Now(), - Source: "test", - } - - multi.OnEvent(context.Background(), event) - - if len(obs1.getEvents()) != 1 { - t.Errorf("obs1: got %d events, want 1", len(obs1.getEvents())) - } - if len(obs2.getEvents()) != 1 { - t.Errorf("obs2: got %d events, want 1", len(obs2.getEvents())) - } -} - -func TestMultiObserver_SingleObserver(t *testing.T) { - obs := &captureObserver{} - multi := observability.NewMultiObserver(obs) - - event := observability.Event{ - Type: observability.EventGraphStart, - Timestamp: time.Now(), - Source: "graph", - Data: map[string]any{"name": "test-graph"}, - } - - multi.OnEvent(context.Background(), event) - - events := obs.getEvents() - if len(events) != 1 { - t.Fatalf("got %d events, want 1", len(events)) - } - if events[0].Data["name"] != "test-graph" { - t.Errorf("got name %v, want test-graph", events[0].Data["name"]) - } -} - -func TestMultiObserver_PreservesEventData(t *testing.T) { - obs := &captureObserver{} - multi := observability.NewMultiObserver(obs) 
- - originalData := map[string]any{ - "string": "value", - "number": 42, - "nested": map[string]any{"inner": "data"}, - } - - event := observability.Event{ - Type: observability.EventStateSet, - Timestamp: time.Now(), - Source: "state", - Data: originalData, - } - - multi.OnEvent(context.Background(), event) - - events := obs.getEvents() - if len(events) != 1 { - t.Fatalf("got %d events, want 1", len(events)) - } - - receivedData := events[0].Data - if receivedData["string"] != "value" { - t.Errorf("string: got %v, want value", receivedData["string"]) - } - if receivedData["number"] != 42 { - t.Errorf("number: got %v, want 42", receivedData["number"]) - } -} - -func TestMultiObserver_PropagatesContext(t *testing.T) { - type ctxKey string - key := ctxKey("test-key") - - var receivedCtx context.Context - obs := &captureObserver{} - - wrapper := &contextCapture{ - inner: obs, - capturedFn: func(ctx context.Context) { receivedCtx = ctx }, - } - - multi := observability.NewMultiObserver(wrapper) - - ctx := context.WithValue(context.Background(), key, "test-value") - event := observability.Event{ - Type: observability.EventNodeStart, - Timestamp: time.Now(), - Source: "test", - } - - multi.OnEvent(ctx, event) - - if receivedCtx == nil { - t.Fatal("context was not propagated") - } - if receivedCtx.Value(key) != "test-value" { - t.Errorf("context value: got %v, want test-value", receivedCtx.Value(key)) - } -} - -type contextCapture struct { - inner observability.Observer - capturedFn func(ctx context.Context) -} - -func (c *contextCapture) OnEvent(ctx context.Context, event observability.Event) { - c.capturedFn(ctx) - c.inner.OnEvent(ctx, event) -} - -func TestMultiObserver_ConcurrentEvents(t *testing.T) { - obs := &captureObserver{} - multi := observability.NewMultiObserver(obs) - - const numGoroutines = 10 - const eventsPerGoroutine = 100 - - var wg sync.WaitGroup - wg.Add(numGoroutines) - - for i := range numGoroutines { - go func(id int) { - defer wg.Done() - for j := 
range eventsPerGoroutine { - event := observability.Event{ - Type: observability.EventNodeStart, - Timestamp: time.Now(), - Source: "concurrent-test", - Data: map[string]any{"goroutine": id, "event": j}, - } - multi.OnEvent(context.Background(), event) - } - }(i) - } - - wg.Wait() - - events := obs.getEvents() - expected := numGoroutines * eventsPerGoroutine - if len(events) != expected { - t.Errorf("got %d events, want %d", len(events), expected) - } -} diff --git a/orchestrate/observability/noop.go b/orchestrate/observability/noop.go deleted file mode 100644 index 838949b..0000000 --- a/orchestrate/observability/noop.go +++ /dev/null @@ -1,17 +0,0 @@ -package observability - -import "context" - -// NoOpObserver provides a zero-cost Observer implementation that discards all events. -// -// Use NoOpObserver when observability is not needed to avoid performance overhead. -// The implementation is stateless and can be safely reused across goroutines. -// -// Example: -// -// observer := observability.NoOpObserver{} -// state := state.New(observer) // No events emitted, zero overhead -type NoOpObserver struct{} - -// OnEvent discards the event without any processing. -func (NoOpObserver) OnEvent(ctx context.Context, event Event) {} diff --git a/orchestrate/observability/observer.go b/orchestrate/observability/observer.go deleted file mode 100644 index 5fe0a05..0000000 --- a/orchestrate/observability/observer.go +++ /dev/null @@ -1,84 +0,0 @@ -package observability - -import ( - "context" - "time" -) - -// Observer receives execution events from orchestration primitives. -// -// Observer implementations can log events, collect metrics, trace execution flow, -// or capture decision points. The interface is intentionally minimal to avoid -// coupling orchestration primitives to specific observability implementations. -// -// Implementations should not affect execution flow - errors or delays in OnEvent -// should not propagate to the caller. 
-type Observer interface { - // OnEvent receives an execution event with metadata about what happened. - // The context provides cancellation/timeout control for expensive operations. - OnEvent(ctx context.Context, event Event) -} - -// Event represents an observable occurrence during workflow execution. -// -// Events capture execution metadata rather than application data. This approach -// enables observability without exposing sensitive information or impacting performance. -type Event struct { - // Type categorizes the event (state.set, node.execute, etc.) - Type EventType - - // Timestamp records when the event occurred - Timestamp time.Time - - // Source identifies the component that emitted the event (state, graph, chain, etc.) - Source string - - // Data contains metadata about the event (keys changed, duration, progress, etc.) - // This is execution telemetry, not application data - Data map[string]any -} - -// EventType categorizes observable events across orchestration primitives. -// -// Event types are defined for all phases (2-8) to establish a consistent event -// model across the entire orchestration infrastructure. 
-type EventType string - -const ( - // Phase 2: State operations - EventStateCreate EventType = "state.create" - EventStateClone EventType = "state.clone" - EventStateSet EventType = "state.set" - EventStateMerge EventType = "state.merge" - - // Phase 3: Graph execution - EventGraphStart EventType = "graph.start" - EventGraphComplete EventType = "graph.complete" - EventNodeStart EventType = "node.start" - EventNodeComplete EventType = "node.complete" - EventEdgeEvaluate EventType = "edge.evaluate" - EventEdgeTransition EventType = "edge.transition" - EventCycleDetected EventType = "cycle.detected" - - // Phase 4: Sequential chains - EventChainStart EventType = "chain.start" - EventChainComplete EventType = "chain.complete" - EventStepStart EventType = "step.start" - EventStepComplete EventType = "step.complete" - - // Phase 5: Parallel execution - EventParallelStart EventType = "parallel.start" - EventParallelComplete EventType = "parallel.complete" - EventWorkerStart EventType = "worker.start" - EventWorkerComplete EventType = "worker.complete" - - // Phase 6: Checkpointing - EventCheckpointSave EventType = "checkpoint.save" - EventCheckpointLoad EventType = "checkpoint.load" - EventCheckpointResume EventType = "checkpoint.resume" - - // Phase 7: Conditional routing - EventRouteEvaluate EventType = "route.evaluate" - EventRouteSelect EventType = "route.select" - EventRouteExecute EventType = "route.execute" -) diff --git a/orchestrate/observability/observer_test.go b/orchestrate/observability/observer_test.go deleted file mode 100644 index ed656ce..0000000 --- a/orchestrate/observability/observer_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package observability_test - -import ( - "context" - "testing" - "time" - - "github.com/tailored-agentic-units/kernel/orchestrate/observability" -) - -func TestObserver_NoOpObserver(t *testing.T) { - observer := observability.NoOpObserver{} - event := observability.Event{ - Type: observability.EventStateCreate, - Timestamp: 
time.Now(), - Source: "test", - Data: map[string]any{"key": "value"}, - } - - observer.OnEvent(context.Background(), event) -} - -func TestObserverRegistry_GetObserver(t *testing.T) { - tests := []struct { - name string - observerKey string - wantErr bool - }{ - { - name: "noop observer exists", - observerKey: "noop", - wantErr: false, - }, - { - name: "unknown observer returns error", - observerKey: "unknown", - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - observer, err := observability.GetObserver(tt.observerKey) - if (err != nil) != tt.wantErr { - t.Errorf("GetObserver() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !tt.wantErr && observer == nil { - t.Error("GetObserver() returned nil observer for valid key") - } - }) - } -} - -type testObserver struct{} - -func (testObserver) OnEvent(ctx context.Context, event observability.Event) {} - -func TestObserverRegistry_RegisterObserver(t *testing.T) { - observability.RegisterObserver("test-observer", testObserver{}) - - observer, err := observability.GetObserver("test-observer") - if err != nil { - t.Errorf("GetObserver() after registration failed: %v", err) - } - if observer == nil { - t.Error("GetObserver() returned nil for registered observer") - } -} - -func TestEvent_Structure(t *testing.T) { - now := time.Now() - event := observability.Event{ - Type: observability.EventStateSet, - Timestamp: now, - Source: "test-source", - Data: map[string]any{"key": "test-key"}, - } - - if event.Type != observability.EventStateSet { - t.Errorf("Event.Type = %v, want %v", event.Type, observability.EventStateSet) - } - if event.Source != "test-source" { - t.Errorf("Event.Source = %v, want %v", event.Source, "test-source") - } - if event.Data["key"] != "test-key" { - t.Errorf("Event.Data[key] = %v, want %v", event.Data["key"], "test-key") - } -} - -func TestEventType_Constants(t *testing.T) { - eventTypes := []observability.EventType{ - 
observability.EventStateCreate, - observability.EventStateClone, - observability.EventStateSet, - observability.EventStateMerge, - observability.EventGraphStart, - observability.EventGraphComplete, - observability.EventNodeStart, - observability.EventNodeComplete, - observability.EventEdgeEvaluate, - observability.EventEdgeTransition, - observability.EventCycleDetected, - observability.EventChainStart, - observability.EventChainComplete, - observability.EventStepStart, - observability.EventStepComplete, - observability.EventParallelStart, - observability.EventParallelComplete, - observability.EventWorkerStart, - observability.EventWorkerComplete, - observability.EventCheckpointSave, - observability.EventCheckpointLoad, - observability.EventCheckpointResume, - observability.EventRouteEvaluate, - observability.EventRouteSelect, - observability.EventRouteExecute, - } - - for _, et := range eventTypes { - if et == "" { - t.Errorf("EventType constant is empty string") - } - } -} diff --git a/orchestrate/observability/registry.go b/orchestrate/observability/registry.go deleted file mode 100644 index b2989b3..0000000 --- a/orchestrate/observability/registry.go +++ /dev/null @@ -1,66 +0,0 @@ -package observability - -import ( - "fmt" - "log/slog" - "sync" -) - -// observers registry maps observer names to implementations. -// Initialized with "noop" observer for zero-overhead observability. -var ( - observers = map[string]Observer{ - "noop": NoOpObserver{}, - "slog": NewSlogObserver(slog.Default()), - } - mutex sync.RWMutex -) - -// GetObserver retrieves a registered observer by name. -// -// This function enables configuration-driven observer selection, allowing JSON -// configurations to specify observers as strings that are resolved at runtime. -// -// Returns an error if the observer name is not registered. 
-// -// Example: -// -// observer, err := observability.GetObserver("slog") -// if err != nil { -// log.Fatal(err) -// } -func GetObserver(name string) (Observer, error) { - mutex.RLock() - defer mutex.RUnlock() - - obs, exists := observers[name] - if !exists { - return nil, fmt.Errorf("unknown observer: %s", name) - } - return obs, nil -} - -// RegisterObserver registers a custom observer implementation under the given name. -// -// This enables extensibility - users can provide custom Observer implementations -// and register them for use via configuration. -// -// Phase 8 will use this to register production observers: -// - "slog" - Structured logging via Go's slog package -// - "otel" - OpenTelemetry integration -// - Custom implementations provided by users -// -// Example: -// -// type MyObserver struct{ logger *slog.Logger } -// func (o *MyObserver) OnEvent(ctx context.Context, event Event) { -// o.logger.Info("event", "type", event.Type, "source", event.Source) -// } -// -// observability.RegisterObserver("my-observer", &MyObserver{logger}) -func RegisterObserver(name string, observer Observer) { - mutex.Lock() - defer mutex.Unlock() - - observers[name] = observer -} diff --git a/orchestrate/observability/slog.go b/orchestrate/observability/slog.go deleted file mode 100644 index 3487720..0000000 --- a/orchestrate/observability/slog.go +++ /dev/null @@ -1,69 +0,0 @@ -package observability - -import ( - "context" - "log/slog" -) - -// SlogObserver provides structured logging observability using Go's slog package. -// -// SlogObserver writes all orchestration events to a structured logger at Info level, -// capturing event type, source, timestamp, and associated metadata. This enables -// debugging and monitoring of workflow execution through standard log aggregation tools. -// -// The observer uses slog's context-aware logging (InfoContext) to propagate cancellation -// signals and tracing context from the workflow execution context. 
-// -// Example: -// -// logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) -// observer := observability.NewSlogObserver(logger) -// observability.RegisterObserver("production", observer) -// -// cfg := config.ChainConfig{Observer: "production"} -// result, err := workflows.ProcessChain(ctx, cfg, items, initial, processor, nil) -type SlogObserver struct { - logger *slog.Logger -} - -// NewSlogObserver creates a new SlogObserver with the specified logger. -// -// The logger parameter allows customization of the slog handler, output destination, -// and log level filtering. Pass slog.Default() for the default logger configuration. -// -// Example: -// -// observer := observability.NewSlogObserver(slog.Default()) -// -// Example with custom handler: -// -// handler := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ -// Level: slog.LevelDebug, -// }) -// logger := slog.New(handler) -// observer := observability.NewSlogObserver(logger) -func NewSlogObserver(logger *slog.Logger) *SlogObserver { - return &SlogObserver{ - logger: logger, - } -} - -// OnEvent logs the event at Info level with structured fields. -// -// The event is logged with the following slog attributes: -// - type: The EventType constant (e.g., "chain.start") -// - source: The component that emitted the event (e.g., "workflows.ProcessChain") -// - timestamp: When the event occurred -// - data: Event-specific metadata map -// -// The context is propagated to InfoContext for cancellation and tracing integration. 
-func (o *SlogObserver) OnEvent(ctx context.Context, event Event) { - o.logger.InfoContext( - ctx, - "Event", - "type", event.Type, - "source", event.Source, - "timestamp", event.Timestamp, - "data", event.Data, - ) -} diff --git a/orchestrate/observability/slog_test.go b/orchestrate/observability/slog_test.go deleted file mode 100644 index 8098ff1..0000000 --- a/orchestrate/observability/slog_test.go +++ /dev/null @@ -1,277 +0,0 @@ -package observability_test - -import ( - "bytes" - "context" - "log/slog" - "strings" - "testing" - "time" - - "github.com/tailored-agentic-units/kernel/orchestrate/observability" -) - -func TestSlogObserver_OnEvent_LogsEventFields(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - observer := observability.NewSlogObserver(logger) - - ctx := context.Background() - event := observability.Event{ - Type: observability.EventChainStart, - Timestamp: time.Now(), - Source: "test.source", - Data: map[string]any{ - "item_count": 5, - "test_key": "test_value", - }, - } - - observer.OnEvent(ctx, event) - - output := buf.String() - if !strings.Contains(output, "Event") { - t.Error("Expected log message to contain 'Event'") - } - if !strings.Contains(output, "chain.start") { - t.Error("Expected log to contain event type 'chain.start'") - } - if !strings.Contains(output, "test.source") { - t.Error("Expected log to contain source 'test.source'") - } - if !strings.Contains(output, "item_count") { - t.Error("Expected log to contain data field 'item_count'") - } -} - -func TestSlogObserver_OnEvent_HandlesAllEventTypes(t *testing.T) { - tests := []struct { - name string - eventType observability.EventType - }{ - {"StateCreate", observability.EventStateCreate}, - {"StateClone", observability.EventStateClone}, - {"StateSet", observability.EventStateSet}, - {"StateMerge", observability.EventStateMerge}, - {"GraphStart", observability.EventGraphStart}, - {"GraphComplete", observability.EventGraphComplete}, - 
{"NodeStart", observability.EventNodeStart}, - {"NodeComplete", observability.EventNodeComplete}, - {"EdgeEvaluate", observability.EventEdgeEvaluate}, - {"EdgeTransition", observability.EventEdgeTransition}, - {"CycleDetected", observability.EventCycleDetected}, - {"ChainStart", observability.EventChainStart}, - {"ChainComplete", observability.EventChainComplete}, - {"StepStart", observability.EventStepStart}, - {"StepComplete", observability.EventStepComplete}, - {"ParallelStart", observability.EventParallelStart}, - {"ParallelComplete", observability.EventParallelComplete}, - {"WorkerStart", observability.EventWorkerStart}, - {"WorkerComplete", observability.EventWorkerComplete}, - {"CheckpointSave", observability.EventCheckpointSave}, - {"CheckpointLoad", observability.EventCheckpointLoad}, - {"CheckpointResume", observability.EventCheckpointResume}, - {"RouteEvaluate", observability.EventRouteEvaluate}, - {"RouteSelect", observability.EventRouteSelect}, - {"RouteExecute", observability.EventRouteExecute}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - observer := observability.NewSlogObserver(logger) - - ctx := context.Background() - event := observability.Event{ - Type: tt.eventType, - Timestamp: time.Now(), - Source: "test", - Data: map[string]any{}, - } - - observer.OnEvent(ctx, event) - - output := buf.String() - if !strings.Contains(output, string(tt.eventType)) { - t.Errorf("Expected log to contain event type %q", tt.eventType) - } - }) - } -} - -func TestSlogObserver_OnEvent_HandlesEmptyData(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - observer := observability.NewSlogObserver(logger) - - ctx := context.Background() - event := observability.Event{ - Type: observability.EventChainStart, - Timestamp: time.Now(), - Source: "test", - Data: map[string]any{}, - } - - observer.OnEvent(ctx, event) - - output := 
buf.String() - if !strings.Contains(output, "Event") { - t.Error("Expected log message even with empty data") - } -} - -func TestSlogObserver_OnEvent_HandlesNilData(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - observer := observability.NewSlogObserver(logger) - - ctx := context.Background() - event := observability.Event{ - Type: observability.EventChainStart, - Timestamp: time.Now(), - Source: "test", - Data: nil, - } - - observer.OnEvent(ctx, event) - - output := buf.String() - if !strings.Contains(output, "Event") { - t.Error("Expected log message even with nil data") - } -} - -func TestSlogObserver_OnEvent_HandlesComplexDataTypes(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - observer := observability.NewSlogObserver(logger) - - ctx := context.Background() - event := observability.Event{ - Type: observability.EventParallelStart, - Timestamp: time.Now(), - Source: "workflows.ProcessParallel", - Data: map[string]any{ - "item_count": 100, - "worker_count": 8, - "fail_fast": true, - "nested": map[string]any{ - "key1": "value1", - "key2": 42, - }, - "slice": []string{"a", "b", "c"}, - }, - } - - observer.OnEvent(ctx, event) - - output := buf.String() - if !strings.Contains(output, "item_count") { - t.Error("Expected log to contain 'item_count'") - } - if !strings.Contains(output, "worker_count") { - t.Error("Expected log to contain 'worker_count'") - } -} - -func TestSlogObserver_OnEvent_PropagatesContext(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - observer := observability.NewSlogObserver(logger) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - event := observability.Event{ - Type: observability.EventChainStart, - Timestamp: time.Now(), - Source: "test", - Data: map[string]any{}, - } - - observer.OnEvent(ctx, event) - - output := buf.String() - if len(output) == 0 { - t.Error("Expected log 
output with valid context") - } -} - -func TestSlogObserver_JSONHandler(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewJSONHandler(&buf, nil)) - observer := observability.NewSlogObserver(logger) - - ctx := context.Background() - event := observability.Event{ - Type: observability.EventWorkerStart, - Timestamp: time.Now(), - Source: "workflows.ProcessParallel", - Data: map[string]any{ - "worker_id": 3, - "item_index": 42, - }, - } - - observer.OnEvent(ctx, event) - - output := buf.String() - if !strings.Contains(output, "{") { - t.Error("Expected JSON output") - } - if !strings.Contains(output, "worker.start") { - t.Error("Expected event type in JSON output") - } - if !strings.Contains(output, "worker_id") { - t.Error("Expected data fields in JSON output") - } -} - -func TestNewSlogObserver_WithDefaultLogger(t *testing.T) { - observer := observability.NewSlogObserver(slog.Default()) - - ctx := context.Background() - event := observability.Event{ - Type: observability.EventChainStart, - Timestamp: time.Now(), - Source: "test", - Data: map[string]any{}, - } - - observer.OnEvent(ctx, event) -} - -func TestSlogObserver_ConcurrentEvents(t *testing.T) { - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, nil)) - observer := observability.NewSlogObserver(logger) - - ctx := context.Background() - done := make(chan struct{}) - eventCount := 100 - - for i := range eventCount { - go func(id int) { - event := observability.Event{ - Type: observability.EventWorkerStart, - Timestamp: time.Now(), - Source: "test", - Data: map[string]any{ - "worker_id": id, - }, - } - observer.OnEvent(ctx, event) - done <- struct{}{} - }(i) - } - - for range eventCount { - <-done - } - - output := buf.String() - if len(output) == 0 { - t.Error("Expected log output from concurrent events") - } -} diff --git a/orchestrate/state/checkpoint_test.go b/orchestrate/state/checkpoint_test.go index 16c0197..437c178 100644 --- a/orchestrate/state/checkpoint_test.go +++ 
b/orchestrate/state/checkpoint_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" ) diff --git a/orchestrate/state/edge_test.go b/orchestrate/state/edge_test.go index 37f2e39..2f69cfd 100644 --- a/orchestrate/state/edge_test.go +++ b/orchestrate/state/edge_test.go @@ -3,7 +3,7 @@ package state_test import ( "testing" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" ) diff --git a/orchestrate/state/events.go b/orchestrate/state/events.go new file mode 100644 index 0000000..7d137e7 --- /dev/null +++ b/orchestrate/state/events.go @@ -0,0 +1,26 @@ +package state + +import "github.com/tailored-agentic-units/kernel/observability" + +const ( + // State operations + EventStateCreate observability.EventType = "state.create" + EventStateClone observability.EventType = "state.clone" + EventStateSet observability.EventType = "state.set" + EventStateMerge observability.EventType = "state.merge" + + // Graph execution + EventGraphStart observability.EventType = "graph.start" + EventGraphComplete observability.EventType = "graph.complete" + EventNodeStart observability.EventType = "node.start" + EventNodeComplete observability.EventType = "node.complete" + EventNodeState observability.EventType = "node.state" + EventEdgeEvaluate observability.EventType = "edge.evaluate" + EventEdgeTransition observability.EventType = "edge.transition" + EventCycleDetected observability.EventType = "cycle.detected" + + // Checkpointing + EventCheckpointSave observability.EventType = "checkpoint.save" + EventCheckpointLoad observability.EventType = "checkpoint.load" + EventCheckpointResume observability.EventType = 
"checkpoint.resume" +) diff --git a/orchestrate/state/graph.go b/orchestrate/state/graph.go index d7a6ed3..2bd1c18 100644 --- a/orchestrate/state/graph.go +++ b/orchestrate/state/graph.go @@ -6,8 +6,8 @@ import ( "maps" "time" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" ) // StateGraph defines a workflow as a directed graph of nodes and edges. @@ -312,7 +312,8 @@ func (g *stateGraph) Resume(ctx context.Context, runID string) (State, error) { } g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventCheckpointLoad, + Type: EventCheckpointLoad, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ @@ -327,7 +328,8 @@ func (g *stateGraph) Resume(ctx context.Context, runID string) (State, error) { } g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventCheckpointResume, + Type: EventCheckpointResume, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ @@ -346,7 +348,8 @@ func (g *stateGraph) execute(ctx context.Context, startNode string, initialState } g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventGraphStart, + Type: EventGraphStart, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ @@ -387,7 +390,8 @@ func (g *stateGraph) execute(ctx context.Context, startNode string, initialState if visited[current] > 1 { g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventCycleDetected, + Type: EventCycleDetected, + Level: observability.LevelWarning, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ @@ -410,7 +414,8 @@ func (g *stateGraph) execute(ctx context.Context, startNode string, initialState } g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventNodeStart, + Type: EventNodeStart, + Level: 
observability.LevelVerbose, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ @@ -423,13 +428,26 @@ func (g *stateGraph) execute(ctx context.Context, startNode string, initialState newState, err := node.Execute(ctx, state) g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventNodeComplete, + Type: EventNodeComplete, + Level: observability.LevelVerbose, + Timestamp: time.Now(), + Source: g.name, + Data: map[string]any{ + "node": current, + "iteration": iterations, + "error": err != nil, + }, + }) + + g.observer.OnEvent(ctx, observability.Event{ + Type: EventNodeState, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ "node": current, "iteration": iterations, - "error": err != nil, + "input_snapshot": maps.Clone(state.Data), "output_snapshot": maps.Clone(newState.Data), }, }) @@ -456,7 +474,8 @@ func (g *stateGraph) execute(ctx context.Context, startNode string, initialState } g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventCheckpointSave, + Type: EventCheckpointSave, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ @@ -468,7 +487,8 @@ func (g *stateGraph) execute(ctx context.Context, startNode string, initialState if g.exitPoints[current] { g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventGraphComplete, + Type: EventGraphComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ @@ -498,7 +518,8 @@ func (g *stateGraph) execute(ctx context.Context, startNode string, initialState nextNode := "" for i, edge := range edges { g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventEdgeEvaluate, + Type: EventEdgeEvaluate, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ @@ -513,7 +534,8 @@ func (g *stateGraph) execute(ctx context.Context, startNode string, initialState nextNode = edge.To 
g.observer.OnEvent(ctx, observability.Event{ - Type: observability.EventEdgeTransition, + Type: EventEdgeTransition, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: g.name, Data: map[string]any{ diff --git a/orchestrate/state/graph_test.go b/orchestrate/state/graph_test.go index adab912..3929152 100644 --- a/orchestrate/state/graph_test.go +++ b/orchestrate/state/graph_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" ) @@ -402,7 +402,7 @@ func TestStateGraph_Execute_Cycle(t *testing.T) { cycleEvents := 0 for _, event := range observer.events { - if event.Type == observability.EventCycleDetected { + if event.Type == state.EventCycleDetected { cycleEvents++ } } @@ -629,14 +629,16 @@ func TestStateGraph_Execute_ObserverEvents(t *testing.T) { } expectedEvents := []observability.EventType{ - observability.EventGraphStart, - observability.EventNodeStart, - observability.EventNodeComplete, - observability.EventEdgeEvaluate, - observability.EventEdgeTransition, - observability.EventNodeStart, - observability.EventNodeComplete, - observability.EventGraphComplete, + state.EventGraphStart, + state.EventNodeStart, + state.EventNodeComplete, + state.EventNodeState, + state.EventEdgeEvaluate, + state.EventEdgeTransition, + state.EventNodeStart, + state.EventNodeComplete, + state.EventNodeState, + state.EventGraphComplete, } if len(observer.events) != len(expectedEvents) { diff --git a/orchestrate/state/node_test.go b/orchestrate/state/node_test.go index 6621eea..2167a6a 100644 --- a/orchestrate/state/node_test.go +++ b/orchestrate/state/node_test.go @@ -5,7 +5,7 @@ import ( "errors" "testing" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" + 
"github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" ) diff --git a/orchestrate/state/state.go b/orchestrate/state/state.go index 436393d..502ab2d 100644 --- a/orchestrate/state/state.go +++ b/orchestrate/state/state.go @@ -5,8 +5,8 @@ import ( "maps" "time" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" "github.com/google/uuid" + "github.com/tailored-agentic-units/kernel/observability" ) // State represents immutable workflow state flowing through graph execution. @@ -58,7 +58,8 @@ func New(observer observability.Observer) State { } observer.OnEvent(context.Background(), observability.Event{ - Type: observability.EventStateCreate, + Type: EventStateCreate, + Level: observability.LevelVerbose, Timestamp: s.Timestamp, Source: "state", Data: map[string]any{}, @@ -91,7 +92,8 @@ func (s State) Clone() State { } s.Observer.OnEvent(context.Background(), observability.Event{ - Type: observability.EventStateClone, + Type: EventStateClone, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "state", Data: map[string]any{"keys": len(newState.Data)}, @@ -135,7 +137,8 @@ func (s State) Set(key string, value any) State { newState.Data[key] = value s.Observer.OnEvent(context.Background(), observability.Event{ - Type: observability.EventStateSet, + Type: EventStateSet, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "state", Data: map[string]any{"key": key}, @@ -185,7 +188,8 @@ func (s State) Merge(other State) State { maps.Copy(newState.Data, other.Data) s.Observer.OnEvent(context.Background(), observability.Event{ - Type: observability.EventStateMerge, + Type: EventStateMerge, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "state", Data: map[string]any{"keys": len(other.Data)}, diff --git a/orchestrate/state/state_test.go b/orchestrate/state/state_test.go index 175d732..b31ca34 100644 --- a/orchestrate/state/state_test.go +++ 
b/orchestrate/state/state_test.go @@ -6,7 +6,7 @@ import ( "strings" "testing" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" ) @@ -59,9 +59,9 @@ func TestState_New_EmitsEvent(t *testing.T) { if len(observer.events) != 1 { t.Errorf("New() emitted %d events, want 1", len(observer.events)) } - if observer.events[0].Type != observability.EventStateCreate { + if observer.events[0].Type != state.EventStateCreate { t.Errorf("New() emitted event type %v, want %v", - observer.events[0].Type, observability.EventStateCreate) + observer.events[0].Type, state.EventStateCreate) } } @@ -78,9 +78,9 @@ func TestState_Clone(t *testing.T) { if len(observer.events) != 1 { t.Errorf("Clone() emitted %d events, want 1", len(observer.events)) } - if observer.events[0].Type != observability.EventStateClone { + if observer.events[0].Type != state.EventStateClone { t.Errorf("Clone() emitted event type %v, want %v", - observer.events[0].Type, observability.EventStateClone) + observer.events[0].Type, state.EventStateClone) } val1, exists1 := cloned.Get("key1") @@ -171,7 +171,7 @@ func TestState_Set(t *testing.T) { hasSet := false for _, event := range observer.events { - if event.Type == observability.EventStateSet { + if event.Type == state.EventStateSet { hasSet = true if event.Data["key"] != "key" { t.Errorf("EventStateSet data[key] = %v, want %v", @@ -222,7 +222,7 @@ func TestState_Merge(t *testing.T) { hasMerge := false for _, event := range observer.events { - if event.Type == observability.EventStateMerge { + if event.Type == state.EventStateMerge { hasMerge = true if event.Data["keys"] != 2 { t.Errorf("EventStateMerge data[keys] = %v, want %v", diff --git a/orchestrate/workflows/chain.go b/orchestrate/workflows/chain.go index 2e6dd02..322deb4 100644 --- a/orchestrate/workflows/chain.go +++ b/orchestrate/workflows/chain.go @@ -5,8 +5,8 @@ 
import ( "fmt" "time" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" ) // StepProcessor processes a single item and updates the accumulated context. @@ -161,7 +161,8 @@ func ProcessChain[TItem, TContext any]( } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventChainStart, + Type: EventChainStart, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessChain", Data: map[string]any{ @@ -173,7 +174,8 @@ func ProcessChain[TItem, TContext any]( if len(items) == 0 { observer.OnEvent(ctx, observability.Event{ - Type: observability.EventChainComplete, + Type: EventChainComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessChain", Data: map[string]any{ @@ -201,7 +203,8 @@ func ProcessChain[TItem, TContext any]( Err: fmt.Errorf("processing cancelled: %w", err), } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventChainComplete, + Type: EventChainComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessChain", Data: map[string]any{ @@ -214,7 +217,8 @@ func ProcessChain[TItem, TContext any]( } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventStepStart, + Type: EventStepStart, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "workflows.ProcessChain", Data: map[string]any{ @@ -232,7 +236,8 @@ func ProcessChain[TItem, TContext any]( Err: err, } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventStepComplete, + Type: EventStepComplete, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "workflows.ProcessChain", Data: map[string]any{ @@ -242,7 +247,8 @@ func ProcessChain[TItem, TContext any]( }, }) observer.OnEvent(ctx, observability.Event{ - Type: observability.EventChainComplete, + Type: EventChainComplete, + Level: 
observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessChain", Data: map[string]any{ @@ -261,7 +267,8 @@ func ProcessChain[TItem, TContext any]( } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventStepComplete, + Type: EventStepComplete, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "workflows.ProcessChain", Data: map[string]any{ @@ -281,7 +288,8 @@ func ProcessChain[TItem, TContext any]( result.Steps = len(items) observer.OnEvent(ctx, observability.Event{ - Type: observability.EventChainComplete, + Type: EventChainComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessChain", Data: map[string]any{ diff --git a/orchestrate/workflows/chain_test.go b/orchestrate/workflows/chain_test.go index f056001..aaf211b 100644 --- a/orchestrate/workflows/chain_test.go +++ b/orchestrate/workflows/chain_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/workflows" ) @@ -316,12 +316,12 @@ func TestProcessChain_ObserverIntegration(t *testing.T) { } expectedEvents := []observability.EventType{ - observability.EventChainStart, - observability.EventStepStart, - observability.EventStepComplete, - observability.EventStepStart, - observability.EventStepComplete, - observability.EventChainComplete, + workflows.EventChainStart, + workflows.EventStepStart, + workflows.EventStepComplete, + workflows.EventStepStart, + workflows.EventStepComplete, + workflows.EventChainComplete, } if len(observer.events) != len(expectedEvents) { @@ -365,12 +365,12 @@ func TestProcessChain_ObserverOnError(t *testing.T) { } expectedEvents := []observability.EventType{ - observability.EventChainStart, - observability.EventStepStart, - observability.EventStepComplete, - 
observability.EventStepStart, - observability.EventStepComplete, - observability.EventChainComplete, + workflows.EventChainStart, + workflows.EventStepStart, + workflows.EventStepComplete, + workflows.EventStepStart, + workflows.EventStepComplete, + workflows.EventChainComplete, } if len(observer.events) != len(expectedEvents) { diff --git a/orchestrate/workflows/conditional.go b/orchestrate/workflows/conditional.go index 3a162a7..4ecedfa 100644 --- a/orchestrate/workflows/conditional.go +++ b/orchestrate/workflows/conditional.go @@ -5,8 +5,8 @@ import ( "fmt" "time" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" ) // RoutePredicate evaluates state and returns a route name for conditional routing. @@ -153,7 +153,8 @@ func ProcessConditional[TState any]( } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventRouteEvaluate, + Type: EventRouteEvaluate, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "conditional", Data: map[string]any{ @@ -183,7 +184,8 @@ func ProcessConditional[TState any]( } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventRouteSelect, + Type: EventRouteSelect, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "conditional", Data: map[string]any{ @@ -210,7 +212,8 @@ func ProcessConditional[TState any]( } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventRouteExecute, + Type: EventRouteExecute, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "conditional", Data: map[string]any{ diff --git a/orchestrate/workflows/conditional_test.go b/orchestrate/workflows/conditional_test.go index 60afb94..4dea218 100644 --- a/orchestrate/workflows/conditional_test.go +++ b/orchestrate/workflows/conditional_test.go @@ -7,7 +7,7 @@ import ( "testing" "github.com/tailored-agentic-units/kernel/orchestrate/config" 
- "github.com/tailored-agentic-units/kernel/orchestrate/observability" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/workflows" ) @@ -323,9 +323,9 @@ func TestProcessConditional_ObserverEvents(t *testing.T) { } expectedEvents := []observability.EventType{ - observability.EventRouteEvaluate, - observability.EventRouteSelect, - observability.EventRouteExecute, + workflows.EventRouteEvaluate, + workflows.EventRouteSelect, + workflows.EventRouteExecute, } if len(capture.events) != len(expectedEvents) { diff --git a/orchestrate/workflows/events.go b/orchestrate/workflows/events.go new file mode 100644 index 0000000..20a82f5 --- /dev/null +++ b/orchestrate/workflows/events.go @@ -0,0 +1,22 @@ +package workflows + +import "github.com/tailored-agentic-units/kernel/observability" + +const ( + // Sequential chains + EventChainStart observability.EventType = "chain.start" + EventChainComplete observability.EventType = "chain.complete" + EventStepStart observability.EventType = "step.start" + EventStepComplete observability.EventType = "step.complete" + + // Parallel execution + EventParallelStart observability.EventType = "parallel.start" + EventParallelComplete observability.EventType = "parallel.complete" + EventWorkerStart observability.EventType = "worker.start" + EventWorkerComplete observability.EventType = "worker.complete" + + // Conditional routing + EventRouteEvaluate observability.EventType = "route.evaluate" + EventRouteSelect observability.EventType = "route.select" + EventRouteExecute observability.EventType = "route.execute" +) diff --git a/orchestrate/workflows/integration_test.go b/orchestrate/workflows/integration_test.go index dea6b5c..56c14e6 100644 --- a/orchestrate/workflows/integration_test.go +++ b/orchestrate/workflows/integration_test.go @@ -6,7 +6,7 @@ import ( "testing" "github.com/tailored-agentic-units/kernel/orchestrate/config" - 
"github.com/tailored-agentic-units/kernel/orchestrate/observability" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/state" "github.com/tailored-agentic-units/kernel/orchestrate/workflows" ) diff --git a/orchestrate/workflows/parallel.go b/orchestrate/workflows/parallel.go index e108d6d..a94cad6 100644 --- a/orchestrate/workflows/parallel.go +++ b/orchestrate/workflows/parallel.go @@ -8,8 +8,8 @@ import ( "sync/atomic" "time" + "github.com/tailored-agentic-units/kernel/observability" "github.com/tailored-agentic-units/kernel/orchestrate/config" - "github.com/tailored-agentic-units/kernel/orchestrate/observability" ) // TaskProcessor processes a single item and returns a result. @@ -163,7 +163,8 @@ func ProcessParallel[TItem, TResult any]( if len(items) == 0 { observer.OnEvent(ctx, observability.Event{ - Type: observability.EventParallelStart, + Type: EventParallelStart, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -175,7 +176,8 @@ func ProcessParallel[TItem, TResult any]( }) observer.OnEvent(ctx, observability.Event{ - Type: observability.EventParallelComplete, + Type: EventParallelComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -194,7 +196,8 @@ func ProcessParallel[TItem, TResult any]( workerCount := calculateWorkerCount(cfg.MaxWorkers, cfg.WorkerCap, len(items)) observer.OnEvent(ctx, observability.Event{ - Type: observability.EventParallelStart, + Type: EventParallelStart, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -262,7 +265,8 @@ func ProcessParallel[TItem, TResult any]( if collectorErr != nil { observer.OnEvent(ctx, observability.Event{ - Type: observability.EventParallelComplete, + Type: EventParallelComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), 
Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -279,7 +283,8 @@ func ProcessParallel[TItem, TResult any]( if ctx.Err() != nil { observer.OnEvent(ctx, observability.Event{ - Type: observability.EventParallelComplete, + Type: EventParallelComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -297,7 +302,8 @@ func ProcessParallel[TItem, TResult any]( if len(errors) > 0 { if cfg.FailFast() || len(results) == 0 { observer.OnEvent(ctx, observability.Event{ - Type: observability.EventParallelComplete, + Type: EventParallelComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -314,7 +320,8 @@ func ProcessParallel[TItem, TResult any]( } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventParallelComplete, + Type: EventParallelComplete, + Level: observability.LevelInfo, Timestamp: time.Now(), Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -356,11 +363,11 @@ func calculateWorkerCount(maxWorkers, workerCap, itemCount int) int { // processWorker implements individual worker goroutine logic. // // Each worker runs this function concurrently with other workers. The worker: -// 1. Reads items from workQueue until closed or context cancelled -// 2. Processes each item via processor function -// 3. Sends indexed results to resultChannel -// 4. Calls progress callback on success (thread-safe via atomic counter) -// 5. Cancels context on error if FailFast enabled +// 1. Reads items from workQueue until closed or context cancelled +// 2. Processes each item via processor function +// 3. Sends indexed results to resultChannel +// 4. Calls progress callback on success (thread-safe via atomic counter) +// 5. 
Cancels context on error if FailFast enabled // // Workers exit when: // - workQueue is closed (all items distributed) @@ -391,7 +398,8 @@ func processWorker[TItem, TResult any]( } observer.OnEvent(ctx, observability.Event{ - Type: observability.EventWorkerStart, + Type: EventWorkerStart, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -404,7 +412,8 @@ func processWorker[TItem, TResult any]( result, err := processor(ctx, work.item) observer.OnEvent(ctx, observability.Event{ - Type: observability.EventWorkerComplete, + Type: EventWorkerComplete, + Level: observability.LevelVerbose, Timestamp: time.Now(), Source: "workflows.ProcessParallel", Data: map[string]any{ @@ -445,10 +454,10 @@ func processWorker[TItem, TResult any]( // the result channel buffer fills. // // The collector: -// 1. Reads all results from resultChannel until closed -// 2. Separates successes into resultMap, failures into errorMap (keyed by index) -// 3. Builds ordered slices by iterating 0 to itemCount -// 4. Returns dense slices (successes-only and failures-only) +// 1. Reads all results from resultChannel until closed +// 2. Separates successes into resultMap, failures into errorMap (keyed by index) +// 3. Builds ordered slices by iterating 0 to itemCount +// 4. Returns dense slices (successes-only and failures-only) // // Order preservation is achieved through indexed results - even though workers complete // out of order, the final slices are built by iterating indices sequentially. 
From 09831990295cdbc1a09222234c9802848f00d5dc Mon Sep 17 00:00:00 2001 From: Jaime Still Date: Wed, 18 Feb 2026 17:53:41 -0500 Subject: [PATCH 2/2] =?UTF-8?q?update=20objective=20tracking:=20#25=20?= =?UTF-8?q?=E2=86=92=20PR=20#32?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _project/objective.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_project/objective.md b/_project/objective.md index e0057e4..1aef32a 100644 --- a/_project/objective.md +++ b/_project/objective.md @@ -13,7 +13,7 @@ Establish the kernel's HTTP interface — the sole extensibility boundary throug |---|-------|--------| | 23 | Streaming tools protocol | PR #30 | | 24 | Agent registry | PR #31 | -| 25 | Kernel observer | In Progress | +| 25 | Kernel observer | PR #32 | | 26 | Multi-session kernel | Open | | 27 | HTTP API with SSE streaming | Open | | 28 | Server entry point | Open |