From 6769ba6a52ffb490f98c2c0c576191e33ce85efb Mon Sep 17 00:00:00 2001 From: Martin Frisk Date: Wed, 8 Apr 2026 12:29:16 +0200 Subject: [PATCH] Check and forward any trailers on stream end. When errors occur on a stream it is sent as a trailer, but the current client does not look for them. This makes an error sent from a server look like a successfully completed stream. This PR makes tonic-web-wasm-client behave the same as most grpc implementations in that the errors in the trailer is forwarded. --- src/response_body.rs | 10 ++++++-- test-suite/simple/client/tests/web.rs | 28 +++++++++++++++++++++ test-suite/simple/proto/echo.proto | 2 ++ test-suite/simple/server/src/main.rs | 36 +++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 2 deletions(-) diff --git a/src/response_body.rs b/src/response_body.rs index 86f7b45..1fbbd87 100644 --- a/src/response_body.rs +++ b/src/response_body.rs @@ -279,8 +279,11 @@ impl Body for ResponseBody { return Poll::Ready(Some(Ok(http_body::Frame::data(data.freeze())))); } - // If reading data is finished return `None` + // If reading data is finished, return trailers (if available) before ending if self.state.finished_data() { + if let Some(trailers) = self.trailer.take() { + return Poll::Ready(Some(Ok(http_body::Frame::trailers(trailers)))); + } return Poll::Ready(None); } @@ -300,7 +303,10 @@ impl Body for ResponseBody { let data = self.data.take().unwrap(); return Poll::Ready(Some(Ok(http_body::Frame::data(data.freeze())))); } else if self.state.finished_data() { - // If we finished reading data continue return `None` + // If we finished reading data, return trailers before ending + if let Some(trailers) = self.trailer.take() { + return Poll::Ready(Some(Ok(http_body::Frame::trailers(trailers)))); + } return Poll::Ready(None); } else if self.finished_stream { // If stream is finished but data is not finished return error diff --git a/test-suite/simple/client/tests/web.rs b/test-suite/simple/client/tests/web.rs index 8fcaaa7..266441e 100644 --- a/test-suite/simple/client/tests/web.rs +++ b/test-suite/simple/client/tests/web.rs @@ -93,6 +93,34 @@ async fn test_infinite_echo_stream() { assert!(response.is_some()); } +#[wasm_bindgen_test] +async fn test_echo_stream_error() { + let mut client = build_client(); + + let mut stream_response = client + .echo_stream_error(EchoRequest { + message: "John".to_string(), + }) + .await + .expect("success stream response") + .into_inner(); + + // First two messages should succeed + for i in 0..2 { + let response = stream_response.message().await.expect("stream message"); + assert!(response.is_some(), "message {} should be present", i); + assert_eq!(response.unwrap().message, "echo(John)"); + } + + // Third message should be an error from the trailer + let error = stream_response + .message() + .await + .expect_err("should receive error from trailer"); + assert_eq!(error.code(), Code::Internal); + assert_eq!(error.message(), "stream error after 2 messages"); +} + #[wasm_bindgen_test] async fn test_error_response() { let mut client = build_client(); diff --git a/test-suite/simple/proto/echo.proto b/test-suite/simple/proto/echo.proto index aa0eaef..883b688 100644 --- a/test-suite/simple/proto/echo.proto +++ b/test-suite/simple/proto/echo.proto @@ -12,6 +12,8 @@ service Echo { rpc EchoErrorResponse (EchoRequest) returns (EchoResponse) {} rpc EchoTimeout (EchoRequest) returns (EchoResponse) {} + + rpc EchoStreamError (EchoRequest) returns (stream EchoResponse) {} } message EchoRequest { diff --git a/test-suite/simple/server/src/main.rs b/test-suite/simple/server/src/main.rs index 4d875be..2fea4d2 100644 --- a/test-suite/simple/server/src/main.rs +++ b/test-suite/simple/server/src/main.rs @@ -61,6 +61,16 @@ impl Echo for EchoService { Ok(Response::new(InfiniteMessageStream::new(request.message))) } + type EchoStreamErrorStream = ErrorAfterMessagesStream; + + async fn echo_stream_error( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + Ok(Response::new(ErrorAfterMessagesStream::new(request.message))) + } + async fn echo_error_response( &self, _: tonic::Request, @@ -118,6 +128,32 @@ impl Stream for InfiniteMessageStream { } } +pub struct ErrorAfterMessagesStream { + message: String, + count: u8, +} + +impl ErrorAfterMessagesStream { + pub fn new(message: String) -> Self { + Self { message, count: 0 } + } +} + +impl Stream for ErrorAfterMessagesStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + if self.count < 2 { + self.count += 1; + Poll::Ready(Some(Ok(EchoResponse { + message: format!("echo({})", self.message), + }))) + } else { + Poll::Ready(Some(Err(Status::internal("stream error after 2 messages")))) + } + } +} + const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60); const DEFAULT_EXPOSED_HEADERS: [HeaderName; 3] = [ HeaderName::from_static("grpc-status"),