Skip to content

bug(connectors): consume() return value discarded in sink runtime #2927

@mlevkov

Description

@mlevkov

Description

The connector runtime calls consume() on each sink plugin via FFI but discards the return value. If a sink's consume() returns an error (e.g., HTTP timeout, database connection failure, serialization error), the runtime ignores it — no logging, no retry, no metric. It proceeds to the next poll cycle as if delivery succeeded.

Location

core/connectors/runtime/src/sink.rs, lines 585-593:

(consume)(
    plugin_id,
    topic_meta.as_ptr(),
    topic_meta.len(),
    messages_meta.as_ptr(),
    messages_meta.len(),
    messages.as_ptr(),
    messages.len(),
);

The FFI function returns an i32 (0 = success, 1 = error), but the return value is never checked.

Impact

This affects all 7 sink connectors (Elasticsearch, Iceberg, MongoDB, PostgreSQL, Quickwit, Stdout, and the new HTTP sink). Every Err(...) returned by any sink's consume() implementation is silently swallowed by the runtime.

As a workaround, sink implementations must log errors internally before returning Err. The HTTP sink (#2925) documents this limitation and logs all errors at error! level inside consume().

Suggested Fix

Check the return value and log at minimum:

let result = (consume)(
    plugin_id,
    topic_meta.as_ptr(),
    topic_meta.len(),
    messages_meta.as_ptr(),
    messages_meta.len(),
    messages.as_ptr(),
    messages.len(),
);

if result != 0 {
    error!(
        "Sink plugin {} consume() returned error code {}",
        plugin_id, result
    );
    // Optionally: increment error metric, trigger retry, etc.
}

Related

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions