Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ path = "src/lib.rs"
rand = "0.8"
image = "0.25"
image-compare = "0.5.0"
memmap2 = "0.9"
rayon = "1.10"
memchr = "2.7"

[dev-dependencies]
divan = { version = "4.0.2", package = "codspeed-divan-compat" }
Expand Down
21 changes: 16 additions & 5 deletions benches/blob_corruption_checker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use divan::Bencher;
use eurorust_2025_workshop::blob_corruption_checker::find_corruptions_sequential;
use eurorust_2025_workshop::blob_corruption_checker::find_corruptions_parallel;

fn main() {
divan::main();
Expand All @@ -8,7 +8,7 @@ fn main() {
#[divan::bench(sample_count = 3, sample_size = 5)]
fn corruption_check(bencher: Bencher) {
bencher.bench_local(|| {
let corruptions = divan::black_box(find_corruptions_sequential(
let corruptions = divan::black_box(find_corruptions_parallel(
"reference.bin",
"corrupted.bin",
1024, // 1KB chunks
Expand All @@ -18,14 +18,25 @@ fn corruption_check(bencher: Bencher) {

// All corruptions should be 1KB aligned
for corruption in &corruptions {
assert_eq!(corruption.offset % 1024, 0, "Corruption offset should be 1KB aligned");
assert_eq!(corruption.length % 1024, 0, "Corruption length should be multiple of 1KB");
assert_eq!(
corruption.offset % 1024,
0,
"Corruption offset should be 1KB aligned"
);
assert_eq!(
corruption.length % 1024,
0,
"Corruption length should be multiple of 1KB"
);
}

// Check specific corruptions
assert_eq!(corruptions[0].offset, 14801920, "First corruption offset");
assert_eq!(corruptions[0].length, 2048, "First corruption length");
assert_eq!(corruptions[25].offset, 243891200, "Middle corruption offset");
assert_eq!(
corruptions[25].offset, 243891200,
"Middle corruption offset"
);
assert_eq!(corruptions[25].length, 4096, "Middle corruption length");
assert_eq!(corruptions[49].offset, 507871232, "Last corruption offset");
assert_eq!(corruptions[49].length, 5120, "Last corruption length");
Expand Down
2 changes: 1 addition & 1 deletion benches/dna_matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn dna_matcher() {
);
let pattern = "AGTCCGTA";

let matches = divan::black_box(naive_dna_matcher(
let matches = divan::black_box(exported_dna_matcher(
divan::black_box(&genome),
divan::black_box(pattern),
));
Expand Down
2 changes: 1 addition & 1 deletion benches/lut_grayscale_bench.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use eurorust_2025_workshop::lut_grayscale::*;
use image::{RgbImage};
use image::RgbImage;

fn main() {
divan::main();
Expand Down
21 changes: 11 additions & 10 deletions src/bfs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::VecDeque;

/// A simple graph represented as an adjacency list
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -26,22 +26,23 @@ impl Graph {
/// Naive BFS implementation using Vec as a queue (intentionally slow)
/// Returns the order in which nodes were visited
pub fn bfs_naive(graph: &Graph, start: usize) -> Vec<usize> {
let mut visited = HashSet::new();
let mut queue = Vec::new(); // Using Vec instead of VecDeque - intentionally inefficient!
let mut result = Vec::new();
let mut visited = vec![false; graph.num_nodes()];
let mut queue = VecDeque::new(); // Using VecDeque for efficient FIFO queue
let mut result = Vec::with_capacity(graph.num_nodes());

queue.push(start);
visited.insert(start);
queue.push_back(start);
visited[start] = true;

while !queue.is_empty() {
// remove(0) is O(n) - this makes BFS slow!
let node = queue.remove(0);
// pop_front() is O(1) - this makes BFS efficient!
let node = queue.pop_front().unwrap();
result.push(node);

if let Some(neighbors) = graph.adjacency.get(node) {
for &neighbor in neighbors {
if visited.insert(neighbor) {
queue.push(neighbor);
if !visited[neighbor] {
visited[neighbor] = true;
queue.push_back(neighbor);
}
}
}
Expand Down
114 changes: 112 additions & 2 deletions src/blob_corruption_checker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use memmap2::Mmap;
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufReader, Read};

Expand Down Expand Up @@ -60,6 +62,85 @@ pub fn find_corruptions_sequential(
corruptions
}

/// Finds corrupted regions by comparing `corrupted_path` against
/// `reference_path` in parallel, `chunk_size` bytes at a time.
///
/// Both files are memory-mapped and split into fixed-size chunks; rayon
/// compares the chunks across all cores, and adjacent corrupted chunks are
/// merged into contiguous `Corruption { offset, length }` ranges, sorted by
/// offset.
///
/// # Panics
/// Panics if either file cannot be opened or memory-mapped, if the two files
/// differ in size, or if `chunk_size` is zero.
pub fn find_corruptions_parallel(
    reference_path: &str,
    corrupted_path: &str,
    chunk_size: usize,
) -> Vec<Corruption> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");

    // Memory map both files
    let ref_file = File::open(reference_path).unwrap();
    let corrupt_file = File::open(corrupted_path).unwrap();

    // SAFETY: the files are not modified while they are mapped.
    let ref_mmap = unsafe { Mmap::map(&ref_file).unwrap() };
    let corrupt_mmap = unsafe { Mmap::map(&corrupt_file).unwrap() };

    let file_size = ref_mmap.len();
    // Fail with a clear message up front rather than an opaque slice-index
    // panic inside the parallel loop below.
    assert_eq!(
        file_size,
        corrupt_mmap.len(),
        "reference and corrupted files must be the same size"
    );

    // Divide the file into ceil(file_size / chunk_size) chunks.
    let num_chunks = file_size.div_ceil(chunk_size);

    // fold/reduce pattern: each rayon worker accumulates (and merges) its own
    // Vec<Corruption>; reduce then stitches the per-worker vectors together.
    // Indexed parallel iterators reduce in index order, so offsets stay sorted.
    (0..num_chunks)
        .into_par_iter()
        .fold(Vec::new, |mut acc: Vec<Corruption>, chunk_idx| {
            let offset = chunk_idx * chunk_size;
            let end = std::cmp::min(offset + chunk_size, file_size);

            if ref_mmap[offset..end] != corrupt_mmap[offset..end] {
                let corruption = Corruption {
                    offset: offset as u64,
                    length: (end - offset) as u64,
                };

                // Merge with the previous corruption when they are adjacent.
                match acc.last_mut() {
                    Some(last) if last.offset + last.length == corruption.offset => {
                        last.length += corruption.length;
                    }
                    _ => acc.push(corruption),
                }
            }

            acc
        })
        .reduce(Vec::new, |mut a, b| {
            // Merge two per-worker vectors, joining the seam if the last range
            // of `a` is adjacent to the first range of `b`.
            if a.is_empty() {
                return b;
            }
            if b.is_empty() {
                return a;
            }

            let last_a = a.last_mut().unwrap();
            let mut b_iter = b.into_iter();
            let first_b = b_iter.next().unwrap();

            if last_a.offset + last_a.length == first_b.offset {
                last_a.length += first_b.length;
            } else {
                a.push(first_b);
            }

            a.extend(b_iter);
            a
        })
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -92,10 +173,39 @@ mod tests {
"Middle corruption offset"
);
assert_eq!(corruptions[25].length, 4096, "Middle corruption length");
assert_eq!(corruptions[49].offset, 507871232, "Last corruption offset");
assert_eq!(corruptions[49].length, 5120, "Last corruption length");
}

#[test]
fn test_find_corruptions_parallel() {
    let corruptions = find_corruptions_parallel("reference.bin", "corrupted.bin", 1024);

    assert_eq!(corruptions.len(), 50, "Should find 50 corruptions");

    // Every reported range must start on, and span whole, 1KB chunks.
    for c in &corruptions {
        assert_eq!(c.offset % 1024, 0, "Corruption offset should be 1KB aligned");
        assert_eq!(
            c.length % 1024,
            0,
            "Corruption length should be multiple of 1KB"
        );
    }

    // Spot-check the first, middle, and last corruption ranges.
    let expected: [(usize, u64, u64, &str); 3] = [
        (0, 14801920, 2048, "First"),
        (25, 243891200, 4096, "Middle"),
        (49, 507871232, 5120, "Last"),
    ];
    for (idx, offset, length, which) in expected {
        assert_eq!(corruptions[idx].offset, offset, "{which} corruption offset");
        assert_eq!(corruptions[idx].length, length, "{which} corruption length");
    }
}
}
92 changes: 88 additions & 4 deletions src/dna_matcher.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,97 @@
use rayon::prelude::*;
use memchr::memmem;

/// Public entry point for DNA pattern matching.
///
/// Returns every non-header line of `genome` (FASTA headers start with '>')
/// that contains `pattern`, delegating to the chunked parallel implementation.
pub fn exported_dna_matcher(genome: &str, pattern: &str) -> Vec<String> {
    chunked_dna_matcher(genome, pattern)
}

/// Naive approach: Read the entire file as a string and filter lines
pub fn naive_dna_matcher(genome: &str, pattern: &str) -> Vec<String> {
fn naive_dna_matcher(genome: &str, pattern: &str) -> Vec<String> {
genome
.lines()
.par_lines()
.filter(|line| !line.starts_with('>')) // Skip headers
.filter(|line| line.contains(pattern))
.map(|s| s.to_string())
.collect()
}

/// Chunked approach: Process genome in parallel byte chunks
/// Chunked approach: split the genome into ~64KB chunks aligned on line
/// boundaries and scan them in parallel with rayon, using memchr/memmem for
/// fast newline and substring search.
///
/// Returns every non-header line (FASTA headers start with '>') containing
/// `pattern`, in file order.
fn chunked_dna_matcher(genome: &str, pattern: &str) -> Vec<String> {
    let genome_bytes = genome.as_bytes();
    let finder = memmem::Finder::new(pattern.as_bytes());

    // Chunk size: balance between parallelism and per-chunk overhead.
    let chunk_size = 64 * 1024; // 64KB per chunk
    let total_len = genome_bytes.len();

    // Compute chunk start offsets, always placed immediately after a '\n' so
    // no line is ever split across two chunks. (A bounded lookahead here would
    // split lines longer than the lookahead window, dropping matches that span
    // the split and potentially slicing a multi-byte UTF-8 sequence.)
    let mut chunk_starts = vec![0];
    let mut pos = chunk_size;
    while pos < total_len {
        match memchr::memchr(b'\n', &genome_bytes[pos..]) {
            Some(newline_offset) => {
                let boundary = pos + newline_offset + 1;
                chunk_starts.push(boundary);
                pos = boundary + chunk_size;
            }
            // No further newlines: the remainder is a single line, closed by
            // the final sentinel pushed below.
            None => break,
        }
    }
    chunk_starts.push(total_len);

    // Scan one line (without its trailing '\n'): skip headers and empty
    // lines, record the line if it contains the pattern.
    let scan_line = |line: &[u8], out: &mut Vec<String>| {
        if !line.is_empty() && line[0] != b'>' && finder.find(line).is_some() {
            // SAFETY: `genome` is a valid &str and every chunk boundary sits
            // immediately after a '\n', so each line slice is valid UTF-8.
            let line_str = unsafe { std::str::from_utf8_unchecked(line) };
            out.push(line_str.to_string());
        }
    };

    // Process chunks in parallel; flat_map on an indexed parallel iterator
    // preserves file order in the collected output.
    (0..chunk_starts.len() - 1)
        .into_par_iter()
        .flat_map(|i| {
            let chunk = &genome_bytes[chunk_starts[i]..chunk_starts[i + 1]];

            let mut local_matches = Vec::new();
            let mut line_start = 0;

            // memchr_iter gives SIMD-accelerated newline scanning.
            for newline_pos in memchr::memchr_iter(b'\n', chunk) {
                scan_line(&chunk[line_start..newline_pos], &mut local_matches);
                line_start = newline_pos + 1;
            }

            // Handle the trailing line when the chunk doesn't end with '\n'.
            if line_start < chunk.len() {
                scan_line(&chunk[line_start..], &mut local_matches);
            }

            local_matches
        })
        .collect()
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -16,7 +100,7 @@ mod tests {
fn test_naive_matcher() {
let test_genome = ">seq1\nACGTACGT\n>seq2\nAGTCCGTAAA\n>seq3\nGGGGGG";
let pattern = "AGTCCGTA";
let matches = naive_dna_matcher(test_genome, pattern);
let matches = exported_dna_matcher(test_genome, pattern);
assert_eq!(matches.len(), 1);
assert_eq!(matches[0], "AGTCCGTAAA");
}
Expand All @@ -28,7 +112,7 @@ mod tests {
.expect("Failed to read genome.fasta\n\n Make sure to run 'cargo run --release --bin generate_fasta'");
let pattern = "AGTCCGTA";

let matches = naive_dna_matcher(&genome, pattern);
let matches = exported_dna_matcher(&genome, pattern);

// With fixed seed (42), we should always get exactly 4927 matches
assert_eq!(
Expand Down
Loading