
feat(taskbroker): Passthrough mode #617

Open

untitaker wants to merge 1 commit into main from passthrough

Conversation

@untitaker
Member

ref STREAM-882

Introduce passthrough mode, so that a broker can be used to spawn tasks
from any topic with any type of message format. This will make it easier
to migrate existing consumers to be tasks instead, without changing data
layout in prod. For more information refer to the ticket above.

@linear-code

linear-code Bot commented Apr 30, 2026

@untitaker untitaker marked this pull request as ready for review April 30, 2026 16:17
@untitaker untitaker requested a review from a team as a code owner April 30, 2026 16:17
Comment thread src/config.rs
/// Maps every application to its worker endpoint, both represented as strings.
pub worker_map: BTreeMap<String, String>,

/// Enable passthrough mode for consuming raw bytes from legacy topics.
Member


Suggested change
/// Enable passthrough mode for consuming raw bytes from legacy topics.
/// Enable passthrough mode for consuming raw bytes from raw topics.

Comment on lines +51 to +55
#[derive(Serialize)]
struct Params<'a> {
    args: (&'a [u8],),
    kwargs: HashMap<(), ()>,
}
Member


For my own understanding, why define this struct here vs outside of the function?

Comment thread src/config.rs
Comment on lines +306 to +307
/// In passthrough mode, raw Kafka message bytes are wrapped into TaskActivation.
pub passthrough_mode: bool,

Maybe a nit, but the naming feels backwards. Passthrough means letting things through:

  • When consuming a tasks topic, we expect activations and do nothing to them until they are stored. That seems like the passthrough mode to me.
  • When consuming raw topics, we need to pre-process them into Activations before passing them to the rest of the pipeline.

Am I getting this wrong?

Comment thread src/config.rs
Comment on lines +318 to +319
/// Processing deadline duration in seconds for passthrough activations.
pub passthrough_processing_deadline_duration: u64,

What would happen if we did not assign them a deadline? Today there is no explicit deadline apart from the max poll time in Kafka.
I am not arguing against setting one: I'd much rather have a deadline that is explicit and under our control than let the infrastructure kill tasks in unexpected ways.
We would have to verify whether we have tasks that routinely take long periods of time in multiprocessing, or in any other setup where that does not cause issues with the max poll time.

    kwargs: HashMap::new(),
};

rmp_serde::to_vec_named(&params).map_err(|e| anyhow!("Failed to encode msgpack: {}", e))

What is a real-world scenario where this error would happen? Would we consider the message invalid and DLQ it? Please add a TODO to make this clear if that is the case.

) -> impl Fn(Arc<OwnedMessage>) -> Result<InflightActivation, Error> {
    move |msg: Arc<OwnedMessage>| {
        let Some(payload) = msg.payload() else {
            return Err(anyhow!("Message has no payload"));

I think we need to be careful before erroring here:
https://github.com/getsentry/arroyo/blob/main/arroyo/backends/kafka/consumer.py#L521

Arroyo handles this by turning the null message into an empty binary string.
I doubt anybody processes those messages, but I cannot verify that either.

Would this be a DLQ scenario as well? If not, what is the value in crashing here?

let id = Uuid::new_v4().to_string();
let parameters_bytes = encode_passthrough_params(payload)?;
let now = Utc::now();
let received_at = prost_types::Timestamp {

Do we not have a task timestamp in taskbroker?
We have consumers that do something with old messages (where the broker timestamp is old), which is convenient. If we only provide the timestamp at which the task is received by the consumer, that information (that the message is old) is lost. Have you considered using the message timestamp here?

Comment on lines +102 to +107
metrics::histogram!(
    "consumer.passthrough.payload_size_bytes",
    "namespace" => config.namespace.clone(),
    "taskname" => config.taskname.clone()
)
.record(payload.len() as f64);

Are these metrics batched or sampled? Producing metrics per message is a performance bottleneck.

