diff --git a/backend/app/alembic/versions/051_add_columns_to_collection_job_and_documents.py b/backend/app/alembic/versions/051_add_columns_to_collection_job_and_documents.py new file mode 100644 index 000000000..86b0be3b4 --- /dev/null +++ b/backend/app/alembic/versions/051_add_columns_to_collection_job_and_documents.py @@ -0,0 +1,62 @@ +"""add columns to collection job and documents table + +Revision ID: 051 +Revises: 050 +Create Date: 2026-03-25 10:09:47.318575 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "051" +down_revision = "050" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "collection_jobs", + sa.Column( + "docs_num", + sa.Integer(), + nullable=True, + comment="Total number of documents to be processed in this job", + ), + ) + op.add_column( + "collection_jobs", + sa.Column( + "total_size_mb", + sa.Float(), + nullable=True, + comment="Total size of documents being uploaded to collection in MB", + ), + ) + op.add_column( + "collection_jobs", + sa.Column( + "documents", + sa.JSON(), + nullable=True, + comment="List of documents given to make collection", + ), + ) + op.add_column( + "document", + sa.Column( + "file_size_kb", + sa.Float(), + nullable=True, + comment="Size of the document in kilobytes (KB)", + ), + ) + + +def downgrade(): + op.drop_column("document", "file_size_kb") + op.drop_column("collection_jobs", "total_size_mb") + op.drop_column("collection_jobs", "docs_num") + op.drop_column("collection_jobs", "documents") diff --git a/backend/app/api/docs/collections/create.md b/backend/app/api/docs/collections/create.md index cc85ad36a..bc94e7c45 100644 --- a/backend/app/api/docs/collections/create.md +++ b/backend/app/api/docs/collections/create.md @@ -3,9 +3,10 @@ pipeline: * Create a vector store from the document IDs you received after uploading your documents through the Documents module. -* The `batch_size` parameter controls how many documents are sent to OpenAI in a - single transaction when creating the vector store. This helps optimize the upload - process for large document sets. If not specified, the default value is **10**. +* Documents are automatically batched when creating the vector store to optimize + the upload process for large document sets. A new batch is created when either + the cumulative size reaches 30 MB of documents given to upload to a vector store + or the document count reaches 200 files in a batch, whichever limit is hit first. * [Deprecated] Attach the Vector Store to an OpenAI [Assistant](https://platform.openai.com/docs/api-reference/assistants). Use parameters in the request body relevant to an Assistant to flesh out diff --git a/backend/app/api/routes/collection_job.py b/backend/app/api/routes/collection_job.py index c586a3cc9..792814688 100644 --- a/backend/app/api/routes/collection_job.py +++ b/backend/app/api/routes/collection_job.py @@ -17,7 +17,6 @@ CollectionActionType, CollectionJobPublic, ) -from app.models.collection import CollectionPublic from app.utils import APIResponse, load_description from app.services.collections.helpers import extract_error_message, to_collection_public diff --git a/backend/app/api/routes/collections.py b/backend/app/api/routes/collections.py index 558e4d867..28155c9b5 100644 --- a/backend/app/api/routes/collections.py +++ b/backend/app/api/routes/collections.py @@ -95,12 +95,16 @@ def create_collection( if request.name: ensure_unique_name(session, current_user.project_.id, request.name) + unique_documents = list(dict.fromkeys(request.documents)) + collection_job_crud = CollectionJobCrud(session, current_user.project_.id) collection_job = collection_job_crud.create( CollectionJobCreate( action_type=CollectionActionType.CREATE, project_id=current_user.project_.id, status=CollectionJobStatus.PENDING, + docs_num=len(unique_documents), + documents=[str(doc_id) for doc_id in unique_documents], ) ) diff --git a/backend/app/api/routes/documents.py b/backend/app/api/routes/documents.py index 28c6087ab..3c02ee4a6 100644 --- a/backend/app/api/routes/documents.py +++ b/backend/app/api/routes/documents.py @@ -12,6 +12,7 @@ UploadFile, ) from fastapi import Path as FastPath +from fastapi import HTTPException from app.api.deps import AuthContextDep, SessionDep from app.api.permissions import Permission, require_permission @@ -27,8 +28,9 @@ DocTransformationJobPublic, ) from app.core.cloud import get_cloud_storage -from app.services.collections.helpers import pick_service_for_documennt +from app.services.collections.helpers import pick_service_for_documennt, MAX_DOC_SIZE_MB from app.services.documents.helpers import ( + calculate_file_size, schedule_transformation, pre_transform_validation, build_document_schema, @@ -129,6 +131,20 @@ async def upload_doc( transformer=transformer, ) + file_size_kb = calculate_file_size(src) + file_size_mb = file_size_kb / 1024 + + if file_size_mb > MAX_DOC_SIZE_MB: + logger.warning( + f"[upload_doc] Document size exceeds limit | " + f"{{'filename': '{src.filename}', 'size_mb': {round(file_size_mb, 2)}, 'max_size_mb': {MAX_DOC_SIZE_MB}}}" + ) + raise HTTPException( + status_code=413, + detail=f"Document size ({round(file_size_mb, 2)} MB) exceeds the maximum allowed size of {MAX_DOC_SIZE_MB} MB. " + f"Please upload a smaller file.", + ) + storage = get_cloud_storage(session=session, project_id=current_user.project_.id) document_id = uuid4() object_store_url = storage.put(src, Path(str(document_id))) @@ -137,6 +153,7 @@ async def upload_doc( document = Document( id=document_id, fname=src.filename, + file_size_kb=file_size_kb, object_store_url=str(object_store_url), ) source_document = crud.update(document) diff --git a/backend/app/core/cloud/storage.py b/backend/app/core/cloud/storage.py index 727380726..a57273a06 100644 --- a/backend/app/core/cloud/storage.py +++ b/backend/app/core/cloud/storage.py @@ -125,6 +125,11 @@ def stream(self, url: str) -> StreamingBody: """Stream a file from storage""" pass + @abstractmethod + def get(self, url: str) -> bytes: + """Get file contents as bytes (for files that fit in memory)""" + pass + @abstractmethod def get_file_size_kb(self, url: str) -> float: """Return the file size in KB""" @@ -193,6 +198,25 @@ def stream(self, url: str) -> StreamingBody: ) raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err + def get(self, url: str) -> bytes: + name = SimpleStorageName.from_url(url) + kwargs = asdict(name) + try: + body = self.aws.client.get_object(**kwargs).get("Body") + content = body.read() + logger.info( + f"[AmazonCloudStorage.get] File retrieved successfully | " + f"{{'project_id': '{self.project_id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}', 'size_bytes': {len(content)}}}" + ) + return content + except ClientError as err: + logger.error( + f"[AmazonCloudStorage.get] AWS get error | " + f"{{'project_id': '{self.project_id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}', 'error': '{str(err)}'}}", + exc_info=True, + ) + raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err + def get_file_size_kb(self, url: str) -> float: name = SimpleStorageName.from_url(url) kwargs = asdict(name) diff --git a/backend/app/crud/rag/open_ai.py b/backend/app/crud/rag/open_ai.py index cdb644abc..cdae82440 100644 --- a/backend/app/crud/rag/open_ai.py +++ b/backend/app/crud/rag/open_ai.py @@ -1,13 +1,13 @@ import json import logging import functools as ft +from io import BytesIO from typing import Iterable from openai import OpenAI, OpenAIError from pydantic import BaseModel from app.core.cloud import CloudStorage -from app.core.config import settings from app.models import Document logger = logging.getLogger(__name__) @@ -121,15 +121,13 @@ def update( storage: CloudStorage, documents: Iterable[Document], ): - files = [] for docs in documents: + files = [] for d in docs: - f_obj = storage.stream(d.object_store_url) - - # monkey patch botocore.response.StreamingBody to make - # OpenAI happy + # Get file bytes and wrap in BytesIO for OpenAI API + content = storage.get(d.object_store_url) + f_obj = BytesIO(content) f_obj.name = d.fname - files.append(f_obj) logger.info( @@ -143,31 +141,11 @@ def update( f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}" ) if req.file_counts.completed != req.file_counts.total: - view = {x.fname: x for x in docs} - for i in self.read(vector_store_id): - if i.last_error is None: - fname = self.client.files.retrieve(i.id) - view.pop(fname) - - error = { - "error": "OpenAI document processing error", - "documents": list(view.values()), - } - try: - raise InterruptedError(json.dumps(error, cls=BaseModelEncoder)) - except InterruptedError as err: - logger.error( - f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'error': '{error['error']}', 'failed_documents': {len(error['documents'])}}}", - exc_info=True, - ) - raise - - while files: - f_obj = files.pop() - f_obj.close() - logger.info( - f"[OpenAIVectorStoreCrud.update] Closed file stream | {{'vector_store_id': '{vector_store_id}', 'filename': '{f_obj.name}'}}" + error_msg = f"OpenAI document processing error: {req.file_counts.completed}/{req.file_counts.total} files completed" + logger.error( + f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}" ) + raise InterruptedError(error_msg) yield from docs diff --git a/backend/app/models/collection.py b/backend/app/models/collection.py index f8b545404..ccd606deb 100644 --- a/backend/app/models/collection.py +++ b/backend/app/models/collection.py @@ -4,7 +4,7 @@ from uuid import UUID, uuid4 from pydantic import HttpUrl, model_validator, model_serializer -from sqlalchemy import UniqueConstraint, Index, text +from sqlalchemy import Index, text from sqlmodel import Field, Relationship, SQLModel from app.core.util import now @@ -39,12 +39,10 @@ class Collection(SQLModel, table=True): description="Unique identifier for the collection", sa_column_kwargs={"comment": "Unique identifier for the collection"}, ) - provider: ProviderType = ( - Field( - nullable=False, - description="LLM provider used for this collection (e.g., 'openai', 'bedrock', 'google', etc)", - sa_column_kwargs={"comment": "LLM provider used for this collection"}, - ), + provider: ProviderType = Field( + nullable=False, + description="LLM provider used for this collection (e.g., 'openai', 'bedrock', 'google', etc)", + sa_column_kwargs={"comment": "LLM provider used for this collection"}, ) llm_service_id: str = Field( nullable=False, @@ -102,14 +100,6 @@ class CollectionOptions(SQLModel): documents: list[UUID] = Field( description="List of document IDs", ) - batch_size: int = Field( - default=10, - description=( - "Number of documents to send to OpenAI in a single " - "transaction. See the `file_ids` parameter in the " - "vector store [create batch](https://platform.openai.com/docs/api-reference/vector-stores-file-batches/createBatch)." - ), - ) def model_post_init(self, __context: Any): self.documents = list(set(self.documents)) diff --git a/backend/app/models/collection_job.py b/backend/app/models/collection_job.py index 7c55e8562..333ebfd14 100644 --- a/backend/app/models/collection_job.py +++ b/backend/app/models/collection_job.py @@ -2,7 +2,8 @@ from enum import Enum from uuid import UUID, uuid4 -from sqlmodel import Column, Field, SQLModel, Text +from pydantic import field_validator +from sqlmodel import JSON, Column, Field, SQLModel, Text from app.core.util import now from app.models.collection import CollectionIDPublic, CollectionPublic @@ -53,12 +54,32 @@ class CollectionJob(SQLModel, table=True): description="Tracing ID for correlating logs and traces.", sa_column_kwargs={"comment": "Tracing ID for correlating logs and traces"}, ) + docs_num: int | None = Field( + default=None, + description="Total number of documents to be processed in this job", + sa_column_kwargs={ + "comment": "Total number of documents to be processed in this job" + }, + ) + total_size_mb: float | None = Field( + default=None, + description="Total size of documents being uploaded to collection in MB", + sa_column_kwargs={ + "comment": "Total size of documents being uploaded to collection in MB" + }, + ) error_message: str | None = Field( default=None, sa_column=Column( Text, nullable=True, comment="Error message if the job failed" ), ) + documents: list[str] | None = Field( + default=None, + sa_column=Column( + JSON, nullable=True, comment="List of documents given to make collection" + ), + ) # Foreign keys collection_id: UUID | None = Field( @@ -106,7 +127,9 @@ class CollectionJobCreate(SQLModel): collection_id: UUID | None = None status: CollectionJobStatus action_type: CollectionActionType + docs_num: int | None = None project_id: int + documents: list[str] | None = None class CollectionJobUpdate(SQLModel): @@ -114,6 +137,7 @@ class CollectionJobUpdate(SQLModel): status: CollectionJobStatus | None = None error_message: str | None = None collection_id: UUID | None = None + total_size_mb: float | None = None trace_id: str | None = None diff --git a/backend/app/models/document.py b/backend/app/models/document.py index bffa7b39c..12843e72a 100644 --- a/backend/app/models/document.py +++ b/backend/app/models/document.py @@ -41,6 +41,11 @@ class Document(DocumentBase, table=True): default=False, sa_column_kwargs={"comment": "Soft delete flag"}, ) + file_size_kb: float | None = Field( + default=None, + description="The size of the document in kilobytes", + sa_column_kwargs={"comment": "Size of the document in kilobytes (KB)"}, + ) # Foreign keys source_document_id: UUID | None = Field( @@ -80,9 +85,6 @@ class DocumentPublic(DocumentBase): updated_at: datetime = Field( description="The timestamp when the document was last updated" ) - signed_url: str | None = Field( - default=None, description="A signed URL for accessing the document" - ) class TransformedDocumentPublic(DocumentPublic): diff --git a/backend/app/services/collections/create_collection.py b/backend/app/services/collections/create_collection.py index 397e4eb3c..eb37fd039 100644 --- a/backend/app/services/collections/create_collection.py +++ b/backend/app/services/collections/create_collection.py @@ -18,12 +18,10 @@ CollectionJob, Collection, CollectionJobUpdate, - CollectionPublic, CollectionJobPublic, CreationRequest, ) from app.services.collections.helpers import ( - batch_documents, extract_error_message, to_collection_public, ) @@ -168,6 +166,14 @@ def execute_job( job_uuid = UUID(job_id) + with Session(engine) as session: + document_crud = DocumentCrud(session, project_id) + flat_docs = document_crud.read_each(creation_request.documents) + + file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname} + total_size_kb = sum(doc.file_size_kb or 0 for doc in flat_docs) + total_size_mb = round(total_size_kb / 1024, 2) + with Session(engine) as session: collection_job_crud = CollectionJobCrud(session, project_id) collection_job = collection_job_crud.read_one(job_uuid) @@ -176,18 +182,12 @@ def execute_job( CollectionJobUpdate( task_id=task_id, status=CollectionJobStatus.PROCESSING, + total_size_mb=total_size_mb, ), ) storage = get_cloud_storage(session=session, project_id=project_id) - batch_size = creation_request.batch_size or 10 - docs_batches = batch_documents( - DocumentCrud(session, project_id), - creation_request.documents, - batch_size, - ) - provider = get_llm_provider( session=session, provider=creation_request.provider, @@ -198,20 +198,12 @@ def execute_job( result = provider.create( collection_request=creation_request, storage=storage, - docs_batches=docs_batches, + documents=flat_docs, ) llm_service_id = result.llm_service_id llm_service_name = result.llm_service_name - # Flatten the already-loaded batches — no need for a second DB read - flat_docs = [doc for batch in docs_batches for doc in batch] - - file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname} - file_sizes_kb = [ - storage.get_file_size_kb(doc.object_store_url) for doc in flat_docs - ] - with Session(engine) as session: collection_crud = CollectionCrud(session, project_id) @@ -245,11 +237,11 @@ def execute_job( elapsed = time.time() - start_time logger.info( - "[create_collection.execute_job] Collection created: %s | Time: %.2fs | Files: %d | Sizes: %s KB | Types: %s", + "[create_collection.execute_job] Collection created: %s | Time: %.2fs | Files: %d | Total Size: %s MB | Types: %s", collection_id, elapsed, len(flat_docs), - file_sizes_kb, + collection_job.total_size_mb, list(file_exts), ) diff --git a/backend/app/services/collections/helpers.py b/backend/app/services/collections/helpers.py index 6275ee40d..6985ac78e 100644 --- a/backend/app/services/collections/helpers.py +++ b/backend/app/services/collections/helpers.py @@ -3,18 +3,26 @@ import ast import re from uuid import UUID -from typing import List from fastapi import HTTPException from sqlmodel import select -from app.crud import DocumentCrud, CollectionCrud +from app.crud import CollectionCrud from app.api.deps import SessionDep -from app.models import DocumentCollection, Collection, CollectionPublic +from app.models import DocumentCollection, Collection, CollectionPublic, Document logger = logging.getLogger(__name__) +# Necessary Constants - +# Maximum individual document size (must be less than batch size) +MAX_DOC_SIZE_MB = 25 # 25 MB maximum per document + +# Maximum batch size for uploading documents to vector store +# Derived from MAX_DOC_SIZE + buffer to ensure single docs always fit +MAX_BATCH_SIZE_KB = (MAX_DOC_SIZE_MB + 5) * 1024 # 30 MB in KB (25 + 5 MB buffer) +MAX_BATCH_COUNT = 200 # Maximum documents per batch + def get_service_name(provider: str) -> str: """Get the collection service name for a provider.""" @@ -55,25 +63,52 @@ def extract_error_message(err: Exception) -> str: return message.strip()[:1000] -def batch_documents( - document_crud: DocumentCrud, documents: List[UUID], batch_size: int -): - """Batch document IDs into chunks of size `batch_size`, load each via `DocumentCrud.read_each`, - and return a list of document batches.""" +def batch_documents(documents: list[Document]) -> list[list[Document]]: + """ + Batch documents dynamically based on size and count limits. + + Creates a new batch when either: + - Total size reaches 30 MB (30,720 KB) + - Document count reaches 200 + + Args: + documents: List of Document objects to batch + + Returns: + List of document batches + """ + + docs_batches = [] + current_batch = [] + current_batch_size_kb = 0 + + for doc in documents: + doc_size_kb = doc.file_size_kb or 0 + + would_exceed_size = (current_batch_size_kb + doc_size_kb) > MAX_BATCH_SIZE_KB + would_exceed_count = len(current_batch) >= MAX_BATCH_COUNT + + if current_batch and (would_exceed_size or would_exceed_count): + docs_batches.append(current_batch) + logger.info( + f"[batch_documents] Batch completed | {{'batch_num': {len(docs_batches)}, 'doc_count': {len(current_batch)}, 'batch_size_mb': {round(current_batch_size_kb / 1024)}}}" + ) + current_batch = [] + current_batch_size_kb = 0 + + current_batch.append(doc) + current_batch_size_kb += doc_size_kb + + if current_batch: + docs_batches.append(current_batch) + logger.info( + f"[batch_documents] Final Batch completed | {{'batch_num': {len(docs_batches)}, 'doc_count': {len(current_batch)}, 'batch_size_mb': {round(current_batch_size_kb / 1024)}}}" + ) logger.info( - f"[batch_documents] Starting batch iteration for documents | {{'batch_size': {batch_size}, 'total_documents': {len(documents)}}}" + f"[batch_documents] Batching complete | {{'total_batches': {len(docs_batches)}, 'total_documents': {len(documents)}}}" ) - docs_batches = [] - start, stop = 0, batch_size - while True: - view = documents[start:stop] - if not view: - break - batch_docs = document_crud.read_each(view) - docs_batches.append(batch_docs) - start = stop - stop += batch_size + return docs_batches diff --git a/backend/app/services/collections/providers/base.py b/backend/app/services/collections/providers/base.py index b5d37bf9b..36283d1fa 100644 --- a/backend/app/services/collections/providers/base.py +++ b/backend/app/services/collections/providers/base.py @@ -31,18 +31,17 @@ def create( self, collection_request: CreationRequest, storage: CloudStorage, - docs_batches: list[list[Document]], + documents: list[Document], ) -> Collection: """Create collection with documents and optionally an assistant. Args: collection_request: Collection parameters (name, description, document list, etc.) storage: Cloud storage instance for file access - docs_batches: Pre-fetched document batches (DB reads must happen before this call) + documents: Pre-fetched list of Document objects to add to the collection Returns: - llm_service_id: ID of the resource to delete - llm_service_name: Name of the service (determines resource type) + Collection object with llm_service_id and llm_service_name populated """ raise NotImplementedError("Providers must implement execute method") diff --git a/backend/app/services/collections/providers/openai.py b/backend/app/services/collections/providers/openai.py index 9dfaab31c..f52e83394 100644 --- a/backend/app/services/collections/providers/openai.py +++ b/backend/app/services/collections/providers/openai.py @@ -1,11 +1,12 @@ import logging +from typing import List from openai import OpenAI from app.services.collections.providers import BaseProvider from app.core.cloud.storage import CloudStorage from app.crud.rag import OpenAIVectorStoreCrud, OpenAIAssistantCrud -from app.services.collections.helpers import get_service_name +from app.services.collections.helpers import get_service_name, batch_documents from app.models import CreationRequest, Collection, Document @@ -23,13 +24,14 @@ def create( self, collection_request: CreationRequest, storage: CloudStorage, - docs_batches: list[list[Document]], + documents: List[Document], ) -> Collection: """ Create OpenAI vector store with documents and optionally an assistant. docs_batches must be pre-fetched inside a DB session before this call. """ try: + docs_batches = batch_documents(documents) vector_store_crud = OpenAIVectorStoreCrud(self.client) vector_store = vector_store_crud.create() diff --git a/backend/app/services/documents/helpers.py b/backend/app/services/documents/helpers.py index cd941eb55..7d78b6160 100644 --- a/backend/app/services/documents/helpers.py +++ b/backend/app/services/documents/helpers.py @@ -1,7 +1,7 @@ from typing import Optional, Tuple, Iterable, Union from uuid import UUID -from fastapi import HTTPException +from fastapi import HTTPException, UploadFile from app.services.doctransform.registry import ( get_available_transformers, @@ -23,6 +23,26 @@ ) +def calculate_file_size(file: UploadFile) -> float: + """ + Calculate the size of an uploaded file in kilobytes. + + Args: + file: The uploaded file from FastAPI + + Returns: + The size of the file in kilobytes (KB) as a whole number + """ + if file.size: + return round(file.size / 1024) + + file.file.seek(0, 2) + size_bytes = file.file.tell() + file.file.seek(0) + + return round(size_bytes / 1024) + + def pre_transform_validation( *, src_filename: str, diff --git a/backend/app/tests/api/routes/collections/test_create_collections.py b/backend/app/tests/api/routes/collections/test_create_collections.py index 220cb4ee8..b51631939 100644 --- a/backend/app/tests/api/routes/collections/test_create_collections.py +++ b/backend/app/tests/api/routes/collections/test_create_collections.py @@ -3,10 +3,11 @@ from typing import Any from fastapi.testclient import TestClient +from sqlmodel import Session from app.core.config import settings from app.tests.utils.auth import TestAuthContext -from app.models import CollectionJobStatus +from app.models import CollectionJobStatus, Document from app.models.collection import CreationRequest @@ -14,19 +15,39 @@ def _extract_metadata(body: dict) -> dict | None: return body.get("metadata") or body.get("meta") +def _create_test_document( + db: Session, project_id: int, file_size: float = 1 +) -> Document: + """Helper to create a test document.""" + doc = Document( + id=uuid4(), + fname="test_document.txt", + object_store_url="s3://test-bucket/test_document.txt", + project_id=project_id, + file_size_kb=file_size, + ) + db.add(doc) + db.commit() + db.refresh(doc) + return doc + + @patch("app.api.routes.collections.create_service.start_job") def test_collection_creation_with_assistant_calls_start_job_and_returns_job( mock_start_job: Any, client: TestClient, user_api_key_header: dict[str, str], user_api_key: TestAuthContext, + db: Session, ) -> None: + # Create a test document in the database + doc = _create_test_document(db, user_api_key.project_id, file_size=2) + creation_data = CreationRequest( model="gpt-4o", instructions="string", temperature=0.000001, - documents=[UUID("f3e86a17-1e6f-41ec-b020-5b08eebef928")], - batch_size=10, + documents=[doc.id], callback_url=None, ) @@ -67,11 +88,14 @@ def test_collection_creation_vector_only_adds_metadata_and_sets_with_assistant_f client: TestClient, user_api_key_header: dict[str, str], user_api_key: TestAuthContext, + db: Session, ) -> None: + # Create a test document in the database + doc = _create_test_document(db, user_api_key.project_id, file_size=5) + creation_data = CreationRequest( temperature=0.000001, - documents=[str(uuid4())], - batch_size=10, + documents=[doc.id], callback_url=None, ) @@ -103,13 +127,18 @@ def test_collection_creation_vector_only_adds_metadata_and_sets_with_assistant_f def test_collection_creation_vector_only_request_validation_error( - client: TestClient, user_api_key_header: dict[str, str] + client: TestClient, + user_api_key_header: dict[str, str], + user_api_key: TestAuthContext, + db: Session, ) -> None: + # Create a test document in the database + doc = _create_test_document(db, user_api_key.project_id) + payload = { "model": "gpt-4o", "temperature": 0.000001, - "documents": [str(uuid4())], - "batch_size": 10, + "documents": [str(doc.id)], "callback_url": None, } diff --git a/backend/app/tests/api/routes/documents/test_route_document_upload.py b/backend/app/tests/api/routes/documents/test_route_document_upload.py index 6f16b52b1..f5ccca242 100644 --- a/backend/app/tests/api/routes/documents/test_route_document_upload.py +++ b/backend/app/tests/api/routes/documents/test_route_document_upload.py @@ -4,7 +4,7 @@ from pathlib import Path from tempfile import NamedTemporaryFile from urllib.parse import urlparse -from unittest.mock import patch +from unittest.mock import patch, MagicMock import pytest from moto import mock_aws @@ -301,6 +301,50 @@ def test_transformation_job_created_in_database( # Check that start_job was called with the right arguments assert "transformer_name" in kwargs or len(args) >= 4 + def test_upload_file_within_size_limit( + self, + db: Session, + route: Route, + scratch: Path, + uploader: WebUploader, + ) -> None: + """Test that a file within the size limit uploads successfully.""" + aws = AmazonCloudStorageClient() + aws.create() + + # Mock calculate_file_size to return a value just under the 25 MB limit (in KB) + with patch( + "app.api.routes.documents.calculate_file_size", + return_value=25 * 1024 - 1, # 25 MB - 1 KB + ): + response = uploader.put(route, scratch) + + assert response.status_code == 200 + + def test_upload_file_exceeds_size_limit( + self, + db: Session, + route: Route, + scratch: Path, + uploader: WebUploader, + ) -> None: + """Test that a file exceeding 25 MB returns a 413 error.""" + aws = AmazonCloudStorageClient() + aws.create() + + # Mock calculate_file_size to return a value over the 25 MB limit (in KB) + with patch( + "app.api.routes.documents.calculate_file_size", + return_value=25 * 1024 + 1, # 25 MB + 1 KB + ): + response = uploader.put(route, scratch) + + assert response.status_code == 413 + print("response =", response.json()) + error_detail = response.json()["error"] + assert "exceeds the maximum allowed size" in error_detail + assert "25" in error_detail + def test_upload_response_structure_without_transformation( self, db: Session, diff --git a/backend/app/tests/services/collections/providers/test_openai_provider.py b/backend/app/tests/services/collections/providers/test_openai_provider.py index 044c94a14..b21577d49 100644 --- a/backend/app/tests/services/collections/providers/test_openai_provider.py +++ b/backend/app/tests/services/collections/providers/test_openai_provider.py @@ -18,14 +18,16 @@ def test_create_openai_vector_store_only() -> None: collection_request = SimpleNamespace( documents=["doc1", "doc2"], - batch_size=10, model=None, instructions=None, temperature=None, ) storage = MagicMock() - docs_batches = [["doc1"], ["doc2"]] + documents = [ + SimpleNamespace(file_size_kb=10), + SimpleNamespace(file_size_kb=20), + ] vector_store_id = generate_openai_id("vs_") with patch( @@ -38,7 +40,7 @@ def test_create_openai_vector_store_only() -> None: collection = provider.create( collection_request, storage, - docs_batches, + documents, ) assert isinstance(collection, Collection) @@ -52,14 +54,13 @@ def test_create_openai_with_assistant() -> None: collection_request = SimpleNamespace( documents=["doc1"], - batch_size=10, model="gpt-4o", instructions="You are helpful", temperature=0.7, ) storage = MagicMock() - docs_batches = [["doc1"]] + documents = [SimpleNamespace(file_size_kb=10)] vector_store_id = generate_openai_id("vs_") assistant_id = generate_openai_id("asst_") @@ -78,7 +79,7 @@ def test_create_openai_with_assistant() -> None: collection = provider.create( collection_request, storage, - docs_batches, + documents, ) assert collection.llm_service_id == assistant_id @@ -128,7 +129,6 @@ def test_create_propagates_exception() -> None: collection_request = SimpleNamespace( documents=["doc1"], - batch_size=10, model=None, instructions=None, temperature=None, @@ -143,5 +143,5 @@ def test_create_propagates_exception() -> None: provider.create( collection_request, MagicMock(), - [["doc1"]], + [SimpleNamespace(file_size_kb=10)], ) diff --git a/backend/app/tests/services/collections/test_create_collection.py b/backend/app/tests/services/collections/test_create_collection.py index db5a1b30c..4d393ea1b 100644 --- a/backend/app/tests/services/collections/test_create_collection.py +++ b/backend/app/tests/services/collections/test_create_collection.py @@ -58,7 +58,6 @@ def test_start_job_creates_collection_job_and_schedules_task(db: Session) -> Non project = get_project(db) request = CreationRequest( documents=[UUID("f3e86a17-1e6f-41ec-b020-5b08eebef928")], - batch_size=10, callback_url=None, provider="openai", ) @@ -133,7 +132,7 @@ def test_execute_job_success_flow_updates_job_and_creates_collection( aws.client.put_object(Bucket=settings.AWS_S3_BUCKET, Key=str(s3_key), Body=b"test") sample_request = CreationRequest( - documents=[document.id], batch_size=10, callback_url=None, provider="openai" + documents=[document.id], callback_url=None, provider="openai" ) mock_get_llm_provider.return_value = get_mock_provider( @@ -200,9 +199,7 @@ def test_execute_job_assistant_create_failure_marks_failed_and_deletes_collectio collection_id=None, ) - req = CreationRequest( - documents=[], batch_size=10, callback_url=None, provider="openai" - ) + req = CreationRequest(documents=[], callback_url=None, provider="openai") mock_provider = get_mock_provider( llm_service_id="vs_123", llm_service_name="openai vector store" @@ -265,7 +262,6 @@ def test_execute_job_success_flow_callback_job_and_creates_collection( sample_request = CreationRequest( documents=[document.id], - batch_size=10, callback_url=callback_url, provider="openai", ) @@ -346,7 +342,6 @@ def test_execute_job_success_creates_collection_with_callback( sample_request = CreationRequest( documents=[document.id], - batch_size=10, callback_url=callback_url, provider="openai", ) @@ -430,7 +425,6 @@ def test_execute_job_failure_flow_callback_job_and_marks_failed( sample_request = CreationRequest( documents=[uuid.uuid4()], - batch_size=10, callback_url=callback_url, provider="openai", ) diff --git a/backend/app/tests/services/collections/test_helpers.py b/backend/app/tests/services/collections/test_helpers.py index f53271f18..7cddaf305 100644 --- a/backend/app/tests/services/collections/test_helpers.py +++ b/backend/app/tests/services/collections/test_helpers.py @@ -11,7 +11,13 @@ from app.services.collections import helpers from app.tests.utils.utils import get_project from app.tests.utils.collection import get_vector_store_collection -from app.services.collections.helpers import ensure_unique_name +from app.services.collections.helpers import ( + ensure_unique_name, + get_service_name, + to_collection_public, +) +from app.models import Collection, ProviderType +from app.core.util import now def test_extract_error_message_parses_json_and_strips_prefix() -> None: @@ -45,49 +51,91 @@ def test_extract_error_message_handles_non_matching_bodies() -> None: # batch documents -class FakeDocumentCrud: - def __init__(self): - self.calls = [] +def create_fake_documents( + count: int, file_size_kb: float | None = 1 +) -> list[SimpleNamespace]: + """Create fake document objects for testing. + + Args: + count: Number of documents to create + file_size_kb: Size in KB for each document (default 1 KB) + + Returns: + List of SimpleNamespace objects mimicking Document objects + """ + return [ + SimpleNamespace( + id=uuid4(), + fname=f"doc_{i}.txt", + object_store_url=f"s3://bucket/doc_{i}.txt", + file_size_kb=file_size_kb, + ) + for i in range(count) + ] + + +def test_batch_documents_small_files_single_batch() -> None: + """Test that small files all fit in one batch (under 30 MB and under 200 docs).""" + docs = create_fake_documents(6, file_size_kb=1) # 1 KB per file + batches = helpers.batch_documents(docs) + + # All 6 small files should fit in one batch + assert len(batches) == 1 + assert len(batches[0]) == 6 + assert [d.id for d in batches[0]] == [d.id for d in docs] - def read_each(self, ids): - self.calls.append(list(ids)) - return [ - SimpleNamespace( - id=i, fname=f"{i}.txt", object_store_url=f"s3://bucket/{i}.txt" - ) - for i in ids - ] +def test_batch_documents_size_based_batching() -> None: + """Test that large files trigger size-based batching (30 MB limit).""" + # Each file is 20 MB (20480 KB), so max 1 file per batch (since 2 * 20 MB > 30 MB) + docs = create_fake_documents(3, file_size_kb=20 * 1024) + batches = helpers.batch_documents(docs) -def test_batch_documents_even_chunks() -> None: - crud = FakeDocumentCrud() - ids = [uuid4() for _ in range(6)] - batches = helpers.batch_documents(crud, ids, batch_size=3) + # Should create 3 batches, one for each 20 MB file + assert len(batches) == 3 + assert len(batches[0]) == 1 + assert len(batches[1]) == 1 + assert len(batches[2]) == 1 - # read_each called with chunks [0:3], [3:6] - assert crud.calls == [ids[0:3], ids[3:6]] - # output mirrors what read_each returned + +def test_batch_documents_count_based_batching() -> None: + """Test that document count triggers batching (200 docs limit).""" + docs = create_fake_documents(250, file_size_kb=0.1) # Small files + batches = helpers.batch_documents(docs) + + # Should create 2 batches: 200 + 50 assert len(batches) == 2 - assert [d.id for d in batches[0]] == ids[0:3] - assert [d.id for d in batches[1]] == ids[3:6] + assert len(batches[0]) == 200 + assert len(batches[1]) == 50 + +def test_batch_documents_mixed_size_batching() -> None: + """Test batching with files that fit multiple per batch but hit 30 MB limit.""" + # Each file is 15 MB (15360 KB), so 2 files = 30 MB (at limit), 3 files > 30 MB + docs = create_fake_documents(5, file_size_kb=15 * 1024) + batches = helpers.batch_documents(docs) -def test_batch_documents_ragged_last_chunk() -> None: - crud = FakeDocumentCrud() - ids = [uuid4() for _ in range(5)] - batches = helpers.batch_documents(crud, ids, batch_size=2) + # Should create 3 batches: [2 files, 2 files, 1 file] + assert len(batches) == 3 + assert len(batches[0]) == 2 # 30 MB total + assert len(batches[1]) == 2 # 30 MB total + assert len(batches[2]) == 1 # 15 MB total - assert crud.calls == [ids[0:2], ids[2:4], ids[4:5]] - assert [d.id for d in batches[0]] == ids[0:2] - assert [d.id for d in batches[1]] == ids[2:4] - assert [d.id for d in batches[2]] == ids[4:5] + +def test_batch_documents_with_none_file_size() -> None: + """Test that documents with None file_size are treated as 0 bytes.""" + docs = create_fake_documents(10, file_size_kb=None) + batches = helpers.batch_documents(docs) + + # All files with None/0 size should fit in one batch (under both limits) + assert len(batches) == 1 + assert len(batches[0]) == 10 def test_batch_documents_empty_input() -> None: - crud = FakeDocumentCrud() - batches = helpers.batch_documents(crud, [], batch_size=3) + """Test that empty input returns empty batches.""" + batches = helpers.batch_documents([]) assert batches == [] - assert crud.calls == [] def test_ensure_unique_name_success(db: Session) -> None: @@ -125,3 +173,109 @@ def test_ensure_unique_name_conflict_with_vector_store_collection(db: Session) - assert exc.value.status_code == 409 assert "already exists" in exc.value.detail + + +# get_service_name + + +def test_get_service_name_openai() -> None: + """Test that OpenAI provider returns correct service name.""" + result = get_service_name("openai") + assert result == "openai vector store" + + +def test_get_service_name_case_insensitive() -> None: + """Test that provider name is case-insensitive.""" + assert get_service_name("OpenAI") == "openai vector store" + assert get_service_name("OPENAI") == "openai vector store" + assert get_service_name("OpEnAi") == "openai vector store" + + +def test_get_service_name_unknown_provider() -> None: + """Test that unknown providers return empty string.""" + assert get_service_name("unknown") == "" + assert get_service_name("bedrock") == "" # Commented out in the mapping + assert get_service_name("gemini") == "" # Commented out in the mapping + assert get_service_name("") == "" + + +# to_collection_public + + +def test_to_collection_public_vector_store() -> None: + """Test conversion of vector store collection to public model.""" + collection = Collection( + id=uuid4(), + project_id=1, + provider=ProviderType.openai, + llm_service_id="vs_123", + llm_service_name="openai vector store", # Matches get_service_name("openai") + name="Test Collection", + description="Test description", + inserted_at=now(), + updated_at=now(), + deleted_at=None, + ) + + result = to_collection_public(collection) + + # For vector store, should map to knowledge_base fields + assert result.id == collection.id + assert result.knowledge_base_id == "vs_123" + assert result.knowledge_base_provider == "openai vector store" + assert result.llm_service_id is None + assert result.llm_service_name is None + assert result.project_id == 1 + assert result.inserted_at == collection.inserted_at + assert result.updated_at == collection.updated_at + assert result.deleted_at is None + + +def test_to_collection_public_assistant() -> None: + """Test conversion of assistant collection to public model.""" + collection = Collection( + id=uuid4(), + project_id=2, + provider=ProviderType.openai, + llm_service_id="asst_456", + llm_service_name="gpt-4", # Does NOT match vector store name + name="Assistant Collection", + description="Assistant description", + inserted_at=now(), + updated_at=now(), + deleted_at=None, + ) + + result = to_collection_public(collection) + + # For assistant, should map to llm_service fields + assert result.id == collection.id + assert result.llm_service_id == "asst_456" + assert result.llm_service_name == "gpt-4" + assert result.knowledge_base_id is None + assert result.knowledge_base_provider is None + assert result.project_id == 2 + assert result.inserted_at == collection.inserted_at + assert result.updated_at == collection.updated_at + assert result.deleted_at is None + + +def test_to_collection_public_with_deleted_at() -> None: + """Test that deleted_at field is properly included when set.""" + deleted_time = now() + collection = Collection( + id=uuid4(), + project_id=3, + provider=ProviderType.openai, + llm_service_id="vs_789", + llm_service_name="openai vector store", + name="Deleted Collection", + description="Deleted", + inserted_at=now(), + updated_at=now(), + deleted_at=deleted_time, + ) + + result = to_collection_public(collection) + + assert result.deleted_at == deleted_time diff --git a/backend/app/tests/services/documents/__init__.py b/backend/app/tests/services/documents/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/app/tests/services/documents/test_helpers.py b/backend/app/tests/services/documents/test_helpers.py new file mode 100644 index 000000000..9a4b25344 --- /dev/null +++ b/backend/app/tests/services/documents/test_helpers.py @@ -0,0 +1,60 @@ +from io import BytesIO + +from fastapi import UploadFile + +from app.services.documents.helpers import calculate_file_size + + +def make_upload_file(content: bytes, size: int | None = None) -> UploadFile: + """Create an UploadFile with the given content and optional pre-set size.""" + return UploadFile(file=BytesIO(content), size=size) + + +class TestCalculateFileSizeWithSizeAttribute: + def test_uses_size_attribute_when_set(self) -> None: + """Uses file.size directly when it is provided.""" + file = make_upload_file(b"irrelevant", size=2048) + assert calculate_file_size(file) == 2 # 2048 / 1024 = 2.0 + + def test_rounds_fractional_kb(self) -> None: + """Rounds the result when size is not an exact multiple of 1024.""" + file = make_upload_file(b"irrelevant", size=1536) # 1.5 KB → rounds to 2 + assert calculate_file_size(file) == 2 + + def test_rounds_down_fractional_kb(self) -> None: + """Rounds down when fractional part is below .5.""" + file = make_upload_file(b"irrelevant", size=1300) # ~1.27 KB → rounds to 1 + assert calculate_file_size(file) == 1 + + def test_large_file_size(self) -> None: + """Correctly converts large sizes.""" + file = make_upload_file(b"irrelevant", size=10 * 1024 * 1024) # 10 MB + assert calculate_file_size(file) == 10 * 1024 # 10240 KB + + +class TestCalculateFileSizeViaSeek: + def test_falls_back_to_seek_when_size_is_none(self) -> None: + """Falls back to seek/tell when file.size is None.""" + file = make_upload_file(b"x" * 2048, size=None) + assert calculate_file_size(file) == 2 # 2048 / 1024 = 2 + + def test_falls_back_to_seek_when_size_is_zero(self) -> None: + """Falls back to seek/tell when file.size is 0 (falsy).""" + file = make_upload_file(b"x" * 3072, size=0) + assert calculate_file_size(file) == 3 # 3072 / 1024 = 3 + + def test_resets_file_pointer_after_seek(self) -> None: + """File pointer is back at position 0 after size calculation.""" + file = make_upload_file(b"hello world", size=None) + calculate_file_size(file) + assert file.file.tell() == 0 + + def test_seek_with_fractional_kb(self) -> None: + """Rounds correctly when content size is not a multiple of 1024.""" + file = make_upload_file(b"x" * 1600, size=None) # ~1.56 KB → rounds to 2 + assert calculate_file_size(file) == 2 + + def test_empty_file_via_seek(self) -> None: + """Returns 0 for an empty file when size is None.""" + file = make_upload_file(b"", size=None) + assert calculate_file_size(file) == 0