diff --git a/examples/asynchronous_processing.rs b/examples/asynchronous_processing.rs index c02b81fa5..d6e50cb5d 100644 --- a/examples/asynchronous_processing.rs +++ b/examples/asynchronous_processing.rs @@ -4,7 +4,7 @@ use std::time::Duration; use clap::{Arg, Command}; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; -use log::info; +use log::{error, info, warn}; use rdkafka::config::ClientConfig; use rdkafka::consumer::stream_consumer::StreamConsumer; @@ -58,6 +58,8 @@ async fn run_async_processor( input_topic: String, output_topic: String, ) { + // Count how many time we tried to connect to kafka + let mut attempt = 0; // Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`. let consumer: StreamConsumer = ClientConfig::new() .set("group.id", &group_id) @@ -80,42 +82,57 @@ async fn run_async_processor( .expect("Producer creation error"); // Create the outer pipeline on the message stream. - let stream_processor = consumer.stream().try_for_each(|borrowed_message| { - let producer = producer.clone(); - let output_topic = output_topic.to_string(); - async move { - // Process each message - record_borrowed_message_receipt(&borrowed_message).await; - // Borrowed messages can't outlive the consumer they are received from, so they need to - // be owned in order to be sent to a separate thread. - let owned_message = borrowed_message.detach(); - record_owned_message_receipt(&owned_message).await; - tokio::spawn(async move { - // The body of this block will be executed on the main thread pool, - // but we perform `expensive_computation` on a separate thread pool - // for CPU-intensive tasks via `tokio::task::spawn_blocking`. - let computation_result = - tokio::task::spawn_blocking(|| expensive_computation(owned_message)) - .await - .expect("failed to wait for expensive computation"); - let produce_future = producer.send( - FutureRecord::to(&output_topic) - .key("some key") - .payload(&computation_result), - Duration::from_secs(0), - ); - match produce_future.await { - Ok(delivery) => println!("Sent: {:?}", delivery), - Err((e, _)) => println!("Error: {:?}", e), + while attempt < 10 { + let stream_processor = consumer.stream().try_for_each(|borrowed_message| { + let producer = producer.clone(); + let output_topic = output_topic.to_string(); + async move { + // Process each message + record_borrowed_message_receipt(&borrowed_message).await; + // Borrowed messages can't outlive the consumer they are received from, so they need to + // be owned in order to be sent to a separate thread. + let owned_message = borrowed_message.detach(); + record_owned_message_receipt(&owned_message).await; + tokio::spawn(async move { + // The body of this block will be executed on the main thread pool, + // but we perform `expensive_computation` on a separate thread pool + // for CPU-intensive tasks via `tokio::task::spawn_blocking`. + let computation_result = + tokio::task::spawn_blocking(|| expensive_computation(owned_message)) + .await + .expect("failed to wait for expensive computation"); + let produce_future = producer.send( + FutureRecord::to(&output_topic) + .key("some key") + .payload(&computation_result), + Duration::from_secs(0), + ); + match produce_future.await { + Ok(delivery) => println!("Sent: {:?}", delivery), + Err((e, _)) => println!("Error: {:?}", e), + } + }); + Ok(()) + } + }); + + info!("starting event loop"); + let stream_result = stream_processor.await; + match stream_result { + Ok(_) => { + info!("connected"); + } + Err(err) => { + if attempt + 1 < 10 { + warn!("connect failed: {:?}; retrying", err); + attempt += 1; + } else { + error!("connect failed after 10 attempts: {:?}", err); + panic!("cannot connect to Kafka"); } - }); - Ok(()) + } } - }); - - info!("Starting event loop"); - stream_processor.await.expect("stream processing failed"); - info!("Stream processing terminated"); + } } #[tokio::main] @@ -128,7 +145,10 @@ async fn main() { .short('b') .long("brokers") .help("Broker list in kafka format") - .default_value("localhost:9092"), + .default_value("localhost:9092"), // NOTE: Using 'localhost' can be unreliable: it may resolve to IPv6 (::1) or IPv4 (127.0.0.1) + // depending on OS/DNS/gai.conf, and IPv6 may be unreachable in some setups (e.g., Docker or disabled IPv6). + // When running locally, prefer specifying the exact loopback IP to avoid resolution/stack surprises, + // e.g., 127.0.0.1:9092 (IPv4) or [::1]:9092 (IPv6). ) .arg( Arg::new("group-id")