Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions litellm/litellm_core_utils/streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2156,9 +2156,12 @@ def _normalize_status_code(exc: Exception) -> Optional[int]:
mapped_status_code = _normalize_status_code(mapped_exception)
original_status_code = _normalize_status_code(e)

if mapped_status_code is not None and 400 <= mapped_status_code < 500:
# Raise non-retriable client errors directly (skip fallback).
# Exception: 429 (rate-limit) IS retriable/transient — allow it
# through so the Router can switch to a different model group.
if mapped_status_code is not None and 400 <= mapped_status_code < 500 and mapped_status_code != 429:
raise mapped_exception
if original_status_code is not None and 400 <= original_status_code < 500:
if original_status_code is not None and 400 <= original_status_code < 500 and original_status_code != 429:
raise mapped_exception

from litellm.exceptions import MidStreamFallbackError
Expand Down
29 changes: 18 additions & 11 deletions litellm/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -1597,17 +1597,24 @@ async def stream_with_fallbacks():
"content_policy_fallbacks", self.content_policy_fallbacks
)
initial_kwargs["original_function"] = self._acompletion
initial_kwargs["messages"] = messages + [
{
"role": "system",
"content": "You are a helpful assistant. You are given a message and you need to respond to it. You are also given a generated content. You need to respond to the message in continuation of the generated content. Do not repeat the same content. Your response should be in continuation of this text: ",
},
{
"role": "assistant",
"content": e.generated_content,
"prefix": True,
},
]
if e.is_pre_first_chunk or not e.generated_content:
# No content was generated before the error (e.g. a
# rate-limit 429 on the very first chunk). Retry with
# the original messages — adding a continuation prompt
# would waste tokens and confuse the model.
initial_kwargs["messages"] = messages
else:
initial_kwargs["messages"] = messages + [
{
"role": "system",
"content": "You are a helpful assistant. You are given a message and you need to respond to it. You are also given a generated content. You need to respond to the message in continuation of the generated content. Do not repeat the same content. Your response should be in continuation of this text: ",
},
{
"role": "assistant",
"content": e.generated_content,
"prefix": True,
},
]
self._update_kwargs_before_fallbacks(
model=model_group, kwargs=initial_kwargs
)
Expand Down
35 changes: 35 additions & 0 deletions tests/test_litellm/litellm_core_utils/test_streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,41 @@ async def _raise_bad_request(**kwargs):
assert "invalid maxOutputTokens" in str(excinfo.value)


@pytest.mark.asyncio
async def test_vertex_streaming_rate_limit_triggers_midstream_fallback(logging_obj: Logging):
    """A Vertex 429 must surface as MidStreamFallbackError, never RateLimitError.

    Although 429 falls in the 4xx range alongside non-retriable client errors
    (400/401/403), it signals a transient rate limit. Raising
    MidStreamFallbackError lets the Router's fallback machinery catch it and
    route the request to another model group.

    Regression test for https://github.com/BerriAI/litellm/issues/20870
    """
    from litellm.exceptions import MidStreamFallbackError
    from litellm.llms.vertex_ai.common_utils import VertexAIError

    # Stand-in for the stream factory: fails immediately with a Vertex 429.
    async def _simulate_429(**kwargs):
        raise VertexAIError(
            status_code=429,
            message="Resource exhausted. Please try again later.",
            headers=None,
        )

    wrapper = CustomStreamWrapper(
        completion_stream=None,
        model="gemini-3-flash-preview",
        logging_obj=logging_obj,
        custom_llm_provider="vertex_ai_beta",
        make_call=_simulate_429,
    )

    # Pulling the first chunk should translate the 429 into the fallback error.
    with pytest.raises(MidStreamFallbackError) as err_info:
        await wrapper.__anext__()

    # The failure happened before any content streamed out.
    assert err_info.value.is_pre_first_chunk is True
    assert err_info.value.generated_content == ""


