Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 165 additions & 90 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ members = [
"program_transformers",
"blockbuster",
"core",
"bubblegum-backfill"
"bubblegum"
]

[dependencies]
Expand Down Expand Up @@ -61,13 +61,14 @@ solana-account-decoder = {workspace=true}
bytemuck = {workspace=true}
mpl-core = {workspace=true}
mpl-token-metadata = {workspace=true}
sha3 = { workspace = true }
solana-zk-token-sdk = {workspace=true}
spl-noop = {workspace=true}
spl-pod = {workspace=true}
anchor-client = {workspace=true}
spl-token-2022 = {workspace=true}
spl-token-group-interface = {workspace=true}
das-bubblegum-backfill = {workspace=true}
das-bubblegum = {workspace=true}
spl-token-metadata-interface = {workspace=true}
flatbuffers = {workspace=true}
rand = {workspace=true}
Expand All @@ -76,6 +77,7 @@ solana-program = {workspace=true}
blockbuster = {workspace=true}
signal-hook = {workspace=true}
base64 = {workspace=true}
spl-account-compression = {workspace = true}


[[bin]]
Expand Down Expand Up @@ -118,15 +120,15 @@ mime_guess = "2.0.4"
num-derive = "0.3.3"
num-traits = "0.2.19"
program_transformers = { path = "program_transformers" }
das-bubblegum-backfill = { path = "bubblegum-backfill" }
das-bubblegum = { path = "bubblegum" }
das-core = { path = "core" }
schemars = "0.8.21"
schemars_derive = "0.8.21"
sea-query = "0.28.5"
serde = "1.0.203"
serde_json = "1.0.118"
spl-concurrent-merkle-tree = "0.2.0"
spl-account-compression = "0.3.0"
spl-account-compression = "0.4.2"
spl-token = ">= 3.5.0, < 5.0"
thiserror = "1.0.61"
tracing = "0.1.40"
Expand All @@ -144,11 +146,10 @@ solana-account-decoder = "~1.17"
bytemuck = { version = "1.16.1", features = ["derive"] }
mpl-core = { version = "0.7.1", features = ["serde"] }
mpl-token-metadata = "4.1.2"
sha3 = "0.10.8"
solana-zk-token-sdk = "~1.17"
spl-noop = "0.2.0"
spl-pod = { version = "0.1.0", features = ["serde-traits"] }


spl-token-metadata-interface = "0.2.0"
flatbuffers = "23.5.26"
rand = "0.8.5"
Expand Down
2 changes: 1 addition & 1 deletion blockbuster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"
license = "AGPL-3.0"
name = "blockbuster"
readme = "../README.md"

repository = "https://github.com/metaplex-foundation/blockbuster"
version = "2.3.0"

[dependencies]
Expand Down
1 change: 1 addition & 0 deletions blockbuster/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(deprecated)]
pub mod error;
pub mod instruction;
pub mod program_handler;
Expand Down
69 changes: 0 additions & 69 deletions bubblegum-backfill/src/lib.rs

This file was deleted.

50 changes: 0 additions & 50 deletions bubblegum-backfill/src/worker/program_transformer.rs

This file was deleted.

3 changes: 2 additions & 1 deletion bubblegum-backfill/Cargo.toml → bubblegum/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "das-bubblegum-backfill"
name = "das-bubblegum"
version = { workspace = true }
edition = { workspace = true }
repository = { workspace = true }
Expand Down Expand Up @@ -28,6 +28,7 @@ num-traits = { workspace = true }
sea-orm = { workspace = true }
serde_json = { workspace = true }
solana-sdk = { workspace = true }
sha3 = { workspace = true }
solana-transaction-status = { workspace = true }
spl-account-compression = { workspace = true, features = ["no-entrypoint"] }
spl-token = { workspace = true, features = ["no-entrypoint"] }
Expand Down
24 changes: 24 additions & 0 deletions bubblegum/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
## DAS Backfill

