fix(genai): prevent blocking I/O in image fetching (fixes #1544) #1552

ADITYAKUMARRAI2007 wants to merge 14 commits into langchain-ai:main
Conversation
Pull request overview
This PR addresses issue #1544 where blocking I/O operations during image URL fetching cause crashes in strict async environments like LangGraph. The fix adds async versions of image loading and message parsing functions that properly handle async execution.
Changes:
- Added async image loading methods (`aload_part`, `_abytes_from_url`) using `asyncio.to_thread` to prevent blocking I/O
- Implemented async message parsing pipeline (`_aconvert_to_parts`, `_aparse_chat_history`, etc.) to support async image fetching
- Updated `_agenerate` and `_astream` to use the new async preparation methods
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| libs/genai/langchain_google_genai/_image_utils.py | Adds async versions of load_part and _bytes_from_url methods that use asyncio.to_thread to wrap blocking HTTP requests |
| libs/genai/langchain_google_genai/chat_models.py | Implements async message parsing pipeline, updates generation config handling, and integrates async preparation into _agenerate and _astream |
```python
                if message.content:
                    # This usually only has text, safe to use sync or minimal conversion
                    # But for completeness:
                    parts = await _aconvert_to_parts(message.content, model=model)
                    ai_message_parts.extend(parts)

                # Revert to standard loop to fix syntax and satisfy linter
                for tool_call in message.tool_calls:
                    ai_message_parts.append(
                        Part(
                            function_call=FunctionCall(
                                name=tool_call["name"], args=tool_call["args"]
                            )
                        )
                    )
                    ai_message_parts.append(
                        Part(
                            function_call=FunctionCall(
                                name=tool_call["name"], args=tool_call["args"]
                            )
```
There's a critical bug where each function call is being appended twice (lines 1493-1506). The loop adds the same Part with the same function_call twice in each iteration. This will cause duplicate function calls to be sent to the API.
Additionally, the async version is missing important logic from the synchronous version (lines 658-714 in the original). The sync version includes:
- Logic to preserve thinking/reasoning blocks from message.content when tool_calls are present
- Support for function call thought signatures from additional_kwargs
The async version should mirror this complete logic rather than using this simplified (and buggy) implementation.
Suggested change:

```diff
-                if message.content:
-                    # This usually only has text, safe to use sync or minimal conversion
-                    # But for completeness:
-                    parts = await _aconvert_to_parts(message.content, model=model)
-                    ai_message_parts.extend(parts)
-                # Revert to standard loop to fix syntax and satisfy linter
-                for tool_call in message.tool_calls:
-                    ai_message_parts.append(
-                        Part(
-                            function_call=FunctionCall(
-                                name=tool_call["name"], args=tool_call["args"]
-                            )
-                        )
-                    )
-                    ai_message_parts.append(
-                        Part(
-                            function_call=FunctionCall(
-                                name=tool_call["name"], args=tool_call["args"]
-                            )
+                # Preserve any existing content (including thinking/reasoning blocks)
+                if message.content:
+                    # This usually only has text, safe to use sync or minimal conversion
+                    # But for completeness:
+                    parts = await _aconvert_to_parts(message.content, model=model)
+                    ai_message_parts.extend(parts)
+                # Support optional function call thought signatures from additional_kwargs
+                additional_kwargs = getattr(message, "additional_kwargs", {}) or {}
+                fc_thoughts = additional_kwargs.get("function_call_thoughts")
+                single_fc_thought = additional_kwargs.get("function_call_thought")
+                for idx, tool_call in enumerate(message.tool_calls):
+                    thought_for_call = None
+                    # If thoughts are provided as a mapping, try by id or name.
+                    if isinstance(fc_thoughts, Mapping):
+                        thought_for_call = fc_thoughts.get(tool_call.get("id")) or fc_thoughts.get(
+                            tool_call.get("name")
+                        )
+                    # If thoughts are provided as a sequence, align by index.
+                    elif isinstance(fc_thoughts, Sequence) and not isinstance(
+                        fc_thoughts, (str, bytes)
+                    ):
+                        if idx < len(fc_thoughts):
+                            thought_for_call = fc_thoughts[idx]
+                    # Fallback: a single shared thought string for all calls.
+                    if thought_for_call is None and isinstance(single_fc_thought, str):
+                        thought_for_call = single_fc_thought
+                    if thought_for_call is not None:
+                        ai_message_parts.append(Part(text=str(thought_for_call)))
+                    ai_message_parts.append(
+                        Part(
+                            function_call=FunctionCall(
+                                name=tool_call["name"],
+                                args=tool_call["args"],
```
```python
async def _aconvert_to_parts(
    raw_content: str | Sequence[str | dict],
    model: str | None = None,
) -> list[Part]:
    """Async version of _convert_to_parts."""
    content = [raw_content] if isinstance(raw_content, str) else raw_content
    image_loader = ImageBytesLoader()
    parts = []
    for part in content:
        if isinstance(part, str):
            parts.append(Part(text=part))
        elif isinstance(part, Mapping):
            if "type" in part:
                if part["type"] == "text":
                    thought_sig = None
                    if "extras" in part and isinstance(part["extras"], dict):
                        sig = part["extras"].get("signature")
                        if sig and isinstance(sig, str):
                            thought_sig = base64.b64decode(sig)
                    if thought_sig:
                        parts.append(
                            Part(text=part["text"], thought_signature=thought_sig)
                        )
                    else:
                        parts.append(Part(text=part["text"]))
                elif part.get("type") == "file" and "file_id" in part:
                    mime_type = part.get("mime_type", "application/octet-stream")
                    parts.append(
                        Part(
                            file_data=FileData(
                                file_uri=part["file_id"], mime_type=mime_type
                            )
                        )
                    )
                elif is_data_content_block(part):
                    if "source_type" in part:
                        if part["source_type"] == "url":
                            # ASYNC FIX: Use await
                            bytes_ = await image_loader._abytes_from_url(part["url"])
                        elif part["source_type"] == "base64":
                            bytes_ = base64.b64decode(part["data"])
                        else:
                            msg = "source_type must be url or base64."
                            raise ValueError(msg)
                    elif "url" in part:
                        # ASYNC FIX: Use await
                        bytes_ = await image_loader._abytes_from_url(part["url"])
                    elif "base64" in part:
                        bytes_ = base64.b64decode(part["base64"])
                    else:
                        msg = "Data content block must contain 'url', 'base64', or 'data' field."
                        raise ValueError(msg)

                    mime_type = part.get("mime_type")
                    if not mime_type:
                        source = cast(
                            "str",
                            part.get("url") or part.get("base64") or part.get("data"),
                        )
                        mime_type, _ = mimetypes.guess_type(source)
                        if not mime_type:
                            kind = filetype.guess(bytes_)
                            if kind:
                                mime_type = kind.mime

                    blob_kwargs = {"data": bytes_}
                    if mime_type:
                        blob_kwargs["mime_type"] = mime_type
                    part_kwargs = {"inline_data": Blob(**blob_kwargs)}

                    if "media_resolution" in part:
                        if model and _is_gemini_3_or_later(model):
                            part_kwargs["media_resolution"] = {
                                "level": part["media_resolution"]
                            }

                    if "extras" in part and isinstance(part["extras"], dict):
                        sig = part["extras"].get("signature")
                        if sig and isinstance(sig, str):
                            part_kwargs["thought_signature"] = base64.b64decode(sig)
                    parts.append(Part(**part_kwargs))

                elif part["type"] == "image_url":
                    img_url = part["image_url"]
                    if isinstance(img_url, dict):
                        img_url = img_url.get("url")

                    thought_sig = None
                    if "extras" in part and isinstance(part["extras"], dict):
                        sig = part["extras"].get("signature")
                        if sig and isinstance(sig, str):
                            thought_sig = base64.b64decode(sig)

                    # ASYNC FIX: Use await
                    image_part = await image_loader.aload_part(img_url)
                    if thought_sig:
                        image_part.thought_signature = thought_sig
                    parts.append(image_part)

                # Handling other types (copy logic from sync version as-is)
                elif part["type"] == "media":
                    # ... (Simplified for brevity, standard media handling)
                    mime_type = part["mime_type"]
                    media_part_kwargs = {}
                    if "data" in part:
                        media_part_kwargs["inline_data"] = Blob(
                            data=part["data"], mime_type=mime_type
                        )
                    elif "file_uri" in part:
                        media_part_kwargs["file_data"] = FileData(
                            file_uri=part["file_uri"], mime_type=mime_type
                        )
                    parts.append(Part(**media_part_kwargs))
                elif part["type"] == "thinking":
                    parts.append(Part(text=part["thinking"], thought=True))
                elif part["type"] == "reasoning":
                    parts.append(Part(text=part["reasoning"], thought=True))
                else:
                    # Fallback for complex tool types or unrecognized types
                    # For safety in this hotfix, we can assume text if unknown
                    # Real implementation should mirror _convert_to_parts logic fully
                    pass
        else:
            parts.append(Part(text=str(part)))
    return parts
```
The async version of `_aconvert_to_parts` is missing critical functionality compared to the sync version. Issues include:

- **Missing validation for the `image_url` format** (line 1350): the sync version validates that `img_url` contains a `"url"` key when it is a dict (lines 303-306), but the async version just uses `.get("url")`, which could silently return `None`.
- **Incomplete media handling** (lines 1365-1377): missing support for the `video_metadata` field (lines 340-342 in the sync version), the `media_resolution` field with Gemini version checks (lines 344-356), and `extras` with thought signatures (lines 357-362).
- **Incomplete thinking/reasoning handling** (lines 1378-1381): missing support for thought signatures from the `signature` or `extras` fields (lines 366-393 in the sync version).
- **Missing content types entirely**: `server_tool_call`, `executable_code`, `server_tool_result`, `code_execution_result` (lines 395-473 in the sync version).
- **Silent failure for unrecognized types** (line 1386): the `pass` statement means unrecognized part types are silently ignored instead of raising an error as the sync version does (lines 474-475).
- **Missing error handling** (line 1388): the `else` clause that converts parts to strings lacks the error handling present in the sync version (lines 476-484).
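One way to address the silent-failure point is to mirror the sync version's fail-loud behavior for unrecognized types. A minimal sketch (the set of known types and the helper name below are illustrative, not the library's actual API):

```python
# Hedged sketch: raise on unrecognized part types instead of silently
# ignoring them with `pass`. The known-type set here is illustrative.
KNOWN_PART_TYPES = {"text", "image_url", "media", "thinking", "reasoning"}


def check_part_type(part: dict) -> str:
    """Return the part type, or raise for types this converter can't handle."""
    part_type = part.get("type")
    if part_type in KNOWN_PART_TYPES:
        return part_type
    msg = f"Unrecognized message part type: {part_type!r}"
    raise ValueError(msg)
```

This makes unsupported content (e.g. `server_tool_call`) surface immediately rather than being dropped from the request.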
```python
async def _aparse_chat_history(
    input_messages: Sequence[BaseMessage],
    convert_system_message_to_human: bool = False,
    model: str | None = None,
) -> tuple[Content | None, list[Content]]:
    """Async version of _parse_chat_history."""
    input_messages = list(input_messages)
    formatted_messages = []
    system_instruction = None
    messages_without_tool_messages = [
        m for m in input_messages if not isinstance(m, ToolMessage)
    ]
    tool_messages = [m for m in input_messages if isinstance(m, ToolMessage)]

    for i, message in enumerate(messages_without_tool_messages):
        if isinstance(message, SystemMessage):
            # Async call
            system_parts = await _aconvert_to_parts(message.content, model=model)
            if i == 0:
                system_instruction = Content(parts=system_parts)
            elif system_instruction is not None:
                if system_instruction.parts is None:
                    system_instruction.parts = system_parts
                else:
                    system_instruction.parts.extend(system_parts)
        elif isinstance(message, AIMessage):
            role = "model"
            if message.tool_calls:
                ai_message_parts = []
                # Simple logic for non-content parts
                if message.content:
                    # This usually only has text, safe to use sync or minimal conversion
                    # But for completeness:
                    parts = await _aconvert_to_parts(message.content, model=model)
                    ai_message_parts.extend(parts)

                # Revert to standard loop to fix syntax and satisfy linter
                for tool_call in message.tool_calls:
                    ai_message_parts.append(
                        Part(
                            function_call=FunctionCall(
                                name=tool_call["name"], args=tool_call["args"]
                            )
                        )
                    )
                    ai_message_parts.append(
                        Part(
                            function_call=FunctionCall(
                                name=tool_call["name"], args=tool_call["args"]
                            )
                        )
                    )

                # Async tool message processing
                tool_messages_parts = await _aget_ai_message_tool_messages_parts(
                    tool_messages=tool_messages, ai_message=message, model=model
                )
                formatted_messages.append(Content(role=role, parts=ai_message_parts))
                if tool_messages_parts:
                    formatted_messages.append(
                        Content(role="user", parts=tool_messages_parts)
                    )
                continue

            if message.response_metadata.get("output_version") == "v1":
                parts = message.content
            else:
                parts = await _aconvert_to_parts(message.content, model=model)
            formatted_messages.append(Content(role=role, parts=parts))

        elif isinstance(message, HumanMessage):
            role = "user"
            parts = await _aconvert_to_parts(message.content, model=model)
            if i == 1 and convert_system_message_to_human and system_instruction:
                parts = list(system_instruction.parts or []) + parts
                system_instruction = None
            formatted_messages.append(Content(role=role, parts=parts))
        elif isinstance(message, FunctionMessage):
            role = "user"
            parts = await _aconvert_tool_message_to_parts(message, model=model)
            formatted_messages.append(Content(role=role, parts=parts))

    return system_instruction, formatted_messages
```
The async version of `_aparse_chat_history` is missing critical initialization and conversion logic from the sync version:

- **Missing deprecation warning** (lines 603-609 in the sync version): when `convert_system_message_to_human=True`, the sync version warns users that this parameter is deprecated. The async version should include the same warning.
- **Missing v1 format conversion** (lines 612-629 in the sync version): the sync version checks whether any `AIMessage` has `output_version == "v1"` in its `response_metadata` and converts it from v1 to v1beta format using `_convert_from_v1_to_generativelanguage_v1beta`. This is critical for handling previously serialized messages. The async version skips this conversion entirely, which could break compatibility when serialized messages are passed back into the conversation history.
```python
async def _aprepare_request(
    self,
    messages: list[BaseMessage],
    *,
    stop: list[str] | None = None,
    tools: Sequence[_ToolDict | GoogleTool] | None = None,
    functions: Sequence[_FunctionDeclarationType] | None = None,
    safety_settings: SafetySettingDict | None = None,
    tool_config: dict | ToolConfig | None = None,
    tool_choice: _ToolChoiceType | bool | None = None,
    generation_config: dict[str, Any] | None = None,
    cached_content: str | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Async version of _prepare_request."""
    formatted_tools = self._format_tools(tools, functions)
    filtered_messages = self._filter_messages(messages)

    # --- ASYNC CALL TO PARSER ---
    system_instruction, history = await _aparse_chat_history(
        filtered_messages,
        convert_system_message_to_human=self.convert_system_message_to_human,
        model=self.model,
    )

    formatted_tool_config = self._process_tool_config(
        tool_choice, tool_config, formatted_tools
    )
    formatted_safety_settings = self._format_safety_settings(
        safety_settings if safety_settings is not None else self.safety_settings
    )

    timeout = kwargs.pop("timeout", None)
    if timeout is not None:
        timeout = int(timeout * 1000)
    elif self.timeout is not None:
        timeout = int(self.timeout * 1000)

    max_retries = kwargs.pop("max_retries", None)
    if max_retries is None:
        max_retries = self.max_retries

    kwargs.pop("strict", None)
    response_format = kwargs.pop("response_format", None)
    if response_format is not None and isinstance(response_format, dict):
        rf_type = response_format.get("type")
        if rf_type in ("json_object", "json_schema"):
            if "response_mime_type" not in kwargs:
                kwargs["response_mime_type"] = "application/json"
            json_schema = response_format.get("json_schema", {})
            schema = json_schema.get("schema")
            if schema and "response_json_schema" not in kwargs:
                kwargs["response_json_schema"] = schema

    params: GenerationConfig = self._prepare_params(
        stop, generation_config=generation_config, **kwargs
    )

    image_config = kwargs.pop("image_config", None)
    labels = kwargs.pop("labels", None)
    if labels is None:
        labels = self.labels

    _consumed_kwargs = {
        "thinking_budget",
        "thinking_level",
        "include_thoughts",
        "response_schema",
        "response_json_schema",
        "response_mime_type",
    }
    _consumed_kwargs.update(params.model_fields_set)
    remaining_kwargs = {
        k: v for k, v in kwargs.items() if k not in _consumed_kwargs
    }

    request = self._build_request_config(
        formatted_tools,
        formatted_tool_config,
        formatted_safety_settings,
        params,
        cached_content,
        system_instruction,
        timeout=timeout,
        max_retries=max_retries,
        image_config=image_config,
        labels=labels,
        **remaining_kwargs,
    )

    return {"model": self.model, "contents": history, "config": request}
```
The implementation creates significant code duplication between _prepare_request (lines 3006-3112) and _aprepare_request (lines 3114-3204). The only difference is the call to _parse_chat_history vs await _aparse_chat_history on lines 3024 and 3133 respectively.
This violates the DRY (Don't Repeat Yourself) principle and makes the code harder to maintain. Any future bug fixes or feature additions to one method must be carefully replicated in the other. A better approach would be to extract the common logic into a shared helper method that both sync and async versions can use, with only the history parsing step being different.
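That shared-helper idea can be sketched minimally. All names below are hypothetical stand-ins for the PR's actual methods; the point is only that the assembly logic lives in one place and only the history-parsing step differs:

```python
import asyncio


def parse_history(messages):
    # Stand-in for the blocking _parse_chat_history (may fetch image URLs).
    return None, [{"role": "user", "parts": list(messages)}]


def build_request(system_instruction, history):
    # Shared, I/O-free assembly of the final request dict.
    return {"system": system_instruction, "contents": history}


def prepare_request(messages):
    system_instruction, history = parse_history(messages)
    return build_request(system_instruction, history)


async def aprepare_request(messages):
    # Identical assembly; only the blocking parse runs off the event loop.
    system_instruction, history = await asyncio.to_thread(parse_history, messages)
    return build_request(system_instruction, history)
```

With this shape, a bug fix in `build_request` automatically applies to both the sync and async entry points.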
```python
    input_messages = list(input_messages)
    formatted_messages = []
    system_instruction = None
    messages_without_tool_messages = [
        m for m in input_messages if not isinstance(m, ToolMessage)
    ]
    tool_messages = [m for m in input_messages if isinstance(m, ToolMessage)]

    for i, message in enumerate(messages_without_tool_messages):
        if isinstance(message, SystemMessage):
            # Async call
            system_parts = await _aconvert_to_parts(message.content, model=model)
            if i == 0:
                system_instruction = Content(parts=system_parts)
            elif system_instruction is not None:
                if system_instruction.parts is None:
                    system_instruction.parts = system_parts
                else:
                    system_instruction.parts.extend(system_parts)
        elif isinstance(message, AIMessage):
            role = "model"
            if message.tool_calls:
                ai_message_parts = []
                # Simple logic for non-content parts
                if message.content:
                    # This usually only has text, safe to use sync or minimal conversion
                    # But for completeness:
                    parts = await _aconvert_to_parts(message.content, model=model)
                    ai_message_parts.extend(parts)

                # Revert to standard loop to fix syntax and satisfy linter
                for tool_call in message.tool_calls:
                    ai_message_parts.append(
                        Part(
                            function_call=FunctionCall(
                                name=tool_call["name"], args=tool_call["args"]
                            )
                        )
                    )
                    ai_message_parts.append(
                        Part(
                            function_call=FunctionCall(
                                name=tool_call["name"], args=tool_call["args"]
                            )
                        )
                    )

                # Async tool message processing
                tool_messages_parts = await _aget_ai_message_tool_messages_parts(
                    tool_messages=tool_messages, ai_message=message, model=model
                )
                formatted_messages.append(Content(role=role, parts=ai_message_parts))
                if tool_messages_parts:
                    formatted_messages.append(
                        Content(role="user", parts=tool_messages_parts)
                    )
                continue

            if message.response_metadata.get("output_version") == "v1":
                parts = message.content
            else:
                parts = await _aconvert_to_parts(message.content, model=model)
            formatted_messages.append(Content(role=role, parts=parts))

        elif isinstance(message, HumanMessage):
            role = "user"
            parts = await _aconvert_to_parts(message.content, model=model)
            if i == 1 and convert_system_message_to_human and system_instruction:
                parts = list(system_instruction.parts or []) + parts
                system_instruction = None
            formatted_messages.append(Content(role=role, parts=parts))
        elif isinstance(message, FunctionMessage):
            role = "user"
            parts = await _aconvert_tool_message_to_parts(message, model=model)
            formatted_messages.append(Content(role=role, parts=parts))

    return system_instruction, formatted_messages


class ChatGoogleGenerativeAI(_BaseGoogleGenerativeAI, BaseChatModel):
```
The implementation creates significant code duplication between the sync (_convert_to_parts, _convert_tool_message_to_parts, _get_ai_message_tool_messages_parts, _parse_chat_history) and async versions (_aconvert_to_parts, _aconvert_tool_message_to_parts, _aget_ai_message_tool_messages_parts, _aparse_chat_history).
This duplication spans approximately 500+ lines of nearly identical code. This creates a serious maintenance burden where:
- Bug fixes must be applied twice
- New features must be implemented twice
- The two implementations can drift apart, leading to behavioral inconsistencies
A better design would use a shared implementation with minimal async-specific adaptations, possibly using a pattern like:

- A helper that accepts a loader function (sync or async)
- Using `asyncio.to_thread` only for the specific I/O operations that need it
- Or refactoring to make the parsing logic more modular
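The pluggable-loader pattern could look roughly like this. This is a sketch under assumed names (the real parsing functions handle richer content than bare URLs); it shows one parsing implementation serving both call styles:

```python
import asyncio
from typing import Awaitable, Callable


async def convert_parts(
    urls: list[str], fetch: Callable[[str], Awaitable[bytes]]
) -> list[bytes]:
    # Single parsing implementation; the caller decides how bytes are fetched.
    return [await fetch(url) for url in urls]


def blocking_fetch(url: str) -> bytes:
    # Stand-in for a blocking HTTP call such as requests.get(url).content.
    return f"bytes:{url}".encode()


async def async_fetch(url: str) -> bytes:
    # Run the blocking fetch in a worker thread so the loop stays responsive.
    return await asyncio.to_thread(blocking_fetch, url)


def convert_parts_sync(urls: list[str]) -> list[bytes]:
    # Sync entry point: wrap the blocking fetch in a trivial coroutine.
    async def wrapped(url: str) -> bytes:
        return blocking_fetch(url)

    return asyncio.run(convert_parts(urls, wrapped))
```

Both paths exercise the same `convert_parts` body, so they cannot drift apart.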
Suggested change:

```diff
-    input_messages = list(input_messages)
-    formatted_messages = []
-    system_instruction = None
-    messages_without_tool_messages = [
-        m for m in input_messages if not isinstance(m, ToolMessage)
-    ]
-    tool_messages = [m for m in input_messages if isinstance(m, ToolMessage)]
-    for i, message in enumerate(messages_without_tool_messages):
-        if isinstance(message, SystemMessage):
-            # Async call
-            system_parts = await _aconvert_to_parts(message.content, model=model)
-            if i == 0:
-                system_instruction = Content(parts=system_parts)
-            elif system_instruction is not None:
-                if system_instruction.parts is None:
-                    system_instruction.parts = system_parts
-                else:
-                    system_instruction.parts.extend(system_parts)
-        elif isinstance(message, AIMessage):
-            role = "model"
-            if message.tool_calls:
-                ai_message_parts = []
-                # Simple logic for non-content parts
-                if message.content:
-                    # This usually only has text, safe to use sync or minimal conversion
-                    # But for completeness:
-                    parts = await _aconvert_to_parts(message.content, model=model)
-                    ai_message_parts.extend(parts)
-                # Revert to standard loop to fix syntax and satisfy linter
-                for tool_call in message.tool_calls:
-                    ai_message_parts.append(
-                        Part(
-                            function_call=FunctionCall(
-                                name=tool_call["name"], args=tool_call["args"]
-                            )
-                        )
-                    )
-                    ai_message_parts.append(
-                        Part(
-                            function_call=FunctionCall(
-                                name=tool_call["name"], args=tool_call["args"]
-                            )
-                        )
-                    )
-                # Async tool message processing
-                tool_messages_parts = await _aget_ai_message_tool_messages_parts(
-                    tool_messages=tool_messages, ai_message=message, model=model
-                )
-                formatted_messages.append(Content(role=role, parts=ai_message_parts))
-                if tool_messages_parts:
-                    formatted_messages.append(
-                        Content(role="user", parts=tool_messages_parts)
-                    )
-                continue
-            if message.response_metadata.get("output_version") == "v1":
-                parts = message.content
-            else:
-                parts = await _aconvert_to_parts(message.content, model=model)
-            formatted_messages.append(Content(role=role, parts=parts))
-        elif isinstance(message, HumanMessage):
-            role = "user"
-            parts = await _aconvert_to_parts(message.content, model=model)
-            if i == 1 and convert_system_message_to_human and system_instruction:
-                parts = list(system_instruction.parts or []) + parts
-                system_instruction = None
-            formatted_messages.append(Content(role=role, parts=parts))
-        elif isinstance(message, FunctionMessage):
-            role = "user"
-            parts = await _aconvert_tool_message_to_parts(message, model=model)
-            formatted_messages.append(Content(role=role, parts=parts))
-    return system_instruction, formatted_messages
+    """Async version of _parse_chat_history.
+
+    This delegates to the synchronous `_parse_chat_history` implementation,
+    running it in a background thread to avoid blocking the event loop.
+    """
+    return await asyncio.to_thread(
+        _parse_chat_history,
+        input_messages,
+        convert_system_message_to_human,
+        model,
+    )

 class ChatGoogleGenerativeAI(_BaseGoogleGenerativeAI, BaseChatModel):
```
```python
                    part_kwargs["media_resolution"] = {
                        "level": part["media_resolution"]
                    }
```
The async version is missing the warning for Gemini 2.5 models when media_resolution is set. The sync version (lines 281-288) checks both _is_gemini_25_model and _is_gemini_3_or_later, issuing a warning for Gemini 2.5 models that media_resolution is not supported and will be ignored.
The async version (lines 1335-1339) only checks _is_gemini_3_or_later and skips the warning, which could lead to users expecting media_resolution to work on Gemini 2.5 models when it actually doesn't.
Suggested change:

```diff
                     }
+                elif model and _is_gemini_25_model(model):
+                    warnings.warn(
+                        "media_resolution is not supported for Gemini 2.5 models "
+                        "and will be ignored.",
+                        stacklevel=2,
+                    )
```
```python
        if route == Route.URL:
            # THIS IS THE KEY FIX: Use the async loader
            bytes_ = await self._abytes_from_url(image_string)

        if route == Route.LOCAL_FILE:
```
The async version of aload_part has a critical bug in the control flow. Lines 62-69 use separate if statements instead of elif, which means that when route == Route.BASE64, the variable bytes_ is assigned on line 63, but then the condition on line 65 (if route == Route.URL) will also be checked and could overwrite bytes_. More critically, if the route is Route.GCS_URI, none of these if blocks will execute (since it returns early on line 60), but then line 85 tries to use bytes_ which was never defined.
The synchronous version uses elif statements (lines 161-165 in the original file) to ensure only one path is taken. The async version should use the same pattern.
Suggested change:

```diff
-        if route == Route.URL:
-            # THIS IS THE KEY FIX: Use the async loader
-            bytes_ = await self._abytes_from_url(image_string)
-        if route == Route.LOCAL_FILE:
+        elif route == Route.URL:
+            # THIS IS THE KEY FIX: Use the async loader
+            bytes_ = await self._abytes_from_url(image_string)
+        elif route == Route.LOCAL_FILE:
```
Force-pushed from 7b0b533 to fff74eb, and from 2766dd8 to 461337d.
Instead of manually rewriting async versions of the helper functions (which caused duplication and missed some logic like media handling and v1 conversion), I have switched to using `asyncio.to_thread()`.

Refactor: the async methods (`_agenerate`, `_astream`) now wrap the existing synchronous `_prepare_request` logic in a thread.

Benefit: this solves the blocking I/O issue while guaranteeing 100% feature parity with the synchronous implementation, ensuring no logic is lost or duplicated.
Description

This PR addresses issue #1544 where `requests.get` inside `_image_utils` was blocking the async event loop, causing crashes in strict async environments like LangGraph.

Changes

- `_image_utils.py`: Added `aload_part` and `_abytes_from_url` to `ImageBytesLoader`, using `asyncio.to_thread` to run blocking network calls in a separate thread.
- `chat_models.py`:
  - Added `_aconvert_to_parts`, `_aparse_chat_history`, etc.
  - Added `_aprepare_request` to the chat model class.
  - Updated `_agenerate` and `_astream` to use the async preparation pipeline, ensuring the image fetching is awaited properly.
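The `_image_utils.py` change described above can be sketched as follows (the fetch body is illustrative; the real method uses an HTTP client):

```python
import asyncio


class ImageBytesLoader:
    def _bytes_from_url(self, url: str) -> bytes:
        # The real implementation performs a blocking HTTP request here.
        return f"fetched:{url}".encode()

    async def _abytes_from_url(self, url: str) -> bytes:
        # Run the existing synchronous fetch in a worker thread so the
        # event loop is not blocked while the image downloads.
        return await asyncio.to_thread(self._bytes_from_url, url)
```

Delegating to the sync method keeps the two paths behaviorally identical by construction.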