Skip to content

[feat] Implement checkpointing mechanism for task execution#199

Draft
sroussey wants to merge 1 commit intomainfrom
checkpoint
Draft

[feat] Implement checkpointing mechanism for task execution#199
sroussey wants to merge 1 commit intomainfrom
checkpoint

Conversation

@sroussey
Copy link
Collaborator

  • Introduced CheckpointSaver interface and its implementations (InMemoryCheckpointSaver, TabularCheckpointSaver) for persisting execution state.
  • Enhanced task runners (GraphAsTaskRunner, IteratorTaskRunner, TaskRunner, WhileTaskRunner) to support checkpointing, allowing for state recovery and iteration-level checkpoints.
  • Updated TaskGraph to manage checkpointing configuration and resume execution from checkpoints.
  • Added tests to validate checkpoint saving and retrieval functionality, ensuring robust handling of task execution states.

Plan:

name: Checkpointing & Retry
overview: Add execution checkpointing to @workglow/task-graph that saves graph state (task statuses, outputs, dataflow data) after each task completion, with configurable granularity. Build InMemory and Tabular checkpoint savers, add thread_id isolation, enable resume-from-checkpoint. On the builder side, wire checkpoint data into activities and build iteration time-travel UI (builder not a part) todos:

  • id: checkpoint-types content: Create checkpoint data model types (CheckpointData, TaskCheckpointState, etc.) in packages/task-graph/src/checkpoint/ status: pending
  • id: checkpoint-saver-interface content: Create abstract CheckpointSaver class with save/get/getLatest/getHistory/delete methods status: pending
  • id: inmemory-saver content: Implement InMemoryCheckpointSaver using Map with threadId index status: pending
  • id: tabular-saver content: Implement TabularCheckpointSaver using existing ITabularStorage interface status: pending
  • id: runner-checkpoint-hooks content: Add captureCheckpoint() to TaskGraphRunner, call after each task completion with configurable granularity status: pending
  • id: runner-restore content: Add restoreFromCheckpoint() to TaskGraphRunner for resume-on-failure status: pending
  • id: config-extensions content: Extend TaskGraphRunConfig, IRunConfig, and IExecuteContext with checkpointSaver/threadId/granularity status: pending
  • id: iteration-checkpoints content: Add iteration checkpointing to WhileTask and IteratorTaskRunner after each subgraph run status: pending
  • id: checkpoint-events content: Add 'checkpoint' event to TaskGraphEvents and emit from runner status: pending
  • id: exports content: Export all checkpoint types/classes from common.ts and add checkpoint/index.ts status: pending
  • id: tests content: Write tests for checkpoint save/restore, resume-from-failure, and iteration checkpoints status: pending isProject: false

Checkpointing & Retry

Architecture Overview

flowchart TD
    subgraph taskGraph ["@workglow/task-graph"]
        CheckpointSaver["CheckpointSaver (abstract)"]
        InMemory["InMemoryCheckpointSaver"]
        Tabular["TabularCheckpointSaver"]
        CheckpointData["CheckpointData"]
        TaskGraphRunner_CP["TaskGraphRunner (checkpoint hooks)"]
        WhileTask_CP["WhileTask / IteratorTask (iteration checkpoints)"]
    end

    subgraph builder ["Builder (frontend)"]
        ActivityRepo["ActivityRepository + checkpoint_id refs"]
        ActivityViewer["ActivityViewer (per-task drill-in)"]
        TimeTravelUI["Iteration Time Travel UI"]
    end

    TaskGraphRunner_CP -->|"after each task"| CheckpointSaver
    WhileTask_CP -->|"after each iteration"| CheckpointSaver
    CheckpointSaver --> InMemory
    CheckpointSaver --> Tabular
    ActivityRepo -->|"reads"| CheckpointSaver
    ActivityViewer -->|"reads"| ActivityRepo
    TimeTravelUI -->|"navigates"| ActivityViewer
Loading

Part 1: Checkpoint Data Model & Saver Interface

New directory: packages/task-graph/src/checkpoint/

1a. Checkpoint Types (CheckpointTypes.ts)

Define the core data structures:

export type CheckpointId = string;
export type ThreadId = string;

export interface TaskCheckpointState {
  taskId: unknown;
  taskType: string;
  status: TaskStatus;
  inputData: TaskInput;
  outputData: TaskOutput;
  progress: number;
  error?: string;
  startedAt?: string;
  completedAt?: string;
}

export interface DataflowCheckpointState {
  id: string;
  sourceTaskId: unknown;
  targetTaskId: unknown;
  status: TaskStatus;
  portData?: TaskOutput;
}

export interface CheckpointData {
  checkpointId: CheckpointId;
  threadId: ThreadId;
  parentCheckpointId?: CheckpointId;
  graphJson: TaskGraphJson; // structural definition
  taskStates: TaskCheckpointState[]; // runtime state per task
  dataflowStates: DataflowCheckpointState[];
  metadata: {
    createdAt: string;
    triggerTaskId?: unknown; // task that just completed
    iterationIndex?: number; // for while/map loops
    iterationParentTaskId?: unknown; // which iterator task owns this
  };
}

1b. CheckpointSaver Interface (CheckpointSaver.ts)

export abstract class CheckpointSaver {
  abstract saveCheckpoint(data: CheckpointData): Promise<void>;
  abstract getCheckpoint(checkpointId: CheckpointId): Promise<CheckpointData | undefined>;
  abstract getLatestCheckpoint(threadId: ThreadId): Promise<CheckpointData | undefined>;
  abstract getCheckpointHistory(threadId: ThreadId): Promise<CheckpointData[]>;
  abstract getCheckpointsForIteration(
    threadId: ThreadId,
    iterationParentTaskId: unknown
  ): Promise<CheckpointData[]>;
  abstract deleteCheckpoints(threadId: ThreadId): Promise<void>;
}

Modeled after the existing TaskOutputRepository pattern with EventEmitter support and a service token (CHECKPOINT_SAVER).

1c. InMemoryCheckpointSaver (InMemoryCheckpointSaver.ts)

Simple Map<CheckpointId, CheckpointData> with a secondary index on threadId. Follows the same pattern as existing in-memory storage implementations.

1d. TabularCheckpointSaver (TabularCheckpointSaver.ts)

Uses the existing ITabularStorage interface (same as TaskOutputTabularRepository). Schema:

  • Primary key: checkpoint_id
  • Columns: thread_id, parent_checkpoint_id, graph_json (compressed JSON), task_states (compressed JSON), dataflow_states (compressed JSON), metadata (JSON), created_at
  • Searchable by: thread_id

This automatically gives us SQLite, Postgres, IndexedDB, Supabase, and File-backed checkpoint storage via the existing tabular storage backends.

1e. Exports

