-
Notifications
You must be signed in to change notification settings - Fork 10
llm call: polling llm db endpoint #726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
edeb059
2861753
cb51ef7
20f96cb
05e9f57
5372913
b341f83
572e422
7c17313
82c1d81
8936ea5
a7e5b81
1cf2103
aeb9d31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| """add project id to job table | ||
|
|
||
| Revision ID: 050 | ||
| Revises: 049 | ||
| Create Date: 2026-04-07 14:23:00.938901 | ||
|
|
||
| """ | ||
| from alembic import op | ||
| import sqlalchemy as sa | ||
| import sqlmodel.sql.sqltypes | ||
| from sqlalchemy.dialects import postgresql | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision = "050" | ||
| down_revision = "049" | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
# Postgres enum backing llm_chain.status. Label order matters: Postgres
# orders enum values by declaration order, so keep PENDING..COMPLETED as-is.
# create_type=False stops SQLAlchemy from emitting CREATE TYPE implicitly;
# upgrade()/downgrade() create and drop the type explicitly instead.
chain_status_enum = postgresql.ENUM(
    "PENDING",
    "RUNNING",
    "FAILED",
    "COMPLETED",
    name="chainstatus",
    create_type=False,
)
|
|
||
|
|
||
def upgrade():
    """Add job.project_id, refresh llm_call comments, and convert
    llm_chain.status from VARCHAR to the chainstatus enum.

    Statement order is significant: the enum type must exist before the
    column is cast into it, and the old VARCHAR default must be dropped
    before the type change (Postgres cannot auto-cast the default).
    """
    # Create the chainstatus type up front; the alter_column USING cast
    # below requires it to already exist.
    chain_status_enum.create(op.get_bind())
    op.add_column(
        "job",
        sa.Column(
            "project_id",
            sa.Integer(),
            nullable=True,
            comment="Project ID of the job's project",
        ),
    )
    # Comment-only change on llm_call.chain_id (no schema change).
    op.alter_column(
        "llm_call",
        "chain_id",
        existing_type=sa.UUID(),
        comment="Reference to the parent chain (NULL for standalone llm_call requests)",
        existing_comment="Reference to the parent chain (NULL for standalone /llm/call requests)",
        existing_nullable=True,
    )
    # Comment-only change on llm_call.input_type (documents pdf/multimodal).
    op.alter_column(
        "llm_call",
        "input_type",
        existing_type=sa.VARCHAR(),
        comment="Input type: text, audio, image, pdf, multimodal",
        existing_comment="Input type: text, audio, image",
        existing_nullable=False,
    )
    # Drop the old text default before changing the column type, then cast
    # existing lowercase values to the uppercase enum labels.
    op.execute("ALTER TABLE llm_chain ALTER COLUMN status DROP DEFAULT")
    op.alter_column(
        "llm_chain",
        "status",
        existing_type=sa.VARCHAR(),
        type_=chain_status_enum,
        existing_comment="Chain execution status (pending, running, failed, completed)",
        existing_nullable=False,
        postgresql_using="UPPER(status)::chainstatus",
    )
    # Re-establish the default as an enum-typed value.
    op.execute(
        "ALTER TABLE llm_chain ALTER COLUMN status SET DEFAULT 'PENDING'::chainstatus"
    )
    # TEXT -> AutoString resolves to the same underlying type; kept here
    # to silence autogenerate noise in future revisions.
    op.alter_column(
        "llm_chain",
        "error",
        existing_type=sa.TEXT(),
        type_=sqlmodel.sql.sqltypes.AutoString(),
        existing_comment="Error message if the chain execution failed",
        existing_nullable=True,
    )
