Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
473 changes: 205 additions & 268 deletions rust/Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ unit_bindings = "deny"
# See https://rust-lang.github.io/rust-clippy/, we might want to add more
enum_glob_use = "deny"

# This patch is needed until getsentry/symbolic updates its js-source-scopes dependency to a version >0.6.0.
# An upstream fix has been proposed: https://github.com/getsentry/symbolic/pull/945
[patch.crates-io]
js-source-scopes = { git = "https://github.com/getsentry/js-source-scopes", rev = "abea51a0e74840a8d1ec96a61ff36b488994f68c" }

[workspace.dependencies]
anyhow = "1.0"
assert-json-diff = "2.0.2"
Expand Down
2 changes: 1 addition & 1 deletion rust/capture-logs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ base64 = { workspace = true }
rdkafka = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
apache-avro = { version = "0.18.0", features = ["zstandard"] }
apache-avro = { version = "0.21.0", features = ["zstandard"] }
prost = "0.13.5"
bytes = { workspace = true }
tower-http = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion rust/capture-logs/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl KafkaSink {
}),
),
}) {
Err((err, _)) => Err(anyhow!(format!("kafka error: {}", err))),
Err((err, _)) => Err(anyhow!(format!("kafka error: {err}"))),
Ok(delivery_future) => Ok(delivery_future),
}?;

