Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/ignored
*/ignored
/.env
/.docker.env
/src/web/badge/Cargo.lock
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/bin/docs_rs_admin/src/repackage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::{fs, io};
use tracing::{debug, info, instrument};
use tracing::{debug, info, info_span, instrument};

/// repackage old rustdoc / source content.
///
Expand Down Expand Up @@ -108,14 +108,15 @@ pub async fn repackage(
/// repackage contents of a S3 path prefix into a single archive file.
///
/// Not performance optimized, for now it just tries to be simple.
#[instrument(skip(storage))]
async fn repackage_path(
storage: &AsyncStorage,
prefix: &str,
target_archive: &str,
) -> Result<Option<(Vec<FileEntry>, CompressionAlgorithm)>> {
const DOWNLOAD_CONCURRENCY: usize = 8;

let _span = info_span!("repackage_path", %prefix, %target_archive).entered();

info!("repackage path");
let tempdir = spawn_blocking(|| tempfile::tempdir().map_err(Into::into)).await?;
let tempdir_path = tempdir.path().to_path_buf();
Expand Down
6 changes: 6 additions & 0 deletions crates/lib/docs_rs_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ async-stream = { workspace = true }
aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] }
aws-sdk-s3 = { version = "1.3.0", default-features = false, features = ["default-https-client", "rt-tokio"] }
aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] }
base64 = { workspace = true }
bzip2 = "0.6.0"
chrono = { workspace = true }
crc32fast = "1.4.2"
dashmap = { version = "6.0.0", optional = true }
docs_rs_config = { path = "../docs_rs_config" }
docs_rs_env_vars = { path = "../docs_rs_env_vars" }
Expand Down Expand Up @@ -71,5 +73,9 @@ name = "archive_index_cache"
harness = false
required-features = ["testing"]

[[bench]]
name = "crc32"
harness = false

[lints]
workspace = true
20 changes: 20 additions & 0 deletions crates/lib/docs_rs_storage/benches/crc32.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
use docs_rs_storage::crc32_for_path;
use std::{fs, hint::black_box};

/// Benchmark `crc32_for_path` on a 16 MiB on-disk fixture file.
///
/// Creates a named temp file filled with 16 MiB of `b'x'` bytes, then
/// measures the checksum throughput in bytes/second via criterion.
pub fn crc32_file(c: &mut Criterion) {
    // Keep the TempPath alive for the whole benchmark so the file
    // is not deleted until this function returns.
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();

    // 16 MiB of constant data — content doesn't matter for CRC32 throughput.
    let data = vec![b'x'; 16 * 1024 * 1024];
    fs::write(&path, &data).unwrap();

    let mut group = c.benchmark_group("crc32");
    // Report results as bytes/sec relative to the fixture size.
    group.throughput(Throughput::Bytes(data.len() as u64));
    group.bench_function("file_16mib", |b| {
        // black_box prevents the optimizer from hoisting the path lookup.
        b.iter(|| crc32_for_path(black_box(&path)).unwrap());
    });
    group.finish();
}

