diff --git a/.gitignore b/.gitignore index 3ab5f4610..4bfed8322 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -/ignored +*/ignored /.env /.docker.env /src/web/badge/Cargo.lock diff --git a/Cargo.lock b/Cargo.lock index a5d183fd6..3a546d686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2272,8 +2272,10 @@ dependencies = [ "aws-config", "aws-sdk-s3", "aws-smithy-types-convert", + "base64", "bzip2", "chrono", + "crc32fast", "criterion", "dashmap", "docs_rs_config", diff --git a/crates/bin/docs_rs_admin/src/repackage.rs b/crates/bin/docs_rs_admin/src/repackage.rs index 9d949b8da..49e00d750 100644 --- a/crates/bin/docs_rs_admin/src/repackage.rs +++ b/crates/bin/docs_rs_admin/src/repackage.rs @@ -8,7 +8,7 @@ use std::collections::HashSet; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::{fs, io}; -use tracing::{debug, info, instrument}; +use tracing::{debug, info, info_span, instrument}; /// repackage old rustdoc / source content. /// @@ -108,7 +108,6 @@ pub async fn repackage( /// repackage contents of a S3 path prefix into a single archive file. /// /// Not performance optimized, for now it just tries to be simple. -#[instrument(skip(storage))] async fn repackage_path( storage: &AsyncStorage, prefix: &str, @@ -116,6 +115,8 @@ async fn repackage_path( ) -> Result, CompressionAlgorithm)>> { const DOWNLOAD_CONCURRENCY: usize = 8; + let _span = info_span!("repackage_path", %prefix, %target_archive).entered(); + info!("repackage path"); let tempdir = spawn_blocking(|| tempfile::tempdir().map_err(Into::into)).await?; let tempdir_path = tempdir.path().to_path_buf(); diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index b7b6380aa..8a22eb248 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -23,8 +23,10 @@ async-stream = { workspace = true } aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] } aws-sdk-s3 = { version = "1.3.0", default-features = false, features = ["default-https-client", "rt-tokio"] } aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] } +base64 = { workspace = true } bzip2 = "0.6.0" chrono = { workspace = true } +crc32fast = "1.4.2" dashmap = { version = "6.0.0", optional = true } docs_rs_config = { path = "../docs_rs_config" } docs_rs_env_vars = { path = "../docs_rs_env_vars" } @@ -71,5 +73,9 @@ name = "archive_index_cache" harness = false required-features = ["testing"] +[[bench]] +name = "crc32" +harness = false + [lints] workspace = true diff --git a/crates/lib/docs_rs_storage/benches/crc32.rs b/crates/lib/docs_rs_storage/benches/crc32.rs new file mode 100644 index 000000000..12f3aa965 --- /dev/null +++ b/crates/lib/docs_rs_storage/benches/crc32.rs @@ -0,0 +1,20 @@ +use criterion::{Criterion, Throughput, criterion_group, criterion_main}; +use docs_rs_storage::crc32_for_path; +use std::{fs, hint::black_box}; + +pub fn crc32_file(c: &mut Criterion) { + let fixture_path = tempfile::NamedTempFile::new().unwrap().into_temp_path(); + + let fixture = vec![b'x'; 16 * 1024 * 1024]; + fs::write(&fixture_path, &fixture).unwrap(); + + let mut group = c.benchmark_group("crc32"); + group.throughput(Throughput::Bytes(fixture.len() as u64)); + group.bench_function("file_16mib", |b| { + b.iter(|| crc32_for_path(black_box(&fixture_path)).unwrap()); + }); + group.finish(); +} + +criterion_group!(crc32_benches, crc32_file); +criterion_main!(crc32_benches); diff --git a/crates/lib/docs_rs_storage/src/archive_index.rs b/crates/lib/docs_rs_storage/src/archive_index.rs index 334fb96c6..3e3b54621 100644 --- a/crates/lib/docs_rs_storage/src/archive_index.rs +++ b/crates/lib/docs_rs_storage/src/archive_index.rs @@ -830,7 +830,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::blob::StreamingBlob; + use crate::{blob::StreamingBlob, storage::non_blocking::ZIP_BUFFER_SIZE}; use chrono::Utc; use docs_rs_config::AppConfig as _; use docs_rs_opentelemetry::testing::TestMetrics; @@ -839,14 +839,15 @@ mod tests { use zip::write::SimpleFileOptions; async fn create_test_archive(file_count: u32) -> Result { - spawn_blocking(move || { + let writer = spawn_blocking(move || { use std::io::Write as _; let tf = tempfile::tempfile()?; let objectcontent: Vec = (0..255).collect(); - let mut archive = zip::ZipWriter::new(tf); + let mut archive = + zip::ZipWriter::new(std::io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, tf)); for i in 0..file_count { archive.start_file( format!("testfile{i}"), @@ -858,8 +859,9 @@ mod tests { } Ok(archive.finish()?) }) - .await - .map(fs::File::from_std) + .await?; + + Ok(fs::File::from_std(writer.into_inner()?)) } struct FakeDownloader { diff --git a/crates/lib/docs_rs_storage/src/backends/memory.rs b/crates/lib/docs_rs_storage/src/backends/memory.rs index 385cd7f3f..818c1e4f4 100644 --- a/crates/lib/docs_rs_storage/src/backends/memory.rs +++ b/crates/lib/docs_rs_storage/src/backends/memory.rs @@ -1,16 +1,18 @@ use crate::{ Blob, backends::StorageBackendMethods, - blob::{BlobUpload, StreamingBlob}, + blob::{StreamUpload, StreamUploadSource, StreamingBlob}, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, }; use anyhow::{Result, anyhow}; +use chrono::Utc; use dashmap::DashMap; use docs_rs_headers::compute_etag; use futures_util::stream::{self, BoxStream}; use itertools::Itertools as _; +use tokio::fs; pub(crate) struct MemoryBackend { otel_metrics: StorageMetrics, @@ -46,16 +48,30 @@ impl StorageBackendMethods for MemoryBackend { Ok(blob.into()) } - async fn store_batch(&self, batch: Vec) -> Result<()> { - self.otel_metrics - .uploaded_files - .add(batch.len() as u64, &[]); + async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { + let StreamUpload { + path, + mime, + source, + compression, + } = upload; - for upload in batch { - let blob: Blob = upload.into(); - self.objects.insert(blob.path.clone(), blob); - } + let content = match source { + StreamUploadSource::Bytes(content) => content.to_vec(), + StreamUploadSource::File(path) => fs::read(&path).await?, + }; + + let blob = Blob { + path, + mime, + date_updated: Utc::now(), + etag: Some(compute_etag(&content)), + content, + compression, + }; + self.otel_metrics.uploaded_files.add(1, &[]); + self.objects.insert(blob.path.clone(), blob); Ok(()) } diff --git a/crates/lib/docs_rs_storage/src/backends/mod.rs b/crates/lib/docs_rs_storage/src/backends/mod.rs index a257f8de3..effb27dc5 100644 --- a/crates/lib/docs_rs_storage/src/backends/mod.rs +++ b/crates/lib/docs_rs_storage/src/backends/mod.rs @@ -2,14 +2,14 @@ pub(crate) mod memory; pub(crate) mod s3; -use crate::{BlobUpload, StreamingBlob, types::FileRange}; +use crate::{StreamingBlob, blob::StreamUpload, types::FileRange}; use anyhow::Result; use futures_util::stream::BoxStream; pub(crate) trait StorageBackendMethods { async fn exists(&self, path: &str) -> Result; async fn get_stream(&self, path: &str, range: Option) -> Result; - async fn store_batch(&self, batch: Vec) -> Result<()>; + async fn upload_stream(&self, upload: StreamUpload) -> Result<()>; async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result>; async fn delete_prefix(&self, prefix: &str) -> Result<()>; } @@ -39,8 +39,8 @@ impl StorageBackendMethods for StorageBackend { call_inner!(self, get_stream(path, range)) } - async fn store_batch(&self, batch: Vec) -> Result<()> { - call_inner!(self, store_batch(batch)) + async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { + call_inner!(self, upload_stream(upload)) } async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { diff --git a/crates/lib/docs_rs_storage/src/backends/s3.rs b/crates/lib/docs_rs_storage/src/backends/s3.rs index 93f7a6c67..095949cfb 100644 --- a/crates/lib/docs_rs_storage/src/backends/s3.rs +++ b/crates/lib/docs_rs_storage/src/backends/s3.rs @@ -1,7 +1,8 @@ use crate::{ Config, backends::StorageBackendMethods, - blob::{BlobUpload, StreamingBlob}, + blob::{StreamUpload, StreamUploadSource, StreamingBlob}, + crc32_for_path, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, @@ -13,15 +14,18 @@ use aws_sdk_s3::{ Client, config::{Region, retry::RetryConfig}, error::{ProvideErrorMetadata, SdkError}, - types::{Delete, ObjectIdentifier}, + primitives::{ByteStream, Length}, + types::{ChecksumAlgorithm, Delete, ObjectIdentifier}, }; use aws_smithy_types_convert::date_time::DateTimeExt; +use base64::{Engine as _, engine::general_purpose::STANDARD as b64}; use chrono::Utc; use docs_rs_headers::{ETag, compute_etag}; -use futures_util::{ - future::TryFutureExt, - stream::{BoxStream, FuturesUnordered, StreamExt}, -}; +use docs_rs_utils::spawn_blocking; +use futures_util::stream::{BoxStream, StreamExt}; +use opentelemetry::KeyValue; +use std::time::Duration; +use tokio::{fs, time::sleep}; use tracing::{error, warn}; // error codes to check for when trying to determine if an error is @@ -41,6 +45,8 @@ static NOT_FOUND_ERROR_CODES: [&str; 5] = [ "XMinioInvalidObjectName", ]; +const S3_UPLOAD_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB + trait S3ResultExt { fn convert_errors(self) -> anyhow::Result; } @@ -230,45 +236,94 @@ impl StorageBackendMethods for S3Backend { }) } - async fn store_batch(&self, mut batch: Vec) -> Result<(), Error> { - // Attempt to upload the batch 3 times - for _ in 0..3 { - let mut futures = FuturesUnordered::new(); - for blob in batch.drain(..) { - futures.push( - self.client - .put_object() - .bucket(&self.bucket) - .key(&blob.path) - .body(blob.content.clone().into()) - .content_type(blob.mime.to_string()) - .set_content_encoding(blob.compression.map(|alg| alg.to_string())) - .send() - .map_ok(|_| { - self.otel_metrics.uploaded_files.add(1, &[]); - }) - .map_err(|err| { - warn!(?err, "Failed to upload blob to S3"); - // Reintroduce failed blobs for a retry - blob - }), - ); + async fn upload_stream(&self, upload: StreamUpload) -> Result<(), Error> { + let StreamUpload { + path, + mime, + source, + compression, + } = upload; + + let (content_length, checksum_crc32) = match &source { + StreamUploadSource::Bytes(bytes) => (bytes.len() as u64, None), + StreamUploadSource::File(local_path) => { + let local_path = local_path.clone(); + + ( + fs::metadata(&local_path).await?.len(), + Some( + spawn_blocking(move || Ok(b64.encode(crc32_for_path(local_path)?))).await?, + ), + ) } + }; - while let Some(result) = futures.next().await { - // Push each failed blob back into the batch - if let Err(blob) = result { - batch.push(blob); + let mut last_err = None; + + for attempt in 1..=3 { + let body = match &source { + StreamUploadSource::Bytes(bytes) => ByteStream::from(bytes.clone()), + StreamUploadSource::File(path) => { + // NOTE: + // reading the upload-data from a local path is + // "retryable" in the AWS SDK sense. + // ".file" (file pointer) is not retryable. + ByteStream::read_from() + .path(path) + .buffer_size(S3_UPLOAD_BUFFER_SIZE) + .length(Length::Exact(content_length)) + .build() + .await? } + }; + + let mut request = self + .client + .put_object() + .bucket(&self.bucket) + .key(&path) + .body(body) + .content_length(content_length as i64) + .content_type(mime.to_string()) + .set_content_encoding(compression.map(|alg| alg.to_string())); + + // NOTE: when you try to stream-upload a local file, the AWS SDK by default + // uses a "middleware" to calculate the checksum for the content, to compare it after + // uploading. + // This piece is broken right now, but only when using S3 directly. On minio, all is + // fine. + // I don't want to disable checksums so we're sure the files are uploaded correctly. + // So the only alternative (outside of trying to fix the SDK) is to calculate the + // checksum ourselves. This is a little annoying because this means we have to read the + // whole file before upload. But since I don't want to load all files into memory before + // upload, this is the only option. + if let Some(checksum_crc32) = &checksum_crc32 { + request = request + .checksum_algorithm(ChecksumAlgorithm::Crc32) + .checksum_crc32(checksum_crc32); } - // If we uploaded everything in the batch, we're done - if batch.is_empty() { - return Ok(()); + match request.send().await { + Ok(_) => { + self.otel_metrics + .uploaded_files + .add(1, &[KeyValue::new("attempt", attempt.to_string())]); + return Ok(()); + } + Err(err) => { + warn!(?err, attempt, %path, "failed to upload blob to S3"); + last_err = Some(err); + + if attempt < 3 { + sleep(Duration::from_millis(10 * 2u64.pow(attempt))).await; + } + } } } - panic!("failed to upload 3 times, exiting"); + Err(last_err + .expect("upload retry loop exited without a result") + .into()) } async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { diff --git a/crates/lib/docs_rs_storage/src/blob.rs b/crates/lib/docs_rs_storage/src/blob.rs index 4dd30c473..f6364ff41 100644 --- a/crates/lib/docs_rs_storage/src/blob.rs +++ b/crates/lib/docs_rs_storage/src/blob.rs @@ -4,8 +4,40 @@ use chrono::{DateTime, Utc}; use docs_rs_headers::{ETag, compute_etag}; use docs_rs_types::CompressionAlgorithm; use mime::Mime; -use std::io; -use tokio::io::{AsyncBufRead, AsyncBufReadExt}; +use std::{fmt, io::Cursor, path::PathBuf}; +use tokio::io::{self, AsyncBufRead, AsyncBufReadExt}; +use tokio_util::bytes::Bytes; + +pub enum StreamUploadSource { + Bytes(Bytes), + File(PathBuf), +} + +/// Represents a stream blob to be uploaded to storage. +/// +/// NOTE: Right now we only support uploads where the size is known in advance. +/// We can add support for streams with unknown size, but this would mean +/// using an intermediate fixed-size buffer and multipart uploads for these cases. +/// But: the multipart machinery is only worth the complexity if the stream is: +/// - unknown size +/// - bigger (there's a 5 MiB size limit for each part) +pub struct StreamUpload { + pub path: String, + pub mime: Mime, + pub source: StreamUploadSource, + pub compression: Option, +} + +impl From for StreamUpload { + fn from(value: BlobUpload) -> Self { + Self { + path: value.path, + mime: value.mime, + source: StreamUploadSource::Bytes(value.content.into()), + compression: value.compression, + } + } +} /// represents a blob to be uploaded to storage. #[derive(Clone, Debug, PartialEq, Eq)] @@ -60,7 +92,7 @@ pub struct StreamingBlob { pub content: Box, } -impl std::fmt::Debug for StreamingBlob { +impl fmt::Debug for StreamingBlob { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("StreamingBlob") .field("path", &self.path) @@ -110,7 +142,7 @@ impl StreamingBlob { let mut content = SizedBuffer::new(max_size); content.reserve(self.content_length); - tokio::io::copy(&mut self.content, &mut content).await?; + io::copy(&mut self.content, &mut content).await?; Ok(Blob { path: self.path, @@ -132,7 +164,7 @@ impl From for StreamingBlob { etag: value.etag, compression: value.compression, content_length: value.content.len(), - content: Box::new(io::Cursor::new(value.content)), + content: Box::new(Cursor::new(value.content)), } } } @@ -157,7 +189,7 @@ mod test { compression: alg, etag: Some(compute_etag(&content)), content_length: content.len(), - content: Box::new(io::Cursor::new(content)), + content: Box::new(Cursor::new(content)), } } @@ -226,7 +258,7 @@ mod test { let mut compressed_content = Vec::new(); let alg = CompressionAlgorithm::Zstd; compress_async( - &mut io::Cursor::new(CONTENT.to_vec()), + &mut Cursor::new(CONTENT.to_vec()), &mut compressed_content, alg, ) diff --git a/crates/lib/docs_rs_storage/src/config.rs b/crates/lib/docs_rs_storage/src/config.rs index af2ccdd8b..9f1bc9f14 100644 --- a/crates/lib/docs_rs_storage/src/config.rs +++ b/crates/lib/docs_rs_storage/src/config.rs @@ -75,8 +75,6 @@ impl AppConfig for ArchiveIndexCacheConfig { #[derive(Debug)] pub struct Config { - pub temp_dir: PathBuf, - // Storage params pub storage_backend: StorageKind, @@ -101,19 +99,15 @@ pub struct Config { // config for the local archive index cache pub archive_index_cache: Arc, - // How much we want to parallelize local filesystem logic. - // For pure I/O this could be quite high (32/64), but - // we often also add compression on top of it, which is CPU-bound, - // even when just light / simpler compression. - pub local_filesystem_parallelism: usize, + // How much we want to parallelize file uploads / downloads. + pub network_parallelism: usize, } impl AppConfig for Config { fn from_environment() -> anyhow::Result { - let prefix: PathBuf = require_env("DOCSRS_PREFIX")?; + let cores = std::thread::available_parallelism()?.get(); Ok(Self { - temp_dir: prefix.join("tmp"), storage_backend: env("DOCSRS_STORAGE_BACKEND", StorageKind::default())?, aws_sdk_max_retries: env("DOCSRS_AWS_SDK_MAX_RETRIES", 6u32)?, s3_bucket: env("DOCSRS_S3_BUCKET", "rust-docs-rs".to_string())?, @@ -124,10 +118,7 @@ impl AppConfig for Config { max_file_size_html: env("DOCSRS_MAX_FILE_SIZE_HTML", 50 * 1024 * 1024)?, #[cfg(any(test, feature = "testing"))] s3_bucket_is_temporary: false, - local_filesystem_parallelism: env( - "DOCSRS_LOCAL_FILESYSTEM_PARALLELISM", - std::thread::available_parallelism()?.get(), - )?, + network_parallelism: env("DOCSRS_NETWORK_PARALLELISM", 8usize.min(cores))?, }) } diff --git a/crates/lib/docs_rs_storage/src/lib.rs b/crates/lib/docs_rs_storage/src/lib.rs index 3c70b88b1..92d9bb185 100644 --- a/crates/lib/docs_rs_storage/src/lib.rs +++ b/crates/lib/docs_rs_storage/src/lib.rs @@ -22,6 +22,7 @@ pub use storage::blocking::Storage; pub use storage::non_blocking::AsyncStorage; pub use types::StorageKind; pub use utils::{ + crc32::crc32_for_path, file_list::get_file_list, storage_path::{rustdoc_archive_path, rustdoc_json_path, source_archive_path}, }; diff --git a/crates/lib/docs_rs_storage/src/storage/blocking.rs b/crates/lib/docs_rs_storage/src/storage/blocking.rs index bd4c5a152..59c28caaa 100644 --- a/crates/lib/docs_rs_storage/src/storage/blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/blocking.rs @@ -1,4 +1,4 @@ -use crate::{blob::Blob, file::FileEntry, storage::non_blocking::AsyncStorage, types::FileRange}; +use crate::{blob::Blob, file::FileEntry, storage::non_blocking::AsyncStorage}; use anyhow::Result; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use std::{fmt, path::Path, sync::Arc}; @@ -10,7 +10,6 @@ pub struct Storage { runtime: runtime::Handle, } -#[allow(dead_code)] impl Storage { pub fn new(inner: Arc, runtime: runtime::Handle) -> Self { Self { inner, runtime } @@ -70,17 +69,6 @@ impl Storage { self.runtime.block_on(self.inner.get(path, max_size)) } - pub(crate) fn get_range( - &self, - path: &str, - max_size: usize, - range: FileRange, - compression: Option, - ) -> Result { - self.runtime - .block_on(self.inner.get_range(path, max_size, range, compression)) - } - pub fn get_from_archive( &self, archive_path: &str, @@ -140,17 +128,6 @@ impl Storage { self.runtime.block_on(self.inner.store_one(path, content)) } - // Store file into the backend at the given path (also used to detect mime type), returns the - // chosen compression algorithm - pub fn store_path( - &self, - target_path: impl Into + std::fmt::Debug, - source_path: impl AsRef + std::fmt::Debug, - ) -> Result { - self.runtime - .block_on(self.inner.store_path(target_path, source_path)) - } - /// sync wrapper for the list_prefix function /// purely for testing purposes since it collects all files into a Vec. #[cfg(feature = "testing")] diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index c7cc48bd0..ba438e72a 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -4,7 +4,7 @@ use crate::{ Config, archive_index::{self, ARCHIVE_INDEX_FILE_EXTENSION}, backends::{StorageBackend, StorageBackendMethods, s3::S3Backend}, - blob::{Blob, BlobUpload, StreamingBlob}, + blob::{Blob, StreamUpload, StreamUploadSource, StreamingBlob}, compression::{compress, compress_async}, errors::PathNotFoundError, file::FileEntry, @@ -21,10 +21,19 @@ use docs_rs_opentelemetry::AnyMeterProvider; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use docs_rs_utils::spawn_blocking; use futures_util::{TryStreamExt as _, future, stream::BoxStream}; -use std::{fmt, path::Path, pin::Pin, sync::Arc}; -use tokio::{fs, io}; +use std::{ + fmt, + io::{Cursor, Write as _}, + path::Path, + pin::Pin, + sync::Arc, +}; +use tokio::{fs, io, io::AsyncWriteExt as _}; use tracing::{info_span, instrument, trace, warn}; +/// buffer size when writing zip files. +pub(crate) const ZIP_BUFFER_SIZE: usize = 1024 * 1024; + pub struct AsyncStorage { backend: StorageBackend, config: Arc, @@ -181,8 +190,7 @@ impl AsyncStorage { Ok(self.get_raw_stream(path).await?.decompress().await?) } - /// get, decompress and materialize part of an object from store - #[instrument(skip(self))] + #[cfg(test)] pub(crate) async fn get_range( &self, path: &str, @@ -296,11 +304,18 @@ impl AsyncStorage { root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { let root_dir = root_dir.as_ref(); - let (zip_content, file_paths) = + + // Keep the TempPath guards alive until after both uploads complete; dropping them earlier + // would delete the files while S3 is still reading from them. + let zip_temp_path = tempfile::NamedTempFile::new()?.into_temp_path(); + let zip_path = zip_temp_path.to_path_buf(); + + let file_paths = spawn_blocking({ use std::{io, fs}; let archive_path = archive_path.to_owned(); let root_dir = root_dir.to_owned(); + let zip_path = zip_path.clone(); move || { let mut file_paths = Vec::new(); @@ -316,7 +331,8 @@ impl AsyncStorage { // also has to be added as supported algorithm for storage compression, together // with a mapping in `storage::archive_index::Index::new_from_zip`. - let zip_content = { + + { let _span = info_span!("create_zip_archive", %archive_path, root_dir=%root_dir.display()).entered(); @@ -324,7 +340,9 @@ impl AsyncStorage { .compression_method(zip::CompressionMethod::Bzip2) .compression_level(Some(3)); - let mut zip = zip::ZipWriter::new(io::Cursor::new(Vec::new())); + // rustdoc archives can become a couple of GiB big, so we better use a tempfile. + let zip_file = fs::File::create(&zip_path)?; + let mut zip = zip::ZipWriter::new(io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, zip_file)); for file_path in get_file_list(&root_dir) { let file_path = file_path?; @@ -334,56 +352,58 @@ impl AsyncStorage { file_paths.push(FileEntry{path: file_path, size: file.metadata()?.len()}); } - zip.finish()?.into_inner() - }; + let mut zip_file = zip.finish()?.into_inner()?; + zip_file.flush()?; + } - Ok(( - zip_content, - file_paths - )) + Ok(file_paths) } }) .await?; let alg = CompressionAlgorithm::default(); let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path); - let (zip_content, compressed_index_content) = { + let compressed_index_temp_path = tempfile::NamedTempFile::new()?.into_temp_path(); + let compressed_index_path = compressed_index_temp_path.to_path_buf(); + { let _span = info_span!("create_archive_index", %remote_index_path).entered(); - fs::create_dir_all(&self.config.temp_dir).await?; - let local_index_path = - tempfile::NamedTempFile::new_in(&self.config.temp_dir)?.into_temp_path(); + let local_index_path = tempfile::NamedTempFile::new()?.into_temp_path(); - let zip_cursor = - archive_index::create(std::io::Cursor::new(zip_content), &local_index_path).await?; - let zip_content = zip_cursor.into_inner(); - - let mut buf: Vec = Vec::new(); - compress_async( - &mut io::BufReader::new(fs::File::open(&local_index_path).await?), - &mut buf, - alg, + archive_index::create( + io::BufReader::new(fs::File::open(&zip_path).await?), + &local_index_path, ) .await?; - (zip_content, buf) - }; - self.backend - .store_batch(vec![ - BlobUpload { - path: archive_path.to_string(), - mime: mimes::APPLICATION_ZIP.clone(), - content: zip_content, - compression: None, - }, - BlobUpload { - path: remote_index_path, - mime: mime::APPLICATION_OCTET_STREAM, - content: compressed_index_content, - compression: Some(alg), - }, - ]) - .await?; + // compressed index can become up to a couple 100 MiB big, so rather use a tempfile. + let mut compressed_index_file = fs::File::create(&compressed_index_path).await?; + { + let mut compressed_index_writer = io::BufWriter::new(&mut compressed_index_file); + compress_async( + &mut io::BufReader::new(fs::File::open(&local_index_path).await?), + &mut compressed_index_writer, + alg, + ) + .await?; + compressed_index_writer.flush().await?; + } + } + + tokio::try_join!( + self.backend.upload_stream(StreamUpload { + path: archive_path.to_string(), + mime: mimes::APPLICATION_ZIP.clone(), + source: StreamUploadSource::File(zip_path), + compression: None, + }), + self.backend.upload_stream(StreamUpload { + path: remote_index_path, + mime: mime::APPLICATION_OCTET_STREAM, + source: StreamUploadSource::File(compressed_index_path), + compression: Some(alg), + }) + )?; Ok((file_paths, CompressionAlgorithm::Bzip2)) } @@ -395,65 +415,64 @@ impl AsyncStorage { prefix: impl AsRef + fmt::Debug, root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { - let prefix = prefix.as_ref(); + let prefix = prefix.as_ref().to_path_buf(); let root_dir = root_dir.as_ref(); let alg = CompressionAlgorithm::default(); - let (file_paths_and_mimes, blobs): (Vec<_>, Vec<_>) = walk_dir_recursive(&root_dir) + let file_paths_and_mimes: Vec<_> = walk_dir_recursive(&root_dir) .err_into::() - .map_ok(|item| async move { - // Some files have insufficient permissions - // (like .lock file created by cargo in documentation directory). - // Skip these files. - let Ok(file) = fs::File::open(&item).await else { - return Ok(None); - }; - - let content = { - let mut buf: Vec = Vec::new(); - compress_async(io::BufReader::new(file), &mut buf, alg).await?; - buf - }; - - let bucket_path = prefix.join(&item.relative).to_string_lossy().to_string(); - - let file_size = item.metadata.len(); - - let file_info = FileEntry { - path: item.relative.clone(), - size: file_size, - }; - let mime = file_info.mime(); - - Ok(Some(( - file_info, - BlobUpload { - path: bucket_path, - mime, - content, - compression: Some(alg), - }, - ))) + .map_ok(|item| { + let prefix = prefix.clone(); + async move { + // Some files have insufficient permissions + // (like .lock file created by cargo in documentation directory). + // Skip these files. + let Ok(file) = fs::File::open(&item).await else { + return Ok(None); + }; + + let content = { + let mut buf: Vec = Vec::new(); + compress_async(io::BufReader::new(file), &mut buf, alg).await?; + buf + }; + + let bucket_path = prefix.join(&item.relative).to_string_lossy().to_string(); + + let file_size = item.metadata.len(); + + let file_info = FileEntry { + path: item.relative.clone(), + size: file_size, + }; + let mime = file_info.mime().clone(); + + self.backend + .upload_stream(StreamUpload { + path: bucket_path, + mime, + source: StreamUploadSource::Bytes(content.into()), + compression: Some(alg), + }) + .await?; + + Ok(Some(file_info)) + } }) - .try_buffer_unordered(self.config.local_filesystem_parallelism) + .try_buffer_unordered(self.config.network_parallelism) .try_filter_map(|item| future::ready(Ok(item))) - .try_fold( - (Vec::new(), Vec::new()), - |(mut file_paths_and_mimes, mut blobs), (file_info, blob)| async move { - file_paths_and_mimes.push(file_info); - blobs.push(blob); - Ok((file_paths_and_mimes, blobs)) - }, - ) + .try_collect() .await?; - self.backend.store_batch(blobs).await?; Ok((file_paths_and_mimes, alg)) } #[cfg(test)] - pub async fn store_blobs(&self, blobs: Vec) -> Result<()> { - self.backend.store_batch(blobs).await + pub async fn store_blobs(&self, blobs: Vec) -> Result<()> { + for blob in blobs { + self.backend.upload_stream(blob.into()).await?; + } + Ok(()) } // Store file into the backend at the given path, uncompressed. @@ -469,12 +488,12 @@ impl AsyncStorage { let mime = detect_mime(&path).to_owned(); self.backend - .store_batch(vec![BlobUpload { + .upload_stream(StreamUpload { path, mime, - content, + source: StreamUploadSource::Bytes(content.into()), compression: None, - }]) + }) .await?; Ok(()) @@ -489,54 +508,17 @@ impl AsyncStorage { content: impl Into>, ) -> Result { let path = path.into(); - let content = content.into(); let alg = CompressionAlgorithm::default(); - let content = compress(&*content, alg)?; + let content = compress(Cursor::new(content.into()), alg)?; let mime = detect_mime(&path).to_owned(); self.backend - .store_batch(vec![BlobUpload { + .upload_stream(StreamUpload { path, mime, - content, - compression: Some(alg), - }]) - .await?; - - Ok(alg) - } - - #[instrument(skip(self))] - pub async fn store_path( - &self, - target_path: impl Into + fmt::Debug, - source_path: impl AsRef + fmt::Debug, - ) -> Result { - let target_path = target_path.into(); - let source_path = source_path.as_ref(); - - let alg = CompressionAlgorithm::default(); - - let content = { - let mut buf: Vec = Vec::new(); - compress_async( - io::BufReader::new(fs::File::open(source_path).await?), - &mut buf, - alg, - ) - .await?; - buf - }; - - let mime = detect_mime(&target_path).to_owned(); - - self.backend - .store_batch(vec![BlobUpload { - path: target_path, - mime, - content, + source: StreamUploadSource::Bytes(content.into()), compression: Some(alg), - }]) + }) .await?; Ok(alg) @@ -593,6 +575,7 @@ impl fmt::Debug for AsyncStorage { #[cfg(test)] mod backend_tests { use super::*; + use crate::blob::BlobUpload; use crate::{PathNotFoundError, errors::SizeLimitReached}; use docs_rs_headers::compute_etag; use docs_rs_opentelemetry::testing::TestMetrics; diff --git a/crates/lib/docs_rs_storage/src/utils/crc32.rs b/crates/lib/docs_rs_storage/src/utils/crc32.rs new file mode 100644 index 000000000..815b22b93 --- /dev/null +++ b/crates/lib/docs_rs_storage/src/utils/crc32.rs @@ -0,0 +1,24 @@ +use crc32fast::Hasher; +use std::{ + fs, + io::{self, Read as _}, + path::Path, +}; + +pub fn crc32_for_path(path: impl AsRef) -> Result<[u8; 4], io::Error> { + let path = path.as_ref(); + + let mut file = fs::File::open(path)?; + let mut hasher = Hasher::new(); + let mut buffer = [0; 256 * 1024]; + + loop { + let read = file.read(&mut buffer)?; + if read == 0 { + break; + } + hasher.update(&buffer[..read]); + } + + Ok(hasher.finalize().to_be_bytes()) +} diff --git a/crates/lib/docs_rs_storage/src/utils/mod.rs b/crates/lib/docs_rs_storage/src/utils/mod.rs index 65bab4072..2a1258aad 100644 --- a/crates/lib/docs_rs_storage/src/utils/mod.rs +++ b/crates/lib/docs_rs_storage/src/utils/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod crc32; pub(crate) mod file_list; pub(crate) mod sized_buffer; pub(crate) mod storage_path;