Add all checkpoint exports to [packages/task-graph/src/common.ts](packages/task-graph/src/common.ts):

export * from "./checkpoint/CheckpointTypes";
export * from "./checkpoint/CheckpointSaver";
export * from "./checkpoint/InMemoryCheckpointSaver";
export * from "./checkpoint/TabularCheckpointSaver";

Part 2: Integrate Checkpointing into Execution

2a. Add CheckpointSaver to TaskGraphRunConfig

In [packages/task-graph/src/task-graph/TaskGraph.ts](packages/task-graph/src/task-graph/TaskGraph.ts), extend TaskGraphRunConfig:

export interface TaskGraphRunConfig {
  outputCache?: TaskOutputRepository | boolean;
  parentSignal?: AbortSignal;
  registry?: ServiceRegistry;
  checkpointSaver?: CheckpointSaver; // NEW
  threadId?: string; // NEW
  resumeFromCheckpoint?: CheckpointId; // NEW
  checkpointGranularity?: "every-task" | "top-level-only" | "none"; // NEW, default 'every-task'
}

2b. Checkpoint Hook in TaskGraphRunner

In [packages/task-graph/src/task-graph/TaskGraphRunner.ts](packages/task-graph/src/task-graph/TaskGraphRunner.ts):

  1. Store checkpointSaver, threadId, and checkpointGranularity as instance properties (set in handleStart).
  2. Add a captureCheckpoint(triggerTaskId) method that snapshots the full graph state (iterating graph.getTasks() and graph.getDataflows() to build TaskCheckpointState[] and DataflowCheckpointState[]).
  3. Call captureCheckpoint in runGraph() after each task completes (inside the runAsync function, after pushOutputFromNodeToEdges and pushStatusFromNodeToEdges), respecting checkpointGranularity.
  4. Emit a new checkpoint event on the graph: this.graph.emit("checkpoint", checkpointData).

2c. Resume from Checkpoint

Add a restoreFromCheckpoint(checkpointData: CheckpointData) method to TaskGraphRunner that:

  1. For each task in checkpointData.taskStates with status COMPLETED or DISABLED, restore the task's status, runOutputData, progress, error.
  2. For each dataflow, restore portData and status.
  3. Configure the DependencyBasedScheduler to skip already-completed tasks by calling onTaskCompleted for each.
  4. The subsequent runGraph call then only processes PENDING tasks.

In TaskGraph.run(), if config.resumeFromCheckpoint is provided, call restoreFromCheckpoint instead of resetGraph in handleStart.

2d. Iteration Checkpoints in WhileTask and IteratorTaskRunner

In [packages/task-graph/src/task/WhileTask.ts](packages/task-graph/src/task/WhileTask.ts) (line ~380, inside the while loop):

  • After each iteration's subGraph.run() completes, if the execution context has a checkpoint saver, capture a checkpoint with iterationIndex and iterationParentTaskId metadata.

In [packages/task-graph/src/task/IteratorTaskRunner.ts](packages/task-graph/src/task/IteratorTaskRunner.ts) (inside executeSubgraphIteration):

  • Same pattern: after each subgraph run, capture an iteration checkpoint.

This requires threading the checkpointSaver and threadId through the execution context (IExecuteContext or IRunConfig). The cleanest approach is to add optional checkpointSaver and threadId to the IRunConfig interface in [packages/task-graph/src/task/ITask.ts](packages/task-graph/src/task/ITask.ts).

2e. Thread ID Concept

The threadId serves as the isolation key for checkpoint namespacing. When running a graph:

  • If no threadId is provided, generate one via uuid4().
  • The threadId is stored on the runner and propagated to all child graph runs.
  • Maps directly to activity_id in the builder.

Part 3: Builder - Checkpoint Data in Activities

3a. Wire CheckpointSaver into runWorkflow

In [builder/src/lib/run-workflow.ts](builder/src/lib/run-workflow.ts):

  1. Create/get a CheckpointSaver (TabularCheckpointSaver backed by the same storage infrastructure used by ActivityRepository).
  2. Pass it to taskGraph.run() via the config: { checkpointSaver, threadId: actId }.
  3. On failure, the checkpoint is already saved. The existing activity_id serves as the threadId.
  4. Add a resumeFromCheckpoint option to RunWorkflowOptions that, when set, passes resumeFromCheckpoint to the graph config to skip completed tasks.

3b. Checkpoint Repository for the Builder

Create builder/src/components/activities/CheckpointRepository.ts:

  • Wraps a TabularCheckpointSaver (or an InMemoryCheckpointSaver for browser-only mode).
  • Provides queries: getCheckpointsForActivity(activityId), getIterationCheckpoints(activityId, taskId).
  • Registered alongside ActivityRepository in the builder's storage setup.

3c. Activity Detail: Per-Task Run Data

Enhance [builder/src/components/activities/ActivityViewer.tsx](builder/src/components/activities/ActivityViewer.tsx):

  • Fetch checkpoints for the current activity using CheckpointRepository.
  • Display a timeline of task completions derived from checkpoint metadata.triggerTaskId and metadata.createdAt.
  • For each task in the graph, show its state (status, inputs, outputs, timing) by reading from the relevant checkpoint's taskStates.
  • For iterative tasks (WhileTask, MapTask), show an expandable list of iteration checkpoints.

File Summary

Area Files Action
libs packages/task-graph/src/checkpoint/CheckpointTypes.ts New
libs packages/task-graph/src/checkpoint/CheckpointSaver.ts New
libs packages/task-graph/src/checkpoint/InMemoryCheckpointSaver.ts New
libs packages/task-graph/src/checkpoint/TabularCheckpointSaver.ts New
libs packages/task-graph/src/checkpoint/index.ts New
libs packages/task-graph/src/common.ts Modify (add checkpoint exports)
libs packages/task-graph/src/task-graph/TaskGraph.ts Modify (extend config)
libs packages/task-graph/src/task-graph/TaskGraphRunner.ts Modify (checkpoint hooks, restore)
libs packages/task-graph/src/task-graph/TaskGraphEvents.ts Modify (add checkpoint event)
libs packages/task-graph/src/task/ITask.ts Modify (add checkpoint to IRunConfig)
libs packages/task-graph/src/task/TaskRunner.ts Modify (propagate checkpoint config)
libs packages/task-graph/src/task/WhileTask.ts Modify (iteration checkpoints)
libs packages/task-graph/src/task/IteratorTaskRunner.ts Modify (iteration checkpoints)

Testing

Tests should be added in packages/test/src/test/task/:

  • Checkpoint.test.ts - Test checkpoint save/restore cycle for a simple graph
  • CheckpointResume.test.ts - Test resume from checkpoint after simulated failure
  • CheckpointIteration.test.ts - Test iteration checkpoints for WhileTask and MapTask