// Register the benchmark group and emit the binary entry point
// (this bench uses `harness = false` in Cargo.toml, so criterion
// supplies `main` instead of the default libtest harness).
criterion_group!(crc32_benches, crc32_file);
criterion_main!(crc32_benches);
12 changes: 7 additions & 5 deletions crates/lib/docs_rs_storage/src/archive_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::blob::StreamingBlob;
use crate::{blob::StreamingBlob, storage::non_blocking::ZIP_BUFFER_SIZE};
use chrono::Utc;
use docs_rs_config::AppConfig as _;
use docs_rs_opentelemetry::testing::TestMetrics;
Expand All @@ -839,14 +839,15 @@ mod tests {
use zip::write::SimpleFileOptions;

async fn create_test_archive(file_count: u32) -> Result<fs::File> {
spawn_blocking(move || {
let writer = spawn_blocking(move || {
use std::io::Write as _;

let tf = tempfile::tempfile()?;

let objectcontent: Vec<u8> = (0..255).collect();

let mut archive = zip::ZipWriter::new(tf);
let mut archive =
zip::ZipWriter::new(std::io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, tf));
for i in 0..file_count {
archive.start_file(
format!("testfile{i}"),
Expand All @@ -858,8 +859,9 @@ mod tests {
}
Ok(archive.finish()?)
})
.await
.map(fs::File::from_std)
.await?;

Ok(fs::File::from_std(writer.into_inner()?))
}

struct FakeDownloader {
Expand Down
34 changes: 25 additions & 9 deletions crates/lib/docs_rs_storage/src/backends/memory.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::{
Blob,
backends::StorageBackendMethods,
blob::{BlobUpload, StreamingBlob},
blob::{StreamUpload, StreamUploadSource, StreamingBlob},
errors::PathNotFoundError,
metrics::StorageMetrics,
types::FileRange,
};
use anyhow::{Result, anyhow};
use chrono::Utc;
use dashmap::DashMap;
use docs_rs_headers::compute_etag;
use futures_util::stream::{self, BoxStream};
use itertools::Itertools as _;
use tokio::fs;

pub(crate) struct MemoryBackend {
otel_metrics: StorageMetrics,
Expand Down Expand Up @@ -46,16 +48,30 @@ impl StorageBackendMethods for MemoryBackend {
Ok(blob.into())
}

async fn store_batch(&self, batch: Vec<BlobUpload>) -> Result<()> {
self.otel_metrics
.uploaded_files
.add(batch.len() as u64, &[]);
async fn upload_stream(&self, upload: StreamUpload) -> Result<()> {
let StreamUpload {
path,
mime,
source,
compression,
} = upload;

for upload in batch {
let blob: Blob = upload.into();
self.objects.insert(blob.path.clone(), blob);
}
let content = match source {
StreamUploadSource::Bytes(content) => content.to_vec(),
StreamUploadSource::File(path) => fs::read(&path).await?,
};

let blob = Blob {
path,
mime,
date_updated: Utc::now(),
etag: Some(compute_etag(&content)),
content,
compression,
};

self.otel_metrics.uploaded_files.add(1, &[]);
self.objects.insert(blob.path.clone(), blob);
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions crates/lib/docs_rs_storage/src/backends/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
pub(crate) mod memory;
pub(crate) mod s3;

use crate::{BlobUpload, StreamingBlob, types::FileRange};
use crate::{StreamingBlob, blob::StreamUpload, types::FileRange};
use anyhow::Result;
use futures_util::stream::BoxStream;

pub(crate) trait StorageBackendMethods {
async fn exists(&self, path: &str) -> Result<bool>;
async fn get_stream(&self, path: &str, range: Option<FileRange>) -> Result<StreamingBlob>;
async fn store_batch(&self, batch: Vec<BlobUpload>) -> Result<()>;
async fn upload_stream(&self, upload: StreamUpload) -> Result<()>;
async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result<String>>;
async fn delete_prefix(&self, prefix: &str) -> Result<()>;
}
Expand Down Expand Up @@ -39,8 +39,8 @@ impl StorageBackendMethods for StorageBackend {
call_inner!(self, get_stream(path, range))
}

async fn store_batch(&self, batch: Vec<BlobUpload>) -> Result<()> {
call_inner!(self, store_batch(batch))
async fn upload_stream(&self, upload: StreamUpload) -> Result<()> {
call_inner!(self, upload_stream(upload))
}

async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result<String>> {
Expand Down
129 changes: 92 additions & 37 deletions crates/lib/docs_rs_storage/src/backends/s3.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
Config,
backends::StorageBackendMethods,
blob::{BlobUpload, StreamingBlob},
blob::{StreamUpload, StreamUploadSource, StreamingBlob},
crc32_for_path,
errors::PathNotFoundError,
metrics::StorageMetrics,
types::FileRange,
Expand All @@ -13,15 +14,18 @@ use aws_sdk_s3::{
Client,
config::{Region, retry::RetryConfig},
error::{ProvideErrorMetadata, SdkError},
types::{Delete, ObjectIdentifier},
primitives::{ByteStream, Length},
types::{ChecksumAlgorithm, Delete, ObjectIdentifier},
};
use aws_smithy_types_convert::date_time::DateTimeExt;
use base64::{Engine as _, engine::general_purpose::STANDARD as b64};
use chrono::Utc;
use docs_rs_headers::{ETag, compute_etag};
use futures_util::{
future::TryFutureExt,
stream::{BoxStream, FuturesUnordered, StreamExt},
};
use docs_rs_utils::spawn_blocking;
use futures_util::stream::{BoxStream, StreamExt};
use opentelemetry::KeyValue;
use std::time::Duration;
use tokio::{fs, time::sleep};
use tracing::{error, warn};

// error codes to check for when trying to determine if an error is
Expand All @@ -41,6 +45,8 @@ static NOT_FOUND_ERROR_CODES: [&str; 5] = [
"XMinioInvalidObjectName",
];

const S3_UPLOAD_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB

trait S3ResultExt<T> {
fn convert_errors(self) -> anyhow::Result<T>;
}
Expand Down Expand Up @@ -230,45 +236,94 @@ impl StorageBackendMethods for S3Backend {
})
}

async fn store_batch(&self, mut batch: Vec<BlobUpload>) -> Result<(), Error> {
// Attempt to upload the batch 3 times
for _ in 0..3 {
let mut futures = FuturesUnordered::new();
for blob in batch.drain(..) {
futures.push(
self.client
.put_object()
.bucket(&self.bucket)
.key(&blob.path)
.body(blob.content.clone().into())
.content_type(blob.mime.to_string())
.set_content_encoding(blob.compression.map(|alg| alg.to_string()))
.send()
.map_ok(|_| {
self.otel_metrics.uploaded_files.add(1, &[]);
})
.map_err(|err| {
warn!(?err, "Failed to upload blob to S3");
// Reintroduce failed blobs for a retry
blob
}),
);
async fn upload_stream(&self, upload: StreamUpload) -> Result<(), Error> {
let StreamUpload {
path,
mime,
source,
compression,
} = upload;

let (content_length, checksum_crc32) = match &source {
StreamUploadSource::Bytes(bytes) => (bytes.len() as u64, None),
StreamUploadSource::File(local_path) => {
let local_path = local_path.clone();

(
fs::metadata(&local_path).await?.len(),
Some(
spawn_blocking(move || Ok(b64.encode(crc32_for_path(local_path)?))).await?,
),
)
}
};

while let Some(result) = futures.next().await {
// Push each failed blob back into the batch
if let Err(blob) = result {
batch.push(blob);
let mut last_err = None;

for attempt in 1..=3 {
let body = match &source {
StreamUploadSource::Bytes(bytes) => ByteStream::from(bytes.clone()),
StreamUploadSource::File(path) => {
// NOTE:
// reading the upload-data from a local path is
// "retryable" in the AWS SDK sense.
// ".file" (file pointer) is not retryable.
ByteStream::read_from()
.path(path)
.buffer_size(S3_UPLOAD_BUFFER_SIZE)
.length(Length::Exact(content_length))
.build()
.await?
}
};

let mut request = self
.client
.put_object()
.bucket(&self.bucket)
.key(&path)
.body(body)
.content_length(content_length as i64)
.content_type(mime.to_string())
.set_content_encoding(compression.map(|alg| alg.to_string()));

// NOTE: when you try to stream-upload a local file, the AWS SDK by default
// uses a "middleware" to calculate the checksum for the content, to compare it after
// uploading.
// This piece is broken right now, but only when using S3 directly. On minio, all is
// fine.
// I don't want to disable checksums so we're sure the files are uploaded correctly.
// So the only alternative (outside of trying to fix the SDK) is to calculate the
// checksum ourselves. This is a little annoying because this means we have to read the
// whole file before upload. But since I don't want to load all files into memory before
// upload, this is the only option.
if let Some(checksum_crc32) = &checksum_crc32 {
request = request
.checksum_algorithm(ChecksumAlgorithm::Crc32)
.checksum_crc32(checksum_crc32);
}

// If we uploaded everything in the batch, we're done
if batch.is_empty() {
return Ok(());
match request.send().await {
Ok(_) => {
self.otel_metrics
.uploaded_files
.add(1, &[KeyValue::new("attempt", attempt.to_string())]);
return Ok(());
}
Err(err) => {
warn!(?err, attempt, %path, "failed to upload blob to S3");
last_err = Some(err);

if attempt < 3 {
sleep(Duration::from_millis(10 * 2u64.pow(attempt))).await;
}
}
}
}

panic!("failed to upload 3 times, exiting");
Err(last_err
.expect("upload retry loop exited without a result")
.into())
}

async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result<String, Error>> {
Expand Down
Loading
Loading