|
|
||
|
|
||
def downgrade():
    """Reverse upgrade(): restore VARCHAR status, original comments, and
    drop job.project_id.

    Mirrors upgrade() in reverse order; the enum-typed default must be
    dropped before the column is converted back to VARCHAR, and the
    chainstatus type can only be dropped once nothing references it.
    """
    op.alter_column(
        "llm_chain",
        "error",
        existing_type=sqlmodel.sql.sqltypes.AutoString(),
        type_=sa.TEXT(),
        existing_comment="Error message if the chain execution failed",
        existing_nullable=True,
    )
    # Drop the enum-typed default before converting the column back to text.
    op.execute("ALTER TABLE llm_chain ALTER COLUMN status DROP DEFAULT")
    op.alter_column(
        "llm_chain",
        "status",
        existing_type=sa.Enum(
            "PENDING", "RUNNING", "FAILED", "COMPLETED", name="chainstatus"
        ),
        type_=sa.VARCHAR(),
        existing_comment="Chain execution status (pending, running, failed, completed)",
        existing_nullable=False,
    )
    # NOTE(review): restores a lowercase 'pending' default but existing rows
    # keep the uppercase labels produced by upgrade() — confirm this matches
    # the pre-050 application expectations.
    op.execute("ALTER TABLE llm_chain ALTER COLUMN status SET DEFAULT 'pending'")
    # Safe to drop now that no column uses the type.
    op.execute("DROP TYPE IF EXISTS chainstatus")
    op.alter_column(
        "llm_call",
        "input_type",
        existing_type=sa.VARCHAR(),
        comment="Input type: text, audio, image",
        existing_comment="Input type: text, audio, image, pdf, multimodal",
        existing_nullable=False,
    )
    op.alter_column(
        "llm_call",
        "chain_id",
        existing_type=sa.UUID(),
        comment="Reference to the parent chain (NULL for standalone /llm/call requests)",
        existing_comment="Reference to the parent chain (NULL for standalone llm_call requests)",
        existing_nullable=True,
    )
    op.drop_column("job", "project_id")
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| Retrieve the status and results of an LLM call job by job ID. | ||
|
|
||
| This endpoint allows you to poll for the status and results of an asynchronous LLM call job that was previously initiated via the POST `/llm/call` endpoint. | ||
|
|
||
|
|
||
| ### Notes | ||
|
|
||
| - This endpoint returns both the job status AND the actual LLM response when complete | ||
| - LLM responses are also delivered asynchronously via the callback URL (if provided) | ||
| - Jobs can be queried at any time after creation |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,20 @@ | ||
| import logging | ||
| from uuid import UUID | ||
|
|
||
| from fastapi import APIRouter, Depends | ||
| from fastapi import APIRouter, Depends, HTTPException | ||
|
|
||
| from app.api.deps import AuthContextDep, SessionDep | ||
| from app.api.permissions import Permission, require_permission | ||
| from app.models import LLMCallRequest, LLMCallResponse, Message | ||
| from app.crud.jobs import JobCrud | ||
| from app.crud.llm import get_llm_calls_by_job_id | ||
| from app.models import ( | ||
| LLMCallRequest, | ||
| LLMCallResponse, | ||
| LLMJobImmediatePublic, | ||
| LLMJobPublic, | ||
| JobStatus, | ||
| ) | ||
| from app.models.llm.response import LLMResponse, Usage | ||
| from app.services.llm.jobs import start_job | ||
| from app.utils import APIResponse, validate_callback_url, load_description | ||
|
|
||
|
|
@@ -34,7 +44,7 @@ def llm_callback_notification(body: APIResponse[LLMCallResponse]): | |
| @router.post( | ||
| "/llm/call", | ||
| description=load_description("llm/llm_call.md"), | ||
| response_model=APIResponse[Message], | ||
| response_model=APIResponse[LLMJobImmediatePublic], | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Have we tried running this endpoint manually, triggering it with Postman or Swagger? |
||
| callbacks=llm_callback_router.routes, | ||
| dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], | ||
| ) | ||
|
|
@@ -43,22 +53,104 @@ def llm_call( | |
| ): | ||
| """ | ||
| Endpoint to initiate an LLM call as a background job. | ||
| Returns job information for polling. | ||
| """ | ||
| project_id = _current_user.project_.id | ||
| organization_id = _current_user.organization_.id | ||
|
|
||
| if request.callback_url: | ||
| validate_callback_url(str(request.callback_url)) | ||
|
|
||
| start_job( | ||
| job_id = start_job( | ||
| db=session, | ||
| request=request, | ||
| project_id=project_id, | ||
| organization_id=organization_id, | ||
| ) | ||
|
|
||
| return APIResponse.success_response( | ||
| data=Message( | ||
| message=f"Your response is being generated and will be delivered via callback." | ||
| ), | ||
| # Fetch job details to return immediate response | ||
| job_crud = JobCrud(session=session) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. jobcrud.create() should return the job_id no? I think
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Job create is called inside the start_job function. While we will get the job id from that function, we also want to show the status, inserted-at, and updated-at of the job, for which we would need to fetch the whole thing again. |
||
| job = job_crud.get(job_id=job_id, project_id=project_id) | ||
|
|
||
| if not job: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Like this line is not required if above is fixed |
||
| raise HTTPException(status_code=404, detail="Job not found") | ||
|
|
||
| if request.callback_url: | ||
| message = "Your response is being generated and will be delivered via callback." | ||
nishika26 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| else: | ||
| message = "Your response is being generated" | ||
|
|
||
| job_response = LLMJobImmediatePublic( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the wrapper model is not required
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is it not needed, could not get this sorry |
||
| job_id=job.id, | ||
| status=job.status.value, | ||
| message=message, | ||
| job_inserted_at=job.created_at, | ||
| job_updated_at=job.updated_at, | ||
| ) | ||
|
|
||
| return APIResponse.success_response(data=job_response) | ||
|
|
||
|
|
||
@router.get(
    "/llm/call/{job_id}",
    description=load_description("llm/get_llm_call.md"),
    response_model=APIResponse[LLMJobPublic],
    dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def get_llm_call_status(
    _current_user: AuthContextDep,
    session: SessionDep,
    job_id: UUID,
) -> APIResponse[LLMJobPublic]:
    """
    Poll for LLM call job status and results.

    Looks up the job scoped to the caller's project and returns its status.
    When the job has succeeded, the persisted llm_call row is hydrated into
    a nested LLMCallResponse; otherwise only the status (and any error
    message) is returned.

    Raises:
        HTTPException(404): job does not exist or belongs to another project.
        HTTPException(500): a successful job has no stored usage data.
    """
    project_id = _current_user.project_.id

    job_crud = JobCrud(session=session)
    job = job_crud.get(job_id=job_id, project_id=project_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    llm_call_response = None
    # Compare enum member to enum member. The previous form
    # (`job.status.value == JobStatus.SUCCESS`) only matched when JobStatus
    # is a str-mixin enum; direct member comparison is correct either way.
    if job.status == JobStatus.SUCCESS:
        llm_calls = get_llm_calls_by_job_id(
            session=session, job_id=job_id, project_id=project_id
        )

        if llm_calls:
            # A plain /llm/call job produces exactly one llm_call row;
            # chains are not served by this endpoint yet.
            llm_call = llm_calls[0]

            llm_response = LLMResponse(
                provider_response_id=llm_call.provider_response_id or "",
                conversation_id=llm_call.conversation_id,
                provider=llm_call.provider,
                model=llm_call.model,
                output=llm_call.content,
            )

            # A successful call without usage data indicates broken
            # bookkeeping upstream; surface it as a server error.
            if not llm_call.usage:
                raise HTTPException(
                    status_code=500,
                    detail="Completed LLM job is missing usage data",
                )

            llm_call_response = LLMCallResponse(
                response=llm_response,
                usage=Usage(**llm_call.usage),
                provider_raw_response=None,
            )

    job_response = LLMJobPublic(
        job_id=job.id,
        status=job.status.value,
        llm_response=llm_call_response,
        error_message=job.error_message,
    )

    return APIResponse.success_response(data=job_response)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,11 +12,13 @@ class JobCrud: | |
| def __init__(self, session: Session): | ||
| self.session = session | ||
|
|
||
| def create(self, job_type: JobType, trace_id: str | None = None) -> Job: | ||
| new_job = Job( | ||
| job_type=job_type, | ||
| trace_id=trace_id, | ||
| ) | ||
| def create( | ||
| self, | ||
| job_type: JobType, | ||
| trace_id: str | None = None, | ||
| project_id: int | None = None, | ||
| ) -> Job: | ||
| new_job = Job(job_type=job_type, trace_id=trace_id, project_id=project_id) | ||
| self.session.add(new_job) | ||
| self.session.commit() | ||
| self.session.refresh(new_job) | ||
|
|
@@ -38,5 +40,14 @@ def update(self, job_id: UUID, job_update: JobUpdate) -> Job: | |
|
|
||
| return job | ||
|
|
||
| def get(self, job_id: UUID) -> Job | None: | ||
| return self.session.get(Job, job_id) | ||
| def get(self, job_id: UUID, project_id: int | None = None) -> Job | None: | ||
| job = self.session.get(Job, job_id) | ||
| if job is None: | ||
| return None | ||
| if ( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hard to read this block. Will be bug prone. Simplify |
||
| project_id is not None | ||
| and job.project_id is not None | ||
| and job.project_id != project_id | ||
| ): | ||
| return None | ||
| return job | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,4 +30,7 @@ | |
| AudioOutput, | ||
| LLMChainResponse, | ||
| IntermediateChainResponse, | ||
| LLMJobBasePublic, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these models are not required. There are total 7 fields russian doll-ed amongst the three. Use the strings instead |
||
| LLMJobImmediatePublic, | ||
| LLMJobPublic, | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nitpick)
The alterations on llm_chain.error and llm_call.input_type appear to be autogenerate noises, they don’t introduce any effective schema change (TEXT → AutoString resolves to TEXT, and input_type only updates comments).
Recommend removing these to keep the migration scoped only to adding project_id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this reason specifically, I am including it here — to decrease the autogenerate noise as much as possible in future revisions. Also, changing the chain status to an enum was an important change.