- Introduced CheckpointSaver interface and its implementations (InMemoryCheckpointSaver, TabularCheckpointSaver) for persisting execution state.
- Enhanced task runners (GraphAsTaskRunner, IteratorTaskRunner, TaskRunner, WhileTaskRunner) to support checkpointing, allowing for state recovery and iteration-level checkpoints.
- Updated TaskGraph to manage checkpointing configuration and resume execution from checkpoints.
- Added tests to validate checkpoint saving and retrieval functionality, ensuring robust handling of task execution states.

Plan:
---
name: Checkpointing & Retry
overview: Add execution checkpointing to @workglow/task-graph that saves graph state (task statuses, outputs, dataflow data) after each task completion, with configurable granularity. Build InMemory and Tabular checkpoint savers, add thread_id isolation, enable resume-from-checkpoint. On the builder side, wire checkpoint data into activities and build iteration time-travel UI (builder not a part)
todos:
  - id: checkpoint-types
    content: Create checkpoint data model types (CheckpointData, TaskCheckpointState, etc.) in packages/task-graph/src/checkpoint/
    status: pending
  - id: checkpoint-saver-interface
    content: Create abstract CheckpointSaver class with save/get/getLatest/getHistory/delete methods
    status: pending
  - id: inmemory-saver
    content: Implement InMemoryCheckpointSaver using Map with threadId index
    status: pending
  - id: tabular-saver
    content: Implement TabularCheckpointSaver using existing ITabularStorage interface
    status: pending
  - id: runner-checkpoint-hooks
    content: Add captureCheckpoint() to TaskGraphRunner, call after each task completion with configurable granularity
    status: pending
  - id: runner-restore
    content: Add restoreFromCheckpoint() to TaskGraphRunner for resume-on-failure
    status: pending
  - id: config-extensions
    content: Extend TaskGraphRunConfig, IRunConfig, and IExecuteContext with checkpointSaver/threadId/granularity
    status: pending
  - id: iteration-checkpoints
    content: Add iteration checkpointing to WhileTask and IteratorTaskRunner after each subgraph run
    status: pending
  - id: checkpoint-events
    content: Add 'checkpoint' event to TaskGraphEvents and emit from runner
    status: pending
  - id: exports
    content: Export all checkpoint types/classes from common.ts and add checkpoint/index.ts
    status: pending
  - id: tests
    content: Write tests for checkpoint save/restore, resume-from-failure, and iteration checkpoints
    status: pending
isProject: false
---

# Checkpointing & Retry

## Architecture Overview

```mermaid
flowchart TD
    subgraph taskGraph ["@workglow/task-graph"]
        CheckpointSaver["CheckpointSaver (abstract)"]
        InMemory["InMemoryCheckpointSaver"]
        Tabular["TabularCheckpointSaver"]
        CheckpointData["CheckpointData"]
        TaskGraphRunner_CP["TaskGraphRunner (checkpoint hooks)"]
        WhileTask_CP["WhileTask / IteratorTask (iteration checkpoints)"]
    end

    subgraph builder ["Builder (frontend)"]
        ActivityRepo["ActivityRepository + checkpoint_id refs"]
        ActivityViewer["ActivityViewer (per-task drill-in)"]
        TimeTravelUI["Iteration Time Travel UI"]
    end

    TaskGraphRunner_CP -->|"after each task"| CheckpointSaver
    WhileTask_CP -->|"after each iteration"| CheckpointSaver
    CheckpointSaver --> InMemory
    CheckpointSaver --> Tabular
    ActivityRepo -->|"reads"| CheckpointSaver
    ActivityViewer -->|"reads"| ActivityRepo
    TimeTravelUI -->|"navigates"| ActivityViewer
```

## Part 1: Checkpoint Data Model & Saver Interface

**New directory:** `packages/task-graph/src/checkpoint/`

### 1a. Checkpoint Types (`CheckpointTypes.ts`)

Define the core data structures:

```typescript
export type CheckpointId = string;
export type ThreadId = string;

export interface TaskCheckpointState {
  taskId: unknown;
  taskType: string;
  status: TaskStatus;
  inputData: TaskInput;
  outputData: TaskOutput;
  progress: number;
  error?: string;
  startedAt?: string;
  completedAt?: string;
}

export interface DataflowCheckpointState {
  id: string;
  sourceTaskId: unknown;
  targetTaskId: unknown;
  status: TaskStatus;
  portData?: TaskOutput;
}

export interface CheckpointData {
  checkpointId: CheckpointId;
  threadId: ThreadId;
  parentCheckpointId?: CheckpointId;
  graphJson: TaskGraphJson; // structural definition
  taskStates: TaskCheckpointState[]; // runtime state per task
  dataflowStates: DataflowCheckpointState[];
  metadata: {
    createdAt: string;
    triggerTaskId?: unknown; // task that just completed
    iterationIndex?: number; // for while/map loops
    iterationParentTaskId?: unknown; // which iterator task owns this
  };
}
```

### 1b. CheckpointSaver Interface (`CheckpointSaver.ts`)

```typescript
export abstract class CheckpointSaver {
  abstract saveCheckpoint(data: CheckpointData): Promise<void>;
  abstract getCheckpoint(checkpointId: CheckpointId): Promise<CheckpointData | undefined>;
  abstract getLatestCheckpoint(threadId: ThreadId): Promise<CheckpointData | undefined>;
  abstract getCheckpointHistory(threadId: ThreadId): Promise<CheckpointData[]>;
  abstract getCheckpointsForIteration(
    threadId: ThreadId,
    iterationParentTaskId: unknown
  ): Promise<CheckpointData[]>;
  abstract deleteCheckpoints(threadId: ThreadId): Promise<void>;
}
```

Modeled after the existing `TaskOutputRepository` pattern with `EventEmitter` support and a service token (`CHECKPOINT_SAVER`).

### 1c. InMemoryCheckpointSaver (`InMemoryCheckpointSaver.ts`)

Simple `Map<CheckpointId, CheckpointData>` with a secondary index on `threadId`. Follows the same pattern as existing in-memory storage implementations.

### 1d. TabularCheckpointSaver (`TabularCheckpointSaver.ts`)

Uses the existing `ITabularStorage` interface (same as `TaskOutputTabularRepository`). Schema:

- Primary key: `checkpoint_id`
- Columns: `thread_id`, `parent_checkpoint_id`, `graph_json` (compressed JSON), `task_states` (compressed JSON), `dataflow_states` (compressed JSON), `metadata` (JSON), `created_at`
- Searchable by: `thread_id`

This automatically gives us SQLite, Postgres, IndexedDB, Supabase, and File-backed checkpoint storage via the existing tabular storage backends.

### 1e. Exports

