Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/response_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,11 @@ impl Body for ResponseBody {
return Poll::Ready(Some(Ok(http_body::Frame::data(data.freeze()))));
}

// If reading data is finished return `None`
// If reading data is finished, return trailers (if available) before ending
if self.state.finished_data() {
if let Some(trailers) = self.trailer.take() {
return Poll::Ready(Some(Ok(http_body::Frame::trailers(trailers))));
}
return Poll::Ready(None);
}

Expand All @@ -300,7 +303,10 @@ impl Body for ResponseBody {
let data = self.data.take().unwrap();
return Poll::Ready(Some(Ok(http_body::Frame::data(data.freeze()))));
} else if self.state.finished_data() {
// If we finished reading data continue return `None`
// If we finished reading data, return trailers before ending
if let Some(trailers) = self.trailer.take() {
return Poll::Ready(Some(Ok(http_body::Frame::trailers(trailers))));
}
return Poll::Ready(None);
} else if self.finished_stream {
// If stream is finished but data is not finished return error
Expand Down
28 changes: 28 additions & 0 deletions test-suite/simple/client/tests/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,34 @@ async fn test_infinite_echo_stream() {
assert!(response.is_some());
}

#[wasm_bindgen_test]
async fn test_echo_stream_error() {
let mut client = build_client();

let mut stream_response = client
.echo_stream_error(EchoRequest {
message: "John".to_string(),
})
.await
.expect("success stream response")
.into_inner();

// First two messages should succeed
for i in 0..2 {
let response = stream_response.message().await.expect("stream message");
assert!(response.is_some(), "message {} should be present", i);
assert_eq!(response.unwrap().message, "echo(John)");
}

// Third message should be an error from the trailer
let error = stream_response
.message()
.await
.expect_err("should receive error from trailer");
assert_eq!(error.code(), Code::Internal);
assert_eq!(error.message(), "stream error after 2 messages");
}

#[wasm_bindgen_test]
async fn test_error_response() {
let mut client = build_client();
Expand Down
2 changes: 2 additions & 0 deletions test-suite/simple/proto/echo.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ service Echo {
rpc EchoErrorResponse (EchoRequest) returns (EchoResponse) {}

rpc EchoTimeout (EchoRequest) returns (EchoResponse) {}

rpc EchoStreamError (EchoRequest) returns (stream EchoResponse) {}
}

message EchoRequest {
Expand Down
36 changes: 36 additions & 0 deletions test-suite/simple/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ impl Echo for EchoService {
Ok(Response::new(InfiniteMessageStream::new(request.message)))
}

type EchoStreamErrorStream = ErrorAfterMessagesStream;

async fn echo_stream_error(
&self,
request: Request<EchoRequest>,
) -> Result<Response<Self::EchoStreamErrorStream>, Status> {
let request = request.into_inner();
Ok(Response::new(ErrorAfterMessagesStream::new(request.message)))
}

async fn echo_error_response(
&self,
_: tonic::Request<EchoRequest>,
Expand Down Expand Up @@ -118,6 +128,32 @@ impl Stream for InfiniteMessageStream {
}
}

pub struct ErrorAfterMessagesStream {
message: String,
count: u8,
}

impl ErrorAfterMessagesStream {
pub fn new(message: String) -> Self {
Self { message, count: 0 }
}
}

impl Stream for ErrorAfterMessagesStream {
type Item = Result<EchoResponse, Status>;

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.count < 2 {
self.count += 1;
Poll::Ready(Some(Ok(EchoResponse {
message: format!("echo({})", self.message),
})))
} else {
Poll::Ready(Some(Err(Status::internal("stream error after 2 messages"))))
}
}
}

const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
const DEFAULT_EXPOSED_HEADERS: [HeaderName; 3] = [
HeaderName::from_static("grpc-status"),
Expand Down
Loading