Expand Down
6 changes: 3 additions & 3 deletions rust/capture/tests/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl EphemeralTopic {
std::thread::sleep(Duration::from_millis(100));
continue;
}
bail!("kafka read error: {}", err);
bail!("kafka read error: {err}");
}
None => bail!("kafka read timeout"),
}
Expand Down Expand Up @@ -321,7 +321,7 @@ impl EphemeralTopic {
std::thread::sleep(Duration::from_millis(100));
continue;
}
bail!("kafka read error: {}", err);
bail!("kafka read error: {err}");
}
None => bail!("kafka read timeout"),
}
Expand Down Expand Up @@ -368,7 +368,7 @@ impl EphemeralTopic {
std::thread::sleep(Duration::from_millis(100));
continue;
}
bail!("kafka read error: {}", err);
bail!("kafka read error: {err}");
}
None => bail!("kafka read timeout"),
}
Expand Down
2 changes: 1 addition & 1 deletion rust/common/hypercache/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn set_cache_value(
.body(json_str.into_bytes().into())
.send()
.await
.map_err(|e| anyhow::anyhow!("Failed to set S3 object: {}", e))?;
.map_err(|e| anyhow::anyhow!("Failed to set S3 object: {e}"))?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion rust/cymbal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ sqlx = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
sourcemap = "9.0.0"
symbolic = { version = "12.12.1", features = ["sourcemapcache"] }
symbolic = { version = "12.17.0", features = ["sourcemapcache"] }
proguard = "5.6.2"
reqwest = { workspace = true }
sha2 = "0.10.8"
Expand Down
2 changes: 1 addition & 1 deletion rust/feature-flags/src/utils/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ impl TestContext {
redis
.set(cache_key, payload)
.await
.map_err(|e| anyhow::anyhow!("Failed to set cache: {}", e))?;
.map_err(|e| anyhow::anyhow!("Failed to set cache: {e}"))?;

Ok(())
}
Expand Down
8 changes: 2 additions & 6 deletions rust/kafka-deduplicator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,11 @@ impl Config {
.parse()
.with_context(|| format!("Failed to parse scientific notation: {s}"))?;
if float_val < 0.0 {
return Err(anyhow::anyhow!(
"Storage capacity cannot be negative: {}",
s
));
return Err(anyhow::anyhow!("Storage capacity cannot be negative: {s}"));
}
if float_val > u64::MAX as f64 {
return Err(anyhow::anyhow!(
"Storage capacity exceeds maximum value: {}",
s
"Storage capacity exceeds maximum value: {s}"
));
}
return Ok(float_val as u64);
Expand Down
11 changes: 2 additions & 9 deletions rust/kafka-deduplicator/src/deduplication_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,7 @@ impl DeduplicationProcessor {
key, output_topic, e
);
Err(anyhow::anyhow!(
"Failed to publish event with key '{}' to topic '{}': {}",
key,
output_topic,
e
"Failed to publish event with key '{key}' to topic '{output_topic}': {e}"
))
}
}
Expand Down Expand Up @@ -641,11 +638,7 @@ impl MessageProcessor for DeduplicationProcessor {
topic, partition, offset, e
);
return Err(anyhow::anyhow!(
"Failed to parse RawEvent from data field at {}:{} offset {}: {}",
topic,
partition,
offset,
e
"Failed to parse RawEvent from data field at {topic}:{partition} offset {offset}: {e}"
));
}
};
Expand Down
2 changes: 1 addition & 1 deletion rust/kafka-deduplicator/src/kafka/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ impl PartitionTrackingInfo {
}
// No-op transitions (already in target state)
(current, target) if current == target => {
return Err(anyhow::anyhow!("Already in state {:?}", current));
return Err(anyhow::anyhow!("Already in state {current:?}"));
}
// Invalid transitions
_ => {
Expand Down
2 changes: 1 addition & 1 deletion rust/kafka-deduplicator/src/processor_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ mod tests {
if let Some(fail_offset) = self.fail_on_offset {
if offset == fail_offset {
self.failed_count.fetch_add(1, Ordering::SeqCst);
return Err(anyhow::anyhow!("Simulated failure at offset {}", offset));
return Err(anyhow::anyhow!("Simulated failure at offset {offset}"));
}
}

Expand Down
10 changes: 4 additions & 6 deletions rust/kafka-deduplicator/src/rocksdb/dedup_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,18 @@ impl TryFrom<&SerializableRawEvent> for RawEvent {
let uuid = serializable
.uuid
.as_ref()
.map(|s| s.parse().map_err(|e| anyhow!("Invalid UUID: {}", e)))
.map(|s| s.parse().map_err(|e| anyhow!("Invalid UUID: {e}")))
.transpose()?;

let distinct_id = serializable
.distinct_id_json
.as_ref()
.map(|s| {
serde_json::from_str(s).map_err(|e| anyhow!("Invalid distinct_id JSON: {}", e))
})
.map(|s| serde_json::from_str(s).map_err(|e| anyhow!("Invalid distinct_id JSON: {e}")))
.transpose()?;

let properties: HashMap<String, serde_json::Value> =
serde_json::from_str(&serializable.properties_json)
.map_err(|e| anyhow!("Invalid properties JSON: {}", e))?;
.map_err(|e| anyhow!("Invalid properties JSON: {e}"))?;

Ok(RawEvent {
uuid,
Expand Down Expand Up @@ -119,7 +117,7 @@ impl VersionedMetadata {
bincode::serde::decode_from_slice(payload, bincode::config::standard())?;
Ok(VersionedMetadata::V1(v1))
}
_ => Err(anyhow::anyhow!("unknown version: {}", version)),
_ => Err(anyhow::anyhow!("unknown version: {version}")),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions rust/kafka-deduplicator/src/rocksdb/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl RocksDbStore {
Ok(_) => Ok(()),
Err(e) => {
self.metrics.counter(ROCKSDB_ERRORS_COUNTER).increment(1);
Err(anyhow::anyhow!("Failed to flush: {}", e))
Err(anyhow::anyhow!("Failed to flush: {e}"))
}
}
}
Expand All @@ -368,7 +368,7 @@ impl RocksDbStore {
pub fn flush_wal(&self, sync: bool) -> Result<()> {
self.db
.flush_wal(sync)
.map_err(|e| anyhow::anyhow!("Failed to flush WAL (sync={}): {}", sync, e))
.map_err(|e| anyhow::anyhow!("Failed to flush WAL (sync={sync}): {e}"))
}

/// Get the latest sequence number from the database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn send_test_messages(
producer
.send(record, Timeout::After(Duration::from_secs(5)))
.await
.map_err(|(e, _)| anyhow::anyhow!("Failed to send message: {}", e))?;
.map_err(|(e, _)| anyhow::anyhow!("Failed to send message: {e}"))?;
}

// Give kafka some time to process the messages
Expand Down
Loading