Add all checkpoint exports to `[packages/task-graph/src/common.ts](packages/task-graph/src/common.ts)`:

```typescript
export * from "./checkpoint/CheckpointTypes";
export * from "./checkpoint/CheckpointSaver";
export * from "./checkpoint/InMemoryCheckpointSaver";
export * from "./checkpoint/TabularCheckpointSaver";
```

## Part 2: Integrate Checkpointing into Execution

### 2a. Add `CheckpointSaver` to `TaskGraphRunConfig`

In `[packages/task-graph/src/task-graph/TaskGraph.ts](packages/task-graph/src/task-graph/TaskGraph.ts)`, extend `TaskGraphRunConfig`:

```typescript
export interface TaskGraphRunConfig {
  outputCache?: TaskOutputRepository | boolean;
  parentSignal?: AbortSignal;
  registry?: ServiceRegistry;
  checkpointSaver?: CheckpointSaver; // NEW
  threadId?: string; // NEW
  resumeFromCheckpoint?: CheckpointId; // NEW
  checkpointGranularity?: "every-task" | "top-level-only" | "none"; // NEW, default 'every-task'
}
```

### 2b. Checkpoint Hook in `TaskGraphRunner`

In `[packages/task-graph/src/task-graph/TaskGraphRunner.ts](packages/task-graph/src/task-graph/TaskGraphRunner.ts)`:

1. Store `checkpointSaver`, `threadId`, and `checkpointGranularity` as instance properties (set in `handleStart`).
2. Add a `captureCheckpoint(triggerTaskId)` method that snapshots the full graph state (iterating `graph.getTasks()` and `graph.getDataflows()` to build `TaskCheckpointState[]` and `DataflowCheckpointState[]`).
3. Call `captureCheckpoint` in `runGraph()` after each task completes (inside the `runAsync` function, after `pushOutputFromNodeToEdges` and `pushStatusFromNodeToEdges`), respecting `checkpointGranularity`.
4. Emit a new `checkpoint` event on the graph: `this.graph.emit("checkpoint", checkpointData)`.

### 2c. Resume from Checkpoint

Add a `restoreFromCheckpoint(checkpointData: CheckpointData)` method to `TaskGraphRunner` that:

1. For each task in `checkpointData.taskStates` with status `COMPLETED` or `DISABLED`, restore the task's `status`, `runOutputData`, `progress`, `error`.
2. For each dataflow, restore `portData` and `status`.
3. Configure the `DependencyBasedScheduler` to skip already-completed tasks by calling `onTaskCompleted` for each.
4. The subsequent `runGraph` call then only processes `PENDING` tasks.

In `TaskGraph.run()`, if `config.resumeFromCheckpoint` is provided, call `restoreFromCheckpoint` instead of `resetGraph` in `handleStart`.

### 2d. Iteration Checkpoints in `WhileTask` and `IteratorTaskRunner`

In `[packages/task-graph/src/task/WhileTask.ts](packages/task-graph/src/task/WhileTask.ts)` (line ~380, inside the while loop):

- After each iteration's `subGraph.run()` completes, if the execution context has a checkpoint saver, capture a checkpoint with `iterationIndex` and `iterationParentTaskId` metadata.

In `[packages/task-graph/src/task/IteratorTaskRunner.ts](packages/task-graph/src/task/IteratorTaskRunner.ts)` (inside `executeSubgraphIteration`):

- Same pattern: after each subgraph run, capture an iteration checkpoint.

This requires threading the `checkpointSaver` and `threadId` through the execution context (`IExecuteContext` or `IRunConfig`). The cleanest approach is to add optional `checkpointSaver` and `threadId` to the `IRunConfig` interface in `[packages/task-graph/src/task/ITask.ts](packages/task-graph/src/task/ITask.ts)`.

### 2e. Thread ID Concept

The `threadId` serves as the isolation key for checkpoint namespacing. When running a graph:

- If no `threadId` is provided, generate one via `uuid4()`.
- The `threadId` is stored on the runner and propagated to all child graph runs.
- Maps directly to `activity_id` in the builder.

## Part 3: Builder - Checkpoint Data in Activities

### 3a. Wire CheckpointSaver into `runWorkflow`

In `[builder/src/lib/run-workflow.ts](builder/src/lib/run-workflow.ts)`:

1. Create/get a `CheckpointSaver` (TabularCheckpointSaver backed by the same storage infrastructure used by `ActivityRepository`).
2. Pass it to `taskGraph.run()` via the config: `{ checkpointSaver, threadId: actId }`.
3. On failure, the checkpoint is already saved. The existing `activity_id` serves as the `threadId`.
4. Add a `resumeFromCheckpoint` option to `RunWorkflowOptions` that, when set, passes `resumeFromCheckpoint` to the graph config to skip completed tasks.

### 3b. Checkpoint Repository for the Builder

Create `builder/src/components/activities/CheckpointRepository.ts`:

- Wraps a `TabularCheckpointSaver` (or an `InMemoryCheckpointSaver` for browser-only mode).
- Provides queries: `getCheckpointsForActivity(activityId)`, `getIterationCheckpoints(activityId, taskId)`.
- Registered alongside `ActivityRepository` in the builder's storage setup.

### 3c. Activity Detail: Per-Task Run Data

Enhance `[builder/src/components/activities/ActivityViewer.tsx](builder/src/components/activities/ActivityViewer.tsx)`:

- Fetch checkpoints for the current activity using `CheckpointRepository`.
- Display a timeline of task completions derived from checkpoint `metadata.triggerTaskId` and `metadata.createdAt`.
- For each task in the graph, show its state (status, inputs, outputs, timing) by reading from the relevant checkpoint's `taskStates`.
- For iterative tasks (WhileTask, MapTask), show an expandable list of iteration checkpoints.

## File Summary

| Area | Files                                                           | Action                                |
| ---- | --------------------------------------------------------------- | ------------------------------------- |
| libs | `packages/task-graph/src/checkpoint/CheckpointTypes.ts`         | New                                   |
| libs | `packages/task-graph/src/checkpoint/CheckpointSaver.ts`         | New                                   |
| libs | `packages/task-graph/src/checkpoint/InMemoryCheckpointSaver.ts` | New                                   |
| libs | `packages/task-graph/src/checkpoint/TabularCheckpointSaver.ts`  | New                                   |
| libs | `packages/task-graph/src/checkpoint/index.ts`                   | New                                   |
| libs | `packages/task-graph/src/common.ts`                             | Modify (add checkpoint exports)       |
| libs | `packages/task-graph/src/task-graph/TaskGraph.ts`               | Modify (extend config)                |
| libs | `packages/task-graph/src/task-graph/TaskGraphRunner.ts`         | Modify (checkpoint hooks, restore)    |
| libs | `packages/task-graph/src/task-graph/TaskGraphEvents.ts`         | Modify (add checkpoint event)         |
| libs | `packages/task-graph/src/task/ITask.ts`                         | Modify (add checkpoint to IRunConfig) |
| libs | `packages/task-graph/src/task/TaskRunner.ts`                    | Modify (propagate checkpoint config)  |
| libs | `packages/task-graph/src/task/WhileTask.ts`                     | Modify (iteration checkpoints)        |
| libs | `packages/task-graph/src/task/IteratorTaskRunner.ts`            | Modify (iteration checkpoints)        |