def test_streaming_handler_with_created_time_propagation(
initialized_custom_stream_wrapper: CustomStreamWrapper, logging_obj: Logging
):
Expand Down
89 changes: 87 additions & 2 deletions tests/test_litellm/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -1357,13 +1357,98 @@ async def __anext__(self):
fallback_kwargs = mock_fallback_utils.call_args.kwargs["kwargs"]
modified_messages = fallback_kwargs["messages"]

# Should have assistant message with empty content
assert modified_messages[2]["content"] == ""
# With empty generated_content, should use original messages (no
# continuation prompt) to avoid wasting tokens on an empty prefix.
assert len(modified_messages) == 1
assert modified_messages[0] == {"role": "user", "content": "Test"}
print("✓ Handles empty generated content correctly")

print("✓ Edge case tests passed!")


@pytest.mark.asyncio
async def test_acompletion_streaming_iterator_pre_first_chunk_skips_continuation():
    """Errors raised before the first chunk must retry with the ORIGINAL messages.

    When nothing has streamed yet (e.g. a 429 on the very first chunk), adding
    the continuation system prompt + empty assistant prefix would only waste
    tokens and confuse the fallback model — the router should pass the user's
    messages through untouched.

    Regression test for https://github.com/BerriAI/litellm/issues/18229
    and https://github.com/BerriAI/litellm/issues/20870
    """
    from unittest.mock import MagicMock

    from litellm.exceptions import MidStreamFallbackError

    router = litellm.Router(
        model_list=[
            {
                "model_name": "gpt-4",
                "litellm_params": {"model": "gpt-4", "api_key": "fake-key-1"},
            },
            {
                "model_name": "gpt-3.5-turbo",
                "litellm_params": {"model": "gpt-3.5-turbo", "api_key": "fake-key-2"},
            },
        ],
        fallbacks=[{"gpt-4": ["gpt-3.5-turbo"]}],
    )

    messages = [{"role": "user", "content": "Hello"}]
    initial_kwargs = {"model": "gpt-4", "stream": True}

    # Error carrying the pre-first-chunk marker and no generated content,
    # mimicking a provider 429 before any token was produced.
    first_chunk_err = MidStreamFallbackError(
        message="Rate limit exceeded",
        model="gpt-4",
        llm_provider="vertex_ai",
        generated_content="",
        is_pre_first_chunk=True,
    )

    class _FailingStream:
        """Async iterator that blows up on the very first __anext__."""

        def __init__(self):
            self.model = "gpt-4"
            self.custom_llm_provider = "vertex_ai"
            self.logging_obj = MagicMock()
            self.chunks = []

        def __aiter__(self):
            return self

        async def __anext__(self):
            raise first_chunk_err

    class _DrainedStream:
        """Async iterator yielding nothing — stands in for the fallback stream."""

        def __aiter__(self):
            return self

        async def __anext__(self):
            raise StopAsyncIteration

    with patch.object(
        router,
        "async_function_with_fallbacks_common_utils",
        return_value=_DrainedStream(),
    ) as mock_fallback_utils:
        iterator = await router._acompletion_streaming_iterator(
            model_response=_FailingStream(),
            messages=messages,
            initial_kwargs=initial_kwargs,
        )
        # Drain the wrapper so the error path (and fallback call) executes.
        async for _ in iterator:
            pass

    assert mock_fallback_utils.called
    fallback_kwargs = mock_fallback_utils.call_args.kwargs["kwargs"]
    modified_messages = fallback_kwargs["messages"]

    # Pre-first-chunk: should use original messages without continuation prompt
    assert len(modified_messages) == 1
    assert modified_messages[0] == {"role": "user", "content": "Hello"}
    print("✓ Pre-first-chunk error uses original messages (no continuation prompt)")


@pytest.mark.asyncio
async def test_acompletion_streaming_iterator_preserves_hidden_params():
"""
Expand Down
Loading