The DAS Backfill library facilitates the initial setup and data backfilling for DAS, focusing on the bubblegum program. This program's indexing heavily relies on transaction data. While the library supports parallel backfilling across different trees, it ensures that transactions within each tree are processed sequentially. This approach guarantees accurate representation of every modification in the merkle tree within DAS.

## Usage

```rust
use das_backfill::{
BubblegumBackfillArgs,
BubblegumBackfillContext,
start_bubblegum_backfill
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let database_pool = sqlx::PgPool::connect("your_database_url").await?;
let solana_rpc = Rpc::new("your_solana_rpc_url");

let context = BubblegumBackfillContext::new(database_pool, solana_rpc);
let args = BubblegumBackfillArgs::parse(); // Parses args from CLI

start_bubblegum_backfill(context, args).await
}
```
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::ErrorKind;
use crate::Rpc;
use crate::{error::ErrorKind, Rpc};
use anyhow::Result;
use clap::Args;
use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value};
Expand Down
2 changes: 2 additions & 0 deletions bubblegum/src/backfill/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod gap;
pub mod worker;
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use tokio::{
task::JoinHandle,
};

use crate::gap::TreeGapFill;
use crate::BubblegumBackfillContext;
use crate::{backfill::gap::TreeGapFill, BubblegumContext};

#[derive(Parser, Debug, Clone)]
pub struct GapWorkerArgs {
Expand All @@ -26,7 +25,7 @@ pub struct GapWorkerArgs {
impl GapWorkerArgs {
pub fn start(
&self,
context: BubblegumBackfillContext,
context: BubblegumContext,
forward: Sender<Signature>,
) -> Result<(JoinHandle<()>, Sender<TreeGapFill>)> {
let (gap_sender, mut gap_receiver) = channel::<TreeGapFill>(self.gap_channel_size);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod gap;
mod program_transformer;
mod transaction;
mod tree;
pub mod gap;
pub mod program_transformer;
pub mod transaction;
pub mod tree;

pub use gap::GapWorkerArgs;
pub use program_transformer::ProgramTransformerWorkerArgs;
Expand Down
70 changes: 70 additions & 0 deletions bubblegum/src/backfill/worker/program_transformer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use anyhow::Result;
use clap::Parser;
use das_core::{create_download_metadata_notifier, DownloadMetadataInfo};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::error;
use program_transformers::{ProgramTransformer, TransactionInfo};
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Sender, UnboundedSender};
use tokio::task::JoinHandle;

use crate::BubblegumContext;

#[derive(Parser, Debug, Clone)]
pub struct ProgramTransformerWorkerArgs {
#[arg(long, env, default_value = "100000")]
pub program_transformer_channel_size: usize,
#[arg(long, env, default_value = "50")]
pub program_transformer_worker_count: usize,
}

impl ProgramTransformerWorkerArgs {
pub fn start(
&self,
context: BubblegumContext,
forwarder: UnboundedSender<DownloadMetadataInfo>,
) -> Result<(JoinHandle<()>, Sender<TransactionInfo>)> {
let (sender, mut receiver) =
channel::<TransactionInfo>(self.program_transformer_channel_size);

let worker_forwarder = forwarder.clone();
let worker_pool = context.database_pool.clone();
let worker_count = self.program_transformer_worker_count;
let handle = tokio::spawn(async move {
let download_metadata_notifier =
create_download_metadata_notifier(worker_forwarder.clone()).await;
let program_transformer = Arc::new(ProgramTransformer::new(
worker_pool.clone(),
download_metadata_notifier,
));

let mut handlers = FuturesUnordered::new();

while let Some(transaction) = receiver.recv().await {
if handlers.len() >= worker_count {
handlers.next().await;
}

let program_transformer_clone = Arc::clone(&program_transformer);
let handle = tokio::spawn(async move {
if let Err(err) = program_transformer_clone
.handle_transaction(&transaction)
.await
{
error!(
"Failed to handle bubblegum instruction for txn {:?}: {:?}",
transaction.signature, err
);
}
});

handlers.push(handle);
}

futures::future::join_all(handlers).await;
});

Ok((handle, sender))
}
}
Loading