## Testing

Tests should be added in `packages/test/src/test/task/`:

- `Checkpoint.test.ts` - Test checkpoint save/restore cycle for a simple graph
- `CheckpointResume.test.ts` - Test resume from checkpoint after simulated failure
- `CheckpointIteration.test.ts` - Test iteration checkpoints for WhileTask and MapTask
@sroussey
Copy link
Collaborator Author

Will close #183

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request implements a comprehensive checkpointing mechanism for task execution in the @workglow/task-graph package. The feature enables capturing and restoring execution state at various granularities (every-task, top-level-only, or none), supporting resume-from-failure scenarios and execution history tracking.

Changes:

  • Introduced CheckpointSaver interface with InMemoryCheckpointSaver and TabularCheckpointSaver implementations for flexible checkpoint persistence
  • Enhanced TaskGraph, TaskGraphRunner, and task runners (WhileTask, IteratorTask, GraphAsTask, standard TaskRunner) to support checkpoint capture and restoration
  • Added CheckpointData types to model task states, dataflow states, and execution metadata including iteration-level tracking

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 20 comments.

Show a summary per file
File Description
packages/task-graph/src/checkpoint/CheckpointTypes.ts Defines checkpoint data structures (CheckpointData, TaskCheckpointState, DataflowCheckpointState) and type aliases
packages/task-graph/src/checkpoint/CheckpointSaver.ts Abstract CheckpointSaver class defining the interface for checkpoint persistence
packages/task-graph/src/checkpoint/InMemoryCheckpointSaver.ts In-memory implementation using Map with thread-based indexing
packages/task-graph/src/checkpoint/TabularCheckpointSaver.ts Tabular storage implementation with compression support for various database backends
packages/task-graph/src/checkpoint/index.ts Exports all checkpoint-related types and classes
packages/task-graph/src/common.ts Exports checkpoint module from package entry point
packages/task-graph/src/task-graph/TaskGraph.ts Extends TaskGraphRunConfig with checkpoint configuration options
packages/task-graph/src/task-graph/TaskGraphRunner.ts Implements captureCheckpoint and restoreFromCheckpoint methods, integrates checkpoint hooks into execution flow
packages/task-graph/src/task-graph/TaskGraphEvents.ts Adds checkpoint event to task graph event listeners
packages/task-graph/src/task/ITask.ts Extends IExecuteContext and IRunConfig with checkpoint-related fields
packages/task-graph/src/task/TaskRunner.ts Propagates checkpointSaver and threadId to task execution context
packages/task-graph/src/task/WhileTask.ts Captures iteration checkpoints after each subgraph execution
packages/task-graph/src/task/WhileTaskRunner.ts Passes checkpoint configuration to subgraph runs
packages/task-graph/src/task/IteratorTaskRunner.ts Captures iteration checkpoints in parallel/sequential iteration execution
packages/task-graph/src/task/GraphAsTaskRunner.ts Propagates checkpoint configuration to nested subgraphs
packages/test/src/test/task/Checkpoint.test.ts Comprehensive test suite for checkpoint save/retrieve, graph execution with checkpoints, and resume scenarios

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

this.checkpointGranularity === "every-task" &&
(task.status === TaskStatus.COMPLETED || task.status === TaskStatus.FAILED)
) {
await this.captureCheckpoint(task.config.id);
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checkpoint capture failures inside the finally block will cause unhandled promise rejections that could interfere with task completion tracking. Consider wrapping the captureCheckpoint call in a try-catch block to ensure the scheduler notification (onTaskCompleted) always executes, or emit a warning/error event if checkpoint capture fails without interrupting execution flow.

Suggested change
await this.captureCheckpoint(task.config.id);
try {
await this.captureCheckpoint(task.config.id);
} catch (checkpointError) {
// Do not interrupt task completion tracking if checkpoint capture fails
// eslint-disable-next-line no-console
console.error(
"Failed to capture checkpoint for task",
task.config.id,
checkpointError,
);
}

Copilot uses AI. Check for mistakes.
Comment on lines +391 to +394
await this.subGraph.runner.captureCheckpoint(this.config.id, {
iterationIndex: this._currentIteration,
iterationParentTaskId: this.config.id,
});
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The captureCheckpoint call in iteration checkpointing could fail and throw an error, interrupting the loop execution. Since checkpointing is meant to be a supplementary feature (for debugging/recovery), the loop should continue even if checkpoint capture fails. Wrap this in a try-catch block to log the error without breaking iteration.

Suggested change
await this.subGraph.runner.captureCheckpoint(this.config.id, {
iterationIndex: this._currentIteration,
iterationParentTaskId: this.config.id,
});
try {
await this.subGraph.runner.captureCheckpoint(this.config.id, {
iterationIndex: this._currentIteration,
iterationParentTaskId: this.config.id,
});
} catch (error) {
// Checkpointing is best-effort; log the error but do not interrupt the loop.
console.error(
"Failed to capture while-task iteration checkpoint",
{
taskId: this.config.id,
iterationIndex: this._currentIteration,
error,
}
);
}

Copilot uses AI. Check for mistakes.
const resumeCheckpoint = history.find((cp) =>
cp.taskStates.some((ts) => ts.taskId === "task-1" && ts.status === "COMPLETED")
);
expect(resumeCheckpoint).toBeDefined();
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only verifies that a checkpoint exists after a failed run, but doesn't test the actual resume-from-checkpoint behavior after failure. The test should be completed to actually resume from the checkpoint with a fixed version of task-2 (or with the failure bypassed) to verify that the checkpoint restore mechanism works correctly and task-1 is not re-executed.

Suggested change
expect(resumeCheckpoint).toBeDefined();
expect(resumeCheckpoint).toBeDefined();
// Now resume from this checkpoint with a non-failing version of task-2
const resumeGraph = new TaskGraph();
const resumedTask1 = new NumberTask({ input: 42 }, { id: "task-1" });
const resumedTask2 = new TestSimpleTask({}, { id: "task-2" });
resumeGraph.addTask(resumedTask1);
resumeGraph.addTask(resumedTask2);
resumeGraph.addDataflow(new Dataflow("task-1", "output", "task-2", "in"));
// Use the checkpoint captured before the failure to resume execution.
await resumeGraph.runFromCheckpoint(resumeCheckpoint!, {
checkpointSaver: saver,
threadId: "fail-thread-resumed",
});
// Verify that the resumed run completed successfully and task-2 ran to completion.
const resumedLatest = await saver.getLatestCheckpoint("fail-thread-resumed");
expect(resumedLatest).toBeDefined();
const resumedTask2State = resumedLatest!.taskStates.find(
(ts) => ts.taskId === "task-2"
);
expect(resumedTask2State).toBeDefined();
expect(resumedTask2State!.status).toBe("COMPLETED");

Copilot uses AI. Check for mistakes.
inputData: { ...task.runInputData },
outputData: { ...task.runOutputData },
progress: task.progress,
error: task.error?.message,
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the error message is captured in the checkpoint (task.error?.message), losing important error context like stack traces, error codes, or custom error properties. Consider capturing the full error object or at least additional properties like error.name, error.stack, or serialized custom error data to enable better debugging when resuming from checkpoints.

Copilot uses AI. Check for mistakes.
Comment on lines +733 to +741
/**
* Captures a checkpoint of the current graph state.
* @param triggerTaskId The ID of the task that triggered this checkpoint
* @param metadata Additional metadata for iteration checkpoints
*/
public async captureCheckpoint(
triggerTaskId?: unknown,
metadata?: { iterationIndex?: number; iterationParentTaskId?: unknown }
): Promise<CheckpointData | undefined> {
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The captureCheckpoint method is marked as public, suggesting it's part of the API, but it's not clear when users should call this manually versus letting the runner call it automatically. Add documentation explaining that this is typically called internally by the runner but can be called manually for custom checkpoint scenarios, and clarify the implications of calling it during execution.

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +461
/**
* @license
* Copyright 2025 Steven Roussey <sroussey@gmail.com>
* SPDX-License-Identifier: Apache-2.0
*/

import { CheckpointData, Dataflow, InMemoryCheckpointSaver, TaskGraph } from "@workglow/task-graph";
import { beforeEach, describe, expect, it } from "vitest";
import { FailingTask, NumberTask, TestSimpleTask } from "./TestTasks";

describe("Checkpoint", () => {
let saver: InMemoryCheckpointSaver;

beforeEach(() => {
saver = new InMemoryCheckpointSaver();
});

describe("InMemoryCheckpointSaver", () => {
it("should save and retrieve a checkpoint", async () => {
const data: CheckpointData = {
checkpointId: "cp-1",
threadId: "thread-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: { createdAt: new Date().toISOString() },
};

await saver.saveCheckpoint(data);
const retrieved = await saver.getCheckpoint("cp-1");
expect(retrieved).toEqual(data);
});

it("should return undefined for non-existent checkpoint", async () => {
const retrieved = await saver.getCheckpoint("non-existent");
expect(retrieved).toBeUndefined();
});

it("should get latest checkpoint for thread", async () => {
const data1: CheckpointData = {
checkpointId: "cp-1",
threadId: "thread-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: { createdAt: "2025-01-01T00:00:00Z" },
};
const data2: CheckpointData = {
checkpointId: "cp-2",
threadId: "thread-1",
parentCheckpointId: "cp-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: { createdAt: "2025-01-01T00:01:00Z" },
};

await saver.saveCheckpoint(data1);
await saver.saveCheckpoint(data2);

const latest = await saver.getLatestCheckpoint("thread-1");
expect(latest?.checkpointId).toBe("cp-2");
});

it("should get checkpoint history for thread", async () => {
await saver.saveCheckpoint({
checkpointId: "cp-1",
threadId: "thread-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: { createdAt: "2025-01-01T00:00:00Z" },
});
await saver.saveCheckpoint({
checkpointId: "cp-2",
threadId: "thread-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: { createdAt: "2025-01-01T00:01:00Z" },
});
await saver.saveCheckpoint({
checkpointId: "cp-3",
threadId: "thread-2",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: { createdAt: "2025-01-01T00:02:00Z" },
});

const history = await saver.getCheckpointHistory("thread-1");
expect(history).toHaveLength(2);
});

it("should get checkpoints for iteration", async () => {
await saver.saveCheckpoint({
checkpointId: "cp-1",
threadId: "thread-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: {
createdAt: "2025-01-01T00:00:00Z",
iterationParentTaskId: "while-1",
iterationIndex: 0,
},
});
await saver.saveCheckpoint({
checkpointId: "cp-2",
threadId: "thread-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: {
createdAt: "2025-01-01T00:01:00Z",
iterationParentTaskId: "while-1",
iterationIndex: 1,
},
});
await saver.saveCheckpoint({
checkpointId: "cp-3",
threadId: "thread-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: {
createdAt: "2025-01-01T00:02:00Z",
triggerTaskId: "other-task",
},
});

const iterCheckpoints = await saver.getCheckpointsForIteration("thread-1", "while-1");
expect(iterCheckpoints).toHaveLength(2);
});

it("should delete checkpoints for thread", async () => {
await saver.saveCheckpoint({
checkpointId: "cp-1",
threadId: "thread-1",
graphJson: { tasks: [], dataflows: [] },
taskStates: [],
dataflowStates: [],
metadata: { createdAt: "2025-01-01T00:00:00Z" },
});

await saver.deleteCheckpoints("thread-1");

const history = await saver.getCheckpointHistory("thread-1");
expect(history).toHaveLength(0);
});
});

describe("Checkpoint save during graph execution", () => {
it("should capture checkpoints after each task completion", async () => {
const graph = new TaskGraph();
const task1 = new TestSimpleTask({ input: "hello" }, { id: "task-1" });
const task2 = new TestSimpleTask({ input: "world" }, { id: "task-2" });

graph.addTask(task1);
graph.addTask(task2);
graph.addDataflow(new Dataflow("task-1", "output", "task-2", "input"));

const checkpoints: CheckpointData[] = [];
graph.on("checkpoint", (data) => {
checkpoints.push(data);
});

await graph.run(
{},
{
checkpointSaver: saver,
threadId: "test-thread",
checkpointGranularity: "every-task",
}
);

// Should have checkpoints for each task completion
expect(checkpoints.length).toBeGreaterThanOrEqual(1);

// Verify checkpoint data structure
const lastCheckpoint = checkpoints[checkpoints.length - 1];
expect(lastCheckpoint.threadId).toBe("test-thread");
expect(lastCheckpoint.taskStates.length).toBe(2);
expect(lastCheckpoint.dataflowStates.length).toBe(1);
});

it("should not capture checkpoints when granularity is none", async () => {
const graph = new TaskGraph();
const task1 = new TestSimpleTask({ input: "hello" }, { id: "task-1" });

graph.addTask(task1);

const checkpoints: CheckpointData[] = [];
graph.on("checkpoint", (data) => {
checkpoints.push(data);
});

await graph.run(
{},
{
checkpointSaver: saver,
checkpointGranularity: "none",
}
);

expect(checkpoints).toHaveLength(0);
});

it("should capture single checkpoint for top-level-only granularity", async () => {
const graph = new TaskGraph();
const task1 = new TestSimpleTask({ input: "hello" }, { id: "task-1" });
const task2 = new TestSimpleTask({ input: "world" }, { id: "task-2" });

graph.addTask(task1);
graph.addTask(task2);
graph.addDataflow(new Dataflow("task-1", "output", "task-2", "input"));

const checkpoints: CheckpointData[] = [];
graph.on("checkpoint", (data) => {
checkpoints.push(data);
});

await graph.run(
{},
{
checkpointSaver: saver,
threadId: "test-thread",
checkpointGranularity: "top-level-only",
}
);

// Should have exactly one checkpoint at the end
expect(checkpoints).toHaveLength(1);

// All tasks should be completed in the checkpoint
const cp = checkpoints[0];
expect(cp.taskStates.every((ts) => ts.status === "COMPLETED")).toBe(true);
});

it("should persist checkpoints in the saver", async () => {
const graph = new TaskGraph();
const task1 = new NumberTask({ input: 42 }, { id: "task-1" });

graph.addTask(task1);

await graph.run(
{},
{
checkpointSaver: saver,
threadId: "persist-thread",
}
);

const history = await saver.getCheckpointHistory("persist-thread");
expect(history.length).toBeGreaterThanOrEqual(1);

const latest = await saver.getLatestCheckpoint("persist-thread");
expect(latest).toBeDefined();
expect(latest!.threadId).toBe("persist-thread");
});

it("should chain parent checkpoint IDs", async () => {
const graph = new TaskGraph();
const task1 = new TestSimpleTask({ input: "a" }, { id: "task-1" });
const task2 = new TestSimpleTask({ input: "b" }, { id: "task-2" });

graph.addTask(task1);
graph.addTask(task2);
graph.addDataflow(new Dataflow("task-1", "output", "task-2", "input"));

await graph.run(
{},
{
checkpointSaver: saver,
threadId: "chain-thread",
}
);

const history = await saver.getCheckpointHistory("chain-thread");
if (history.length >= 2) {
expect(history[1].parentCheckpointId).toBe(history[0].checkpointId);
}
});
});

describe("Resume from checkpoint", () => {
it("should resume from a checkpoint, skipping completed tasks", async () => {
// First run: execute a graph and save checkpoints
const graph = new TaskGraph();
const task1 = new TestSimpleTask({ input: "first" }, { id: "task-1" });
const task2 = new TestSimpleTask({ input: "second" }, { id: "task-2" });

graph.addTask(task1);
graph.addTask(task2);
graph.addDataflow(new Dataflow("task-1", "output", "task-2", "input"));

await graph.run(
{},
{
checkpointSaver: saver,
threadId: "resume-thread",
}
);

// Get the checkpoint after task-1 completed (first checkpoint)
const history = await saver.getCheckpointHistory("resume-thread");
expect(history.length).toBeGreaterThanOrEqual(1);

// Now create a new graph with the same structure and resume
const graph2 = new TaskGraph();
const task1b = new TestSimpleTask({ input: "first" }, { id: "task-1" });
const task2b = new TestSimpleTask({ input: "second" }, { id: "task-2" });

graph2.addTask(task1b);
graph2.addTask(task2b);
graph2.addDataflow(new Dataflow("task-1", "output", "task-2", "input"));

// Resume from the last checkpoint (all tasks completed)
const lastCheckpoint = history[history.length - 1];
const results = await graph2.run(
{},
{
checkpointSaver: saver,
threadId: "resume-thread-2",
resumeFromCheckpoint: lastCheckpoint.checkpointId,
}
);

// Should complete successfully
expect(results.length).toBeGreaterThanOrEqual(0);
});

it("should re-run failed tasks when resuming from checkpoint before failure", async () => {
// Create a graph where task-2 fails
const graph = new TaskGraph();
const task1 = new NumberTask({ input: 42 }, { id: "task-1" });
const task2 = new FailingTask({}, { id: "task-2" });

graph.addTask(task1);
graph.addTask(task2);
graph.addDataflow(new Dataflow("task-1", "output", "task-2", "in"));

try {
await graph.run(
{},
{
checkpointSaver: saver,
threadId: "fail-thread",
}
);
} catch {
// Expected failure
}

// Should have captured at least a checkpoint after task-1
const history = await saver.getCheckpointHistory("fail-thread");
expect(history.length).toBeGreaterThanOrEqual(1);

// Find the checkpoint where task-1 is completed but task-2 hasn't run yet
const resumeCheckpoint = history.find((cp) =>
cp.taskStates.some((ts) => ts.taskId === "task-1" && ts.status === "COMPLETED")
);
expect(resumeCheckpoint).toBeDefined();
});
});

describe("Checkpoint data correctness", () => {
it("should capture task input and output data", async () => {
const graph = new TaskGraph();
const task = new NumberTask({ input: 42 }, { id: "task-1" });

graph.addTask(task);

await graph.run(
{},
{
checkpointSaver: saver,
threadId: "data-thread",
}
);

const latest = await saver.getLatestCheckpoint("data-thread");
expect(latest).toBeDefined();

const taskState = latest!.taskStates.find((ts) => ts.taskId === "task-1");
expect(taskState).toBeDefined();
expect(taskState!.status).toBe("COMPLETED");
expect(taskState!.outputData).toBeDefined();
expect(taskState!.outputData.output).toBe(42);
});

it("should capture dataflow state", async () => {
const graph = new TaskGraph();
const task1 = new NumberTask({ input: 10 }, { id: "task-1" });
const task2 = new NumberTask({}, { id: "task-2" });

graph.addTask(task1);
graph.addTask(task2);
graph.addDataflow(new Dataflow("task-1", "output", "task-2", "input"));

await graph.run(
{},
{
checkpointSaver: saver,
threadId: "df-thread",
}
);

const latest = await saver.getLatestCheckpoint("df-thread");
expect(latest).toBeDefined();
expect(latest!.dataflowStates.length).toBe(1);

const dfState = latest!.dataflowStates[0];
expect(dfState.sourceTaskId).toBe("task-1");
expect(dfState.targetTaskId).toBe("task-2");
expect(dfState.status).toBe("COMPLETED");
});

it("should include graph JSON in checkpoint", async () => {
const graph = new TaskGraph();
const task = new TestSimpleTask({ input: "test" }, { id: "task-1" });
graph.addTask(task);

await graph.run(
{},
{
checkpointSaver: saver,
threadId: "json-thread",
}
);

const latest = await saver.getLatestCheckpoint("json-thread");
expect(latest).toBeDefined();
expect(latest!.graphJson).toBeDefined();
expect(latest!.graphJson.tasks.length).toBe(1);
});

it("should auto-generate threadId when not provided", async () => {
const graph = new TaskGraph();
const task = new TestSimpleTask({ input: "test" }, { id: "task-1" });
graph.addTask(task);

const checkpoints: CheckpointData[] = [];
graph.on("checkpoint", (data) => {
checkpoints.push(data);
});

await graph.run(
{},
{
checkpointSaver: saver,
}
);

expect(checkpoints.length).toBeGreaterThanOrEqual(1);
// Thread ID should be auto-generated (non-empty UUID)
expect(checkpoints[0].threadId).toBeTruthy();
expect(checkpoints[0].threadId.length).toBeGreaterThan(0);
});
});
});
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No tests exist for TabularCheckpointSaver. While the InMemoryCheckpointSaver is tested, the TabularCheckpointSaver has additional complexity (compression/decompression, database operations, type conversions) that should be tested separately. Add tests that verify TabularCheckpointSaver can save/retrieve checkpoints correctly, handle compression, and work with different storage backends.

Copilot uses AI. Check for mistakes.
Comment on lines +199 to +204
// Capture iteration checkpoint if checkpoint saver is available
if (this.checkpointSaver && this.threadId && iterationIndex !== undefined) {
await this.task.subGraph.runner.captureCheckpoint(this.task.config.id, {
iterationIndex,
iterationParentTaskId: this.task.config.id,
});
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to WhileTask, the iteration checkpoint capture could fail and interrupt parallel iteration execution. Wrap the checkpoint capture in a try-catch block to ensure that checkpoint failures don't break the iteration processing, especially since multiple iterations may be running in parallel.

Suggested change
// Capture iteration checkpoint if checkpoint saver is available
if (this.checkpointSaver && this.threadId && iterationIndex !== undefined) {
await this.task.subGraph.runner.captureCheckpoint(this.task.config.id, {
iterationIndex,
iterationParentTaskId: this.task.config.id,
});
// Capture iteration checkpoint if checkpoint saver is available.
// This is best-effort: failures here should not break iteration processing.
if (this.checkpointSaver && this.threadId && iterationIndex !== undefined) {
try {
await this.task.subGraph.runner.captureCheckpoint(this.task.config.id, {
iterationIndex,
iterationParentTaskId: this.task.config.id,
});
} catch {
// Swallow checkpoint errors to avoid interrupting iteration execution.
// Optionally, logging could be added here if desired.
}

Copilot uses AI. Check for mistakes.
Comment on lines +172 to +176
// Capture checkpoint after task completion
if (
this.checkpointSaver &&
this.checkpointGranularity === "every-task" &&
(task.status === TaskStatus.COMPLETED || task.status === TaskStatus.FAILED)
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checkpoints are captured for both COMPLETED and FAILED tasks, which means failed task states are saved. However, when resuming from a checkpoint, failed tasks are left in their original state without being reset. This could cause issues if the failed task has stale error information. Consider whether checkpoints should be captured for FAILED tasks, or ensure that failed tasks are properly reset when resuming.

Suggested change
// Capture checkpoint after task completion
if (
this.checkpointSaver &&
this.checkpointGranularity === "every-task" &&
(task.status === TaskStatus.COMPLETED || task.status === TaskStatus.FAILED)
// Capture checkpoint after successful task completion
if (
this.checkpointSaver &&
this.checkpointGranularity === "every-task" &&
task.status === TaskStatus.COMPLETED

Copilot uses AI. Check for mistakes.
Comment on lines +16 to +22
private threadIndex: Map<ThreadId, CheckpointId[]> = new Map();

async saveCheckpoint(data: CheckpointData): Promise<void> {
this.checkpoints.set(data.checkpointId, data);

const threadCheckpoints = this.threadIndex.get(data.threadId) ?? [];
threadCheckpoints.push(data.checkpointId);
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The InMemoryCheckpointSaver stores all checkpoints indefinitely in memory without any cleanup mechanism. For long-running applications or workflows with many iterations, this could lead to memory leaks. Consider adding a pruning mechanism (e.g., keeping only the last N checkpoints per thread) or documenting that users should periodically call deleteCheckpoints to prevent unbounded memory growth.

Suggested change
private threadIndex: Map<ThreadId, CheckpointId[]> = new Map();
async saveCheckpoint(data: CheckpointData): Promise<void> {
this.checkpoints.set(data.checkpointId, data);
const threadCheckpoints = this.threadIndex.get(data.threadId) ?? [];
threadCheckpoints.push(data.checkpointId);
private threadIndex: Map<ThreadId, CheckpointId[]> = new Map();
private readonly maxCheckpointsPerThread: number;
constructor(maxCheckpointsPerThread: number = 1000) {
super();
this.maxCheckpointsPerThread = maxCheckpointsPerThread;
}
async saveCheckpoint(data: CheckpointData): Promise<void> {
this.checkpoints.set(data.checkpointId, data);
const threadCheckpoints = this.threadIndex.get(data.threadId) ?? [];
threadCheckpoints.push(data.checkpointId);
if (threadCheckpoints.length > this.maxCheckpointsPerThread) {
const excess = threadCheckpoints.length - this.maxCheckpointsPerThread;
const removedIds = threadCheckpoints.splice(0, excess);
for (const id of removedIds) {
this.checkpoints.delete(id);
}
}

Copilot uses AI. Check for mistakes.
const results = await this.subGraph.run<Output>(iterationInput, {
parentSignal: context.signal,
checkpointSaver: context.checkpointSaver,
threadId: context.threadId,
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a subgraph runs (e.g., in WhileTask, IteratorTaskRunner, or GraphAsTaskRunner), it receives the same checkpointGranularity setting as the parent. This means subgraphs could capture their own "top-level-only" checkpoint at completion, which may not be desired since the iteration checkpoint is captured separately. Consider whether subgraphs should inherit the granularity setting or if iteration-based tasks should override it to "none" for their subgraphs to avoid duplicate checkpoints.

Suggested change
threadId: context.threadId,
threadId: context.threadId,
// Disable subgraph top-level checkpoints; iteration checkpoints are handled separately.
checkpointGranularity: "none",

Copilot uses AI. Check for mistakes.
@sroussey
Copy link
Collaborator Author

@copilot open a new pull request to apply changes based on the comments in this thread

Copy link
Contributor

Copilot AI commented Feb 20, 2026

@sroussey I've opened a new pull request, #216, to work on those changes. Once the pull request is ready, I'll request review from you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants