Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions crates/dekaf/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ impl TaskManager {
// messages emitted by the task manager after that session is closed would be lost.
// Instead, we'll create a separate log forwarder for this task manager that will report
// its logs to the correct task's ops logs, irrespective of the session that spawned it.
tokio::spawn(logging::forward_logs(

let handle = tokio::spawn(logging::forward_logs(
GazetteWriter::new(self.clone()),
stop_signal.clone(),
self.clone().run_task_manager(
Expand All @@ -233,6 +234,22 @@ impl TaskManager {
),
));

tokio::spawn(async move {
let handle_resp = handle.await;

match handle_resp {
Ok(Err(task_mgr_err)) => {
tracing::error!(?task_mgr_err, "run_task_manager error!")
}
Ok(_) => {
tracing::info!("run_task_manager exited Ok")
}
Err(e) => {
tracing::error!(?e, "run_task_manager panic!");
}
}
});

TaskStateListener(receiver)
}

Expand Down Expand Up @@ -538,18 +555,26 @@ async fn update_partition_info(
}
};

let partition_result = fetch_partitions(
&journal_client,
&collection_spec.name,
partition_selector
let partition_result = match tokio::time::timeout(
timeout,
fetch_partitions(
&journal_client,
&collection_spec.name,
partition_selector,
),
)
.await
.map(|partitions| {
(journal_client, claims, partitions)
})
.map_err(|e| {
SharedError::from(e.context(format!("Partition fetch failed for collection '{}'", collection_spec.name)))
});
{
Ok(Ok(partitions)) => Ok((journal_client, claims, partitions)),
Ok(Err(e)) => Err(SharedError::from(e.context(format!(
"Partition fetch failed for collection '{}'",
collection_spec.name
)))),
Err(_) => Err(SharedError::from(anyhow::anyhow!(
"Timed out fetching partitions for collection '{}'",
collection_spec.name
))),
};

// Return the result associated with this template name
(template_name, partition_result)
Expand Down Expand Up @@ -634,8 +659,13 @@ pub async fn fetch_partitions(
..Default::default()
};

tracing::info!("Starting to list journals");
let response = journal_client.list(request).await?;

let journals_len = response.journals.len();

tracing::info!(num_journals = journals_len, "Finished listing journals");

let mut partitions = Vec::with_capacity(response.journals.len());

for journal in response.journals {
Expand Down
Loading