Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/api/v2/ai/agent/sessions/[sessionId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ import AgentSessionService from 'server/services/agentSession';
* properties:
* stage:
* type: string
* enum: [create_session, connect_runtime]
* enum: [create_session, connect_runtime, attach_services]
* title:
* type: string
* message:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ function isRequestedSessionServiceRef(value: unknown): value is RequestedAgentSe
* properties:
* stage:
* type: string
* enum: [create_session, connect_runtime]
* enum: [create_session, connect_runtime, attach_services]
* title:
* type: string
* message:
Expand Down
4 changes: 2 additions & 2 deletions src/app/api/v2/ai/agent/sessions/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ async function resolveRequestedServices(
* properties:
* stage:
* type: string
* enum: [create_session, connect_runtime]
* enum: [create_session, connect_runtime, attach_services]
* title:
* type: string
* message:
Expand Down Expand Up @@ -520,7 +520,7 @@ async function resolveRequestedServices(
* properties:
* stage:
* type: string
* enum: [create_session, connect_runtime]
* enum: [create_session, connect_runtime, attach_services]
* title:
* type: string
* message:
Expand Down
69 changes: 69 additions & 0 deletions src/server/lib/agentSession/__tests__/podFactory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ jest.mock('server/lib/logger', () => ({

import {
buildSessionWorkspacePodSpec,
createSessionWorkspacePodWithoutWaiting,
createSessionWorkspacePod,
deleteSessionWorkspacePod,
SessionWorkspacePodOptions,
SESSION_WORKSPACE_GATEWAY_PORT_NAME,
waitForSessionWorkspacePodReady,
waitForSessionWorkspacePodScheduled,
} from '../podFactory';

const baseOpts: SessionWorkspacePodOptions = {
Expand Down Expand Up @@ -859,6 +862,15 @@ describe('podFactory', () => {
});

describe('createSessionWorkspacePod', () => {
// Verifies the fire-and-forget creation path: the pod is submitted exactly once
// and no pod read is issued afterwards (i.e. no readiness/scheduling polling).
it('creates pod without waiting when requested explicitly', async () => {
// Stub the create call with a minimal pod body.
mockCreatePod.mockResolvedValue({ body: { metadata: { name: 'agent-abc123' } } });

await createSessionWorkspacePodWithoutWaiting(baseOpts);

expect(mockCreatePod).toHaveBeenCalledTimes(1);
// mockReadPod presumably backs readNamespacedPod — confirm against the mock setup above this hunk.
expect(mockReadPod).not.toHaveBeenCalled();
});

it('creates pod via K8s API', async () => {
mockCreatePod.mockResolvedValue({ body: { metadata: { name: 'agent-abc123' } } });

Expand Down Expand Up @@ -1001,6 +1013,63 @@ describe('podFactory', () => {
})
).rejects.toThrow('Session workspace pod did not become ready within 1ms');
});

// Verifies that the scheduled-wait resolves as soon as a pod read shows a node
// assignment, even though that same pod is still in phase 'Pending'.
it('returns once the pod is scheduled even before readiness succeeds', async () => {
mockReadPod
// First read: node assigned (spec.nodeName set) but the pod is not running yet.
.mockResolvedValueOnce({
body: {
spec: {
nodeName: 'worker-a',
},
status: {
phase: 'Pending',
},
},
})
// Any later read would report a Ready pod; the assertion below expects the
// first (Pending, scheduled) pod to be the one returned.
.mockResolvedValue({
body: {
status: {
phase: 'Running',
conditions: [{ type: 'Ready', status: 'True' }],
},
},
});

await expect(waitForSessionWorkspacePodScheduled('test-ns', 'agent-abc123')).resolves.toEqual(
expect.objectContaining({
spec: expect.objectContaining({
nodeName: 'worker-a',
}),
})
);
});

// Verifies the timeout path of the scheduled-wait: every read returns a Pending
// pod without spec.nodeName, so the wait must reject with the timeout message.
it('times out when the pod never gets a node assignment', async () => {
mockReadPod.mockResolvedValue({
body: {
status: {
phase: 'Pending',
},
},
});

await expect(
waitForSessionWorkspacePodScheduled('test-ns', 'agent-abc123', {
// 1ms budget with no delay between polls keeps the test fast.
timeoutMs: 1,
pollMs: 0,
})
).rejects.toThrow('Session workspace pod was not scheduled within 1ms');
});

// Verifies that waitForSessionWorkspacePodReady can be called directly with just
// a namespace and pod name and resolves with a pod carrying a Ready=True condition.
// NOTE(review): no mock is configured inside this case; it relies on whatever pod-read
// stub the surrounding suite installs (not visible in this hunk) — confirm.
it('keeps the ready wait available as a standalone helper', async () => {
await expect(waitForSessionWorkspacePodReady('test-ns', 'agent-abc123')).resolves.toEqual(
expect.objectContaining({
status: expect.objectContaining({
conditions: [{ type: 'Ready', status: 'True' }],
}),
})
);
});
});

describe('deleteSessionWorkspacePod', () => {
Expand Down
153 changes: 107 additions & 46 deletions src/server/lib/agentSession/podFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -758,62 +758,85 @@ function summarizeLogLine(logs: string | null): string | null {
return firstLine || null;
}

async function waitForSessionWorkspacePodReady(
/**
 * Resolves the timeout and poll interval used while waiting on a session workspace pod.
 *
 * Each setting is taken from the first source that yields a non-nullish value:
 * the per-call `readiness` override, then the matching environment variable
 * (`AGENT_SESSION_WORKSPACE_READY_TIMEOUT_MS` / `AGENT_SESSION_WORKSPACE_READY_POLL_MS`),
 * then a built-in default (60000ms timeout, 1000ms poll).
 *
 * @param readiness Optional per-call overrides.
 * @returns The effective `timeoutMs` and `pollMs`, both in milliseconds.
 */
function resolveSessionWorkspacePodWaitConfig(readiness?: SessionWorkspacePodOptions['readiness']): {
  timeoutMs: number;
  pollMs: number;
} {
  const fallbackTimeoutMs = 60000;
  const fallbackPollMs = 1000;

  // normalizeNonNegativeInteger presumably returns null/undefined for missing or
  // invalid input, which is what lets `??` fall through — confirm against its definition.
  const timeoutMs =
    normalizeNonNegativeInteger(readiness?.timeoutMs) ??
    normalizeNonNegativeInteger(process.env.AGENT_SESSION_WORKSPACE_READY_TIMEOUT_MS) ??
    fallbackTimeoutMs;

  const pollMs =
    normalizeNonNegativeInteger(readiness?.pollMs) ??
    normalizeNonNegativeInteger(process.env.AGENT_SESSION_WORKSPACE_READY_POLL_MS) ??
    fallbackPollMs;

  return { timeoutMs, pollMs };
}

/**
 * Finds the name of the first container whose status indicates a startup failure.
 *
 * Init container statuses are inspected before regular container statuses.
 * A status counts as failing when it is either waiting with one of the listed
 * reasons, or terminated with a reason set and an exit code other than 0.
 *
 * @param pod Pod whose `status.initContainerStatuses` / `status.containerStatuses` are inspected.
 * @returns The failing container's name, or `null` when none matches (or the match has no name).
 */
function getFailingPodStatusContainerName(pod: k8s.V1Pod): string | null {
  const fatalWaitingReasons = [
    'ErrImagePull',
    'ImagePullBackOff',
    'CrashLoopBackOff',
    'CreateContainerConfigError',
    'RunContainerError',
  ];
  const initStatuses = pod.status?.initContainerStatuses || [];
  const mainStatuses = pod.status?.containerStatuses || [];

  for (const containerStatus of [...initStatuses, ...mainStatuses]) {
    const waitingReason = containerStatus.state?.waiting?.reason;
    const stuckWaiting = !!waitingReason && fatalWaitingReasons.includes(waitingReason);

    const terminatedState = containerStatus.state?.terminated;
    const exitedWithFailure = !!(terminatedState?.reason && terminatedState.exitCode !== 0);

    if (stuckWaiting || exitedWithFailure) {
      // Only the first matching status is considered, even if its name is empty.
      return containerStatus.name || null;
    }
  }

  return null;
}

/**
 * Throws when the given pod snapshot shows a startup failure; otherwise resolves quietly.
 *
 * On failure it looks up the failing container, fetches that container's logs
 * (only when a failing container was identified), logs them at error level when
 * present, and appends a summary of the logs to the thrown error message.
 *
 * @param coreApi Kubernetes core API client used to fetch container logs.
 * @param namespace Namespace of the pod.
 * @param podName Name of the pod.
 * @param pod Pod snapshot to inspect.
 * @throws Error with message `Session workspace pod failed to start: …` when a failure is detected.
 */
async function throwIfSessionWorkspacePodFailedToStart(
  coreApi: k8s.CoreV1Api,
  namespace: string,
  podName: string,
  pod: k8s.V1Pod
): Promise<void> {
  const startupFailure = getPodStartupFailure(pod);

  if (startupFailure) {
    const containerName = getFailingPodStatusContainerName(pod);
    // Without an identified container there is nothing to fetch logs from.
    const capturedLogs = containerName ? await getContainerLogs(coreApi, namespace, podName, containerName) : null;

    if (capturedLogs) {
      getLogger().error(
        { namespace, podName, containerName, logs: capturedLogs },
        `Session: startup logs captured containerName=${containerName} namespace=${namespace} podName=${podName}`
      );
    }

    const summary = summarizeLogLine(capturedLogs);
    const detail = summary ? ` - ${summary}` : '';
    throw new Error(`Session workspace pod failed to start: ${startupFailure}${detail}`);
  }
}

async function waitForSessionWorkspacePodReadyInternal(
coreApi: k8s.CoreV1Api,
namespace: string,
podName: string,
readiness?: SessionWorkspacePodOptions['readiness']
): Promise<k8s.V1Pod> {
const readyTimeoutMs =
normalizeNonNegativeInteger(readiness?.timeoutMs) ??
normalizeNonNegativeInteger(process.env.AGENT_SESSION_WORKSPACE_READY_TIMEOUT_MS) ??
60000;
const readyPollMs =
normalizeNonNegativeInteger(readiness?.pollMs) ??
normalizeNonNegativeInteger(process.env.AGENT_SESSION_WORKSPACE_READY_POLL_MS) ??
1000;
const { timeoutMs: readyTimeoutMs, pollMs: readyPollMs } = resolveSessionWorkspacePodWaitConfig(readiness);
const deadline = Date.now() + readyTimeoutMs;
let lastObservedState = 'pending';
let lastPod: k8s.V1Pod | null = null;

while (Date.now() < deadline) {
const { body: pod } = await coreApi.readNamespacedPod(podName, namespace);
lastPod = pod;
const failure = getPodStartupFailure(pod);
if (failure) {
const failingContainer =
[...(pod.status?.initContainerStatuses || []), ...(pod.status?.containerStatuses || [])].find((status) => {
const waiting = status.state?.waiting;
if (
waiting?.reason &&
[
'ErrImagePull',
'ImagePullBackOff',
'CrashLoopBackOff',
'CreateContainerConfigError',
'RunContainerError',
].includes(waiting.reason)
) {
return true;
}

const terminated = status.state?.terminated;
return !!(terminated?.reason && terminated.exitCode !== 0);
})?.name || null;

const containerLogs = failingContainer
? await getContainerLogs(coreApi, namespace, podName, failingContainer)
: null;
if (containerLogs) {
getLogger().error(
{ namespace, podName, containerName: failingContainer, logs: containerLogs },
`Session: startup logs captured containerName=${failingContainer} namespace=${namespace} podName=${podName}`
);
}

const logSummary = summarizeLogLine(containerLogs);
throw new Error(`Session workspace pod failed to start: ${failure}${logSummary ? ` - ${logSummary}` : ''}`);
}
await throwIfSessionWorkspacePodFailedToStart(coreApi, namespace, podName, pod);

if (isPodReady(pod)) {
return pod;
Expand Down Expand Up @@ -842,7 +865,7 @@ async function waitForSessionWorkspacePodReady(
);
}

export async function createSessionWorkspacePod(opts: SessionWorkspacePodOptions): Promise<k8s.V1Pod> {
export async function createSessionWorkspacePodWithoutWaiting(opts: SessionWorkspacePodOptions): Promise<void> {
const logger = getLogger();
const coreApi = getCoreApi();
const pod = buildSessionWorkspacePodSpec(opts);
Expand All @@ -864,8 +887,46 @@ export async function createSessionWorkspacePod(opts: SessionWorkspacePodOptions

throw error;
}
}

/**
 * Polls a session workspace pod until it has been assigned to a node.
 *
 * The pod counts as scheduled once `spec.nodeName` is set; it does not need to be
 * Ready. Timeout and poll interval come from the same readiness settings that
 * `resolveSessionWorkspacePodWaitConfig` resolves.
 *
 * @param namespace Namespace of the pod.
 * @param podName Name of the pod.
 * @param readiness Optional timeout/poll overrides.
 * @returns The first pod snapshot that has a node assignment.
 * @throws Error when the pod shows a startup failure, or when no node assignment
 *   is observed before the deadline (`Session workspace pod was not scheduled within …ms`).
 */
export async function waitForSessionWorkspacePodScheduled(
  namespace: string,
  podName: string,
  readiness?: SessionWorkspacePodOptions['readiness']
): Promise<k8s.V1Pod> {
  const api = getCoreApi();
  const waitConfig = resolveSessionWorkspacePodWaitConfig(readiness);
  const giveUpAt = Date.now() + waitConfig.timeoutMs;
  // Reported in the timeout error; stays 'pending' if no poll ever completes.
  let stateSummary = 'pending';

  while (Date.now() < giveUpAt) {
    const response = await api.readNamespacedPod(podName, namespace);
    const currentPod = response.body;

    // Surface hard startup failures immediately instead of waiting out the timeout.
    await throwIfSessionWorkspacePodFailedToStart(api, namespace, podName, currentPod);

    const assignedNode = currentPod.spec?.nodeName;
    if (assignedNode) {
      return currentPod;
    }

    stateSummary = summarizePodState(currentPod);
    await sleep(waitConfig.pollMs);
  }

  throw new Error(`Session workspace pod was not scheduled within ${waitConfig.timeoutMs}ms: ${stateSummary}`);
}

/**
 * Public entry point for waiting until a session workspace pod is ready.
 *
 * Obtains the core API client itself and delegates to
 * `waitForSessionWorkspacePodReadyInternal`, so callers only pass the pod
 * coordinates and optional readiness settings.
 *
 * @param namespace Namespace of the pod.
 * @param podName Name of the pod.
 * @param readiness Optional timeout/poll overrides forwarded to the internal wait.
 * @returns Whatever pod the internal wait resolves with.
 */
export async function waitForSessionWorkspacePodReady(
  namespace: string,
  podName: string,
  readiness?: SessionWorkspacePodOptions['readiness']
): Promise<k8s.V1Pod> {
  const coreApi = getCoreApi();
  return waitForSessionWorkspacePodReadyInternal(coreApi, namespace, podName, readiness);
}

export async function createSessionWorkspacePod(opts: SessionWorkspacePodOptions): Promise<k8s.V1Pod> {
const logger = getLogger();
await createSessionWorkspacePodWithoutWaiting(opts);

const result = await waitForSessionWorkspacePodReady(coreApi, opts.namespace, opts.podName, opts.readiness);
const result = await waitForSessionWorkspacePodReady(opts.namespace, opts.podName, opts.readiness);
logger.info(`Session: workspace pod ready podName=${opts.podName} namespace=${opts.namespace}`);
return result;
}
Expand Down
9 changes: 7 additions & 2 deletions src/server/lib/agentSession/startupFailureState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const AGENT_SESSION_STARTUP_FAILURE_REDIS_PREFIX = 'lifecycle:agent:session:star
const AGENT_SESSION_STARTUP_FAILURE_TTL_SECONDS = 60 * 60;
const AGENT_SESSION_STARTUP_FAILURE_MESSAGE_MAX_LENGTH = 4000;

export type AgentSessionStartupFailureStage = 'create_session' | 'connect_runtime';
export type AgentSessionStartupFailureStage = 'create_session' | 'connect_runtime' | 'attach_services';

export interface AgentSessionStartupFailureState {
sessionId: string;
Expand Down Expand Up @@ -112,7 +112,12 @@ function classifyFailure(
}

return {
title: stage === 'create_session' ? 'Agent session failed to start' : 'Session workspace connection failed',
title:
stage === 'create_session'
? 'Agent session failed to start'
: stage === 'attach_services'
? 'Attached services failed to start'
: 'Session workspace connection failed',
message,
};
}
Expand Down
Loading
Loading