diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs index 0f6705faf..02e099c1e 100644 --- a/core/connectors/runtime/src/error.rs +++ b/core/connectors/runtime/src/error.rs @@ -63,6 +63,8 @@ pub enum RuntimeError { TokenFileReadError(String, String), #[error("Token file is empty: {0}")] TokenFileEmpty(String), + #[error("Sink consume failed for plugin {0} with return code: {1}")] + SinkConsumeFailed(u32, i32), } impl RuntimeError { @@ -78,6 +80,7 @@ impl RuntimeError { RuntimeError::TokenFileNotFound(_) => "invalid_configuration", RuntimeError::TokenFileReadError(_, _) => "invalid_configuration", RuntimeError::TokenFileEmpty(_) => "invalid_configuration", + RuntimeError::SinkConsumeFailed(_, _) => "sink_consume_failed", _ => "error", } } diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 48961aca6..d6567bc79 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -582,7 +582,7 @@ async fn process_messages( RuntimeError::FailedToSerializeRawMessages })?; - (consume)( + let result = (consume)( plugin_id, topic_meta.as_ptr(), topic_meta.len(), @@ -592,5 +592,10 @@ async fn process_messages( messages.len(), ); + if result != 0 { + error!("Sink consume failed for plugin {plugin_id} with return code: {result}"); + return Err(RuntimeError::SinkConsumeFailed(plugin_id, result)); + } + Ok(processed_count) }