From b9e2546a46168ac78e381ca24d36e85a789c665c Mon Sep 17 00:00:00 2001 From: Wilfred Almeida Date: Sun, 24 Nov 2024 20:00:24 +0530 Subject: [PATCH] refactor: bring in changes from upstream Signed-off-by: Wilfred Almeida --- Cargo.lock | 255 ++++--- Cargo.toml | 13 +- blockbuster/Cargo.toml | 2 +- blockbuster/src/lib.rs | 1 + bubblegum-backfill/src/lib.rs | 69 -- .../src/worker/program_transformer.rs | 50 -- {bubblegum-backfill => bubblegum}/Cargo.toml | 3 +- bubblegum/README.md | 24 + .../src => bubblegum/src/backfill}/gap.rs | 3 +- bubblegum/src/backfill/mod.rs | 2 + .../src/backfill}/worker/gap.rs | 5 +- .../src/backfill}/worker/mod.rs | 8 +- .../backfill/worker/program_transformer.rs | 70 ++ .../src/backfill}/worker/transaction.rs | 70 +- .../src/backfill}/worker/tree.rs | 51 +- .../src/error.rs | 0 bubblegum/src/lib.rs | 188 +++++ {bubblegum-backfill => bubblegum}/src/tree.rs | 5 +- bubblegum/src/verify.rs | 182 +++++ .../src/dao/generated/tokens.rs | 2 +- digital_asset_types/src/dapi/change_logs.rs | 4 +- digital_asset_types/tests/common.rs | 4 +- program_transformers/Cargo.toml | 1 + program_transformers/src/asset_upserts.rs | 5 +- program_transformers/src/bubblegum/burn.rs | 4 +- .../src/bubblegum/cancel_redeem.rs | 4 +- .../src/bubblegum/collection_verification.rs | 4 +- .../src/bubblegum/creator_verification.rs | 4 +- program_transformers/src/bubblegum/db.rs | 149 ++-- .../src/bubblegum/delegate.rs | 4 +- program_transformers/src/bubblegum/mint_v1.rs | 12 +- program_transformers/src/bubblegum/mod.rs | 27 +- program_transformers/src/bubblegum/redeem.rs | 4 +- .../src/bubblegum/transfer.rs | 5 +- .../src/bubblegum/update_metadata.rs | 12 +- program_transformers/src/lib.rs | 56 +- .../src/mpl_core_program/mod.rs | 42 -- .../src/mpl_core_program/v1_asset.rs | 642 ------------------ program_transformers/src/token/mod.rs | 169 ----- .../src/token_metadata/master_edition.rs | 107 --- .../src/token_metadata/mod.rs | 53 -- .../src/token_metadata/v1_asset.rs | 413 ----------- src/main.rs | 27 +- 43 files changed, 837 insertions(+), 1918 deletions(-) delete mode 100644 bubblegum-backfill/src/lib.rs delete mode 100644 bubblegum-backfill/src/worker/program_transformer.rs rename {bubblegum-backfill => bubblegum}/Cargo.toml (95%) create mode 100644 bubblegum/README.md rename {bubblegum-backfill/src => bubblegum/src/backfill}/gap.rs (98%) create mode 100644 bubblegum/src/backfill/mod.rs rename {bubblegum-backfill/src => bubblegum/src/backfill}/worker/gap.rs (94%) rename {bubblegum-backfill/src => bubblegum/src/backfill}/worker/mod.rs (67%) create mode 100644 bubblegum/src/backfill/worker/program_transformer.rs rename {bubblegum-backfill/src => bubblegum/src/backfill}/worker/transaction.rs (78%) rename {bubblegum-backfill/src => bubblegum/src/backfill}/worker/tree.rs (72%) rename {bubblegum-backfill => bubblegum}/src/error.rs (100%) create mode 100644 bubblegum/src/lib.rs rename {bubblegum-backfill => bubblegum}/src/tree.rs (98%) create mode 100644 bubblegum/src/verify.rs delete mode 100644 program_transformers/src/mpl_core_program/mod.rs delete mode 100644 program_transformers/src/mpl_core_program/v1_asset.rs delete mode 100644 program_transformers/src/token/mod.rs delete mode 100644 program_transformers/src/token_metadata/master_edition.rs delete mode 100644 program_transformers/src/token_metadata/mod.rs delete mode 100644 program_transformers/src/token_metadata/v1_asset.rs diff --git a/Cargo.lock b/Cargo.lock index 29f0621..7b06c29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,7 +268,7 @@ dependencies = [ "borsh 0.10.3", "bytemuck", "getrandom 0.2.15", - "solana-program", + "solana-program 1.17.14", "thiserror", ] @@ -739,7 +739,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -756,7 +756,7 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -988,7 +988,7 @@ dependencies = [ "solana-transaction-status", "solana-zk-token-sdk", "spl-account-compression", - "spl-concurrent-merkle-tree", + "spl-concurrent-merkle-tree 0.2.0", "spl-noop", "spl-pod", "spl-token", @@ -1077,7 +1077,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", "syn_derive", ] @@ -1201,9 +1201,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.16.1" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b236fc92302c97ed75b38da1f4917b5cdda4984745740f153a5d3059e48d725e" +checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" dependencies = [ "bytemuck_derive", ] @@ -1216,7 +1216,7 @@ checksum = "1ee891b04274a59bd38b412188e24b849617b2e45a0fd8d057deb63e7403761b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -1368,7 +1368,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -1627,7 +1627,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -1638,12 +1638,12 @@ checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" dependencies = [ "darling_core", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] -name = "das-bubblegum-backfill" -version = "0.1.0" +name = "das-bubblegum" +version = "0.2.0" dependencies = [ "anchor-client", "anyhow", @@ -1661,8 +1661,9 @@ dependencies = [ "program_transformers", "sea-orm 0.10.7", "serde_json", + "sha3 0.10.8", "solana-client", - "solana-program", + "solana-program 1.17.14", "solana-sdk", "solana-transaction-status", "spl-account-compression", @@ -1675,7 +1676,7 @@ dependencies = [ [[package]] name = "das-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "backon", @@ -1781,7 +1782,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -1818,7 +1819,7 @@ dependencies = [ [[package]] name = "digital_asset_types" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "blockbuster", @@ -1839,7 +1840,7 @@ dependencies = [ "serde", "serde_json", "solana-sdk", - "spl-concurrent-merkle-tree", + "spl-concurrent-merkle-tree 0.2.0", "thiserror", "tokio", "url", @@ -1873,7 +1874,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -1896,7 +1897,7 @@ checksum = "a6cbae11b3de8fce2a456e8ea3dada226b35fe791f0dc1d360c0941f0bb681f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -1999,7 +2000,7 @@ checksum = "a1ab991c1362ac86c61ab6f556cff143daa22e5a15e4e189df818b2fd19fe65b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -2264,7 +2265,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -2747,7 +2748,7 @@ checksum = "0122b7114117e64a63ac49f752a5ca4624d534c7b1c7de796ac196381cd2d947" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -2952,7 +2953,7 @@ dependencies = [ [[package]] name = "lightdas" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-client", "anchor-lang", @@ -2966,7 +2967,7 @@ dependencies = [ "bs58 0.4.0", "bytemuck", "clap 4.5.7", - "das-bubblegum-backfill", + "das-bubblegum", "das-core", "derive_more", "digital_asset_types", @@ -2997,16 +2998,18 @@ dependencies = [ "sea-query 0.28.5", "serde", "serde_json", + "sha3 0.10.8", "signal-hook", "solana-account-decoder", "solana-client", "solana-geyser-plugin-interface", - "solana-program", + "solana-program 1.17.14", "solana-rpc-client-api", "solana-sdk", "solana-transaction-status", "solana-zk-token-sdk", - "spl-concurrent-merkle-tree", + "spl-account-compression", + "spl-concurrent-merkle-tree 0.2.0", "spl-noop", "spl-pod", "spl-token-2022", @@ -3194,7 +3197,7 @@ dependencies = [ "kaigan", "num-derive 0.3.3", "num-traits", - "solana-program", + "solana-program 1.17.14", "thiserror", ] @@ -3213,7 +3216,7 @@ dependencies = [ "serde", "serde_json", "serde_with 3.8.1", - "solana-program", + "solana-program 1.17.14", "thiserror", ] @@ -3228,7 +3231,7 @@ dependencies = [ "num-traits", "serde", "serde_with 3.8.1", - "solana-program", + "solana-program 1.17.14", "thiserror", ] @@ -3342,7 +3345,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -3423,7 +3426,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -3435,7 +3438,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -3497,7 +3500,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -3577,7 +3580,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -3699,7 +3702,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -3896,16 +3899,16 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.86" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] [[package]] name = "program_transformers" -version = "0.1.0" +version = "0.2.0" dependencies = [ "blockbuster", "bs58 0.4.0", @@ -3918,6 +3921,7 @@ dependencies = [ "mpl-bubblegum", "num-traits", "sea-orm 0.10.7", + "serde", "serde_json", "solana-sdk", "solana-transaction-status", @@ -3966,7 +3970,7 @@ checksum = "9e2e25ee72f5b24d773cae88422baddefff7714f97aab68d96fe2b6fc4a28fb2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -4553,7 +4557,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -4579,7 +4583,7 @@ checksum = "1db149f81d46d2deba7cd3c50772474707729550221e69588478ebf9ada425ae" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -4602,7 +4606,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -4695,7 +4699,7 @@ dependencies = [ "proc-macro2", "quote", "sea-bae", - "syn 2.0.68", + "syn 2.0.89", "unicode-ident", ] @@ -4811,7 +4815,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", "thiserror", ] @@ -4921,7 +4925,7 @@ checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -4932,7 +4936,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -4996,7 +5000,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -5008,7 +5012,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -5309,7 +5313,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -5458,13 +5462,59 @@ dependencies = [ "sha3 0.10.8", "solana-frozen-abi", "solana-frozen-abi-macro", - "solana-sdk-macro", + "solana-sdk-macro 1.17.14", "thiserror", "tiny-bip39", "wasm-bindgen", "zeroize", ] +[[package]] +name = "solana-program" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "676cb677930c71e989df31f34640e29c927f0bbc07ce85a1598ff549ef85cb5d" +dependencies = [ + "ark-bn254", + "ark-ec", + "ark-ff", + "ark-serialize", + "base64 0.22.1", + "bincode", + "bitflags 2.6.0", + "blake3", + "borsh 0.10.3", + "borsh 1.5.1", + "bs58 0.5.1", + "bv", + "bytemuck", + "bytemuck_derive", + "console_error_panic_hook", + "console_log", + "curve25519-dalek", + "getrandom 0.2.15", + "js-sys", + "lazy_static", + "libsecp256k1", + "log", + "memoffset 0.9.1", + "num-bigint 0.4.6", + "num-derive 0.4.2", + "num-traits", + "parking_lot 0.12.3", + "rand 0.8.5", + "rustc_version", + "rustversion", + "serde", + "serde_bytes", + "serde_derive", + "sha2 0.10.8", + "sha3 0.10.8", + "solana-sdk-macro 2.0.17", + "thiserror", + "wasm-bindgen", +] + [[package]] name = "solana-program-runtime" version = "1.17.14" @@ -5682,8 +5732,8 @@ dependencies = [ "solana-frozen-abi", "solana-frozen-abi-macro", "solana-logger", - "solana-program", - "solana-sdk-macro", + "solana-program 1.17.14", + "solana-sdk-macro 1.17.14", "thiserror", "uriparse", "wasm-bindgen", @@ -5699,7 +5749,20 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.68", + "syn 2.0.89", +] + +[[package]] +name = "solana-sdk-macro" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0768aa8d51ef38e559784ccd62f0523f3e6ed9ba8903f49fb1d961ee8aff4f3e" +dependencies = [ + "bs58 0.5.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.89", ] [[package]] @@ -5851,7 +5914,7 @@ dependencies = [ "solana-frozen-abi", "solana-frozen-abi-macro", "solana-metrics", - "solana-program", + "solana-program 1.17.14", "solana-program-runtime", "solana-sdk", "thiserror", @@ -5879,7 +5942,7 @@ dependencies = [ "serde", "serde_json", "sha3 0.9.1", - "solana-program", + "solana-program 1.17.14", "solana-sdk", "subtle", "thiserror", @@ -5929,13 +5992,14 @@ dependencies = [ [[package]] name = "spl-account-compression" -version = "0.3.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85c43bd4455d9fb29b9e4f83c087ccffa2f6f41fecfc0549932ae391d00f3378" +checksum = "2785042005954aec5d5db7fcb99a78754b222be906a89d10a3d66ebdbc8e9548" dependencies = [ "anchor-lang", "bytemuck", - "spl-concurrent-merkle-tree", + "solana-program 2.0.17", + "spl-concurrent-merkle-tree 0.4.0", "spl-noop", ] @@ -5949,7 +6013,7 @@ dependencies = [ "borsh 0.10.3", "num-derive 0.4.2", "num-traits", - "solana-program", + "solana-program 1.17.14", "spl-token", "spl-token-2022", "thiserror", @@ -5962,7 +6026,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "141eaea58588beae81b71d101373a53f096737739873de42d6b1368bc2b8fc30" dependencies = [ "bytemuck", - "solana-program", + "solana-program 1.17.14", + "thiserror", +] + +[[package]] +name = "spl-concurrent-merkle-tree" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85d1bbb97252d8a1b90d3d56425038928382a306b71dbba4c836973c94b33f96" +dependencies = [ + "bytemuck", + "solana-program 2.0.17", "thiserror", ] @@ -5973,7 +6048,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cce5d563b58ef1bb2cdbbfe0dfb9ffdc24903b10ae6a4df2d8f425ece375033f" dependencies = [ "bytemuck", - "solana-program", + "solana-program 1.17.14", "spl-discriminator-derive", ] @@ -5985,7 +6060,7 @@ checksum = "07fd7858fc4ff8fb0e34090e41d7eb06a823e1057945c26d480bfc21d2338a93" dependencies = [ "quote", "spl-discriminator-syn", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -5997,7 +6072,7 @@ dependencies = [ "proc-macro2", "quote", "sha2 0.10.8", - "syn 2.0.68", + "syn 2.0.89", "thiserror", ] @@ -6007,7 +6082,7 @@ version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0f180b03318c3dbab3ef4e1e4d46d5211ae3c780940dd0a28695aba4b59a75a" dependencies = [ - "solana-program", + "solana-program 1.17.14", ] [[package]] @@ -6016,7 +6091,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dd67ea3d0070a12ff141f5da46f9695f49384a03bce1203a5608f5739437950" dependencies = [ - "solana-program", + "solana-program 1.17.14", ] [[package]] @@ -6029,7 +6104,7 @@ dependencies = [ "borsh 0.10.3", "bytemuck", "serde", - "solana-program", + "solana-program 1.17.14", "solana-zk-token-sdk", "spl-program-error", ] @@ -6042,7 +6117,7 @@ checksum = "249e0318493b6bcf27ae9902600566c689b7dfba9f1bdff5893e92253374e78c" dependencies = [ "num-derive 0.4.2", "num-traits", - "solana-program", + "solana-program 1.17.14", "spl-program-error-derive", "thiserror", ] @@ -6056,7 +6131,7 @@ dependencies = [ "proc-macro2", "quote", "sha2 0.10.8", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -6066,7 +6141,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "615d381f48ddd2bb3c57c7f7fb207591a2a05054639b18a62e785117dd7a8683" dependencies = [ "bytemuck", - "solana-program", + "solana-program 1.17.14", "spl-discriminator", "spl-pod", "spl-program-error", @@ -6084,7 +6159,7 @@ dependencies = [ "num-derive 0.3.3", "num-traits", "num_enum 0.6.1", - "solana-program", + "solana-program 1.17.14", "thiserror", ] @@ -6099,7 +6174,7 @@ dependencies = [ "num-derive 0.4.2", "num-traits", "num_enum 0.7.2", - "solana-program", + "solana-program 1.17.14", "solana-security-txt", "solana-zk-token-sdk", "spl-memo", @@ -6119,7 +6194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b889509d49fa74a4a033ca5dae6c2307e9e918122d97e58562f5c4ffa795c75d" dependencies = [ "bytemuck", - "solana-program", + "solana-program 1.17.14", "spl-discriminator", "spl-pod", "spl-program-error", @@ -6132,7 +6207,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c16ce3ba6979645fb7627aa1e435576172dd63088dc7848cb09aa331fa1fe4f" dependencies = [ "borsh 0.10.3", - "solana-program", + "solana-program 1.17.14", "spl-discriminator", "spl-pod", "spl-program-error", @@ -6147,7 +6222,7 @@ checksum = "7aabdb7c471566f6ddcee724beb8618449ea24b399e58d464d6b5bc7db550259" dependencies = [ "arrayref", "bytemuck", - "solana-program", + "solana-program 1.17.14", "spl-discriminator", "spl-pod", "spl-program-error", @@ -6162,7 +6237,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a468e6f6371f9c69aae760186ea9f1a01c2908351b06a5e0026d21cfc4d7ecac" dependencies = [ "bytemuck", - "solana-program", + "solana-program 1.17.14", "spl-discriminator", "spl-pod", "spl-program-error", @@ -6465,9 +6540,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.68" +version = "2.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e" dependencies = [ "proc-macro2", "quote", @@ -6483,7 +6558,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -6569,22 +6644,22 @@ checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9" [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -6689,7 +6764,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -6825,7 +6900,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -7112,7 +7187,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", "wasm-bindgen-shared", ] @@ -7146,7 +7221,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -7463,7 +7538,7 @@ checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] @@ -7483,7 +7558,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.89", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a76cfa0..dddba87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ members = [ "program_transformers", "blockbuster", "core", - "bubblegum-backfill" + "bubblegum" ] [dependencies] @@ -61,13 +61,14 @@ solana-account-decoder = {workspace=true} bytemuck = {workspace=true} mpl-core = {workspace=true} mpl-token-metadata = {workspace=true} +sha3 = { workspace = true } solana-zk-token-sdk = {workspace=true} spl-noop = {workspace=true} spl-pod = {workspace=true} anchor-client = {workspace=true} spl-token-2022 = {workspace=true} spl-token-group-interface = {workspace=true} -das-bubblegum-backfill = {workspace=true} +das-bubblegum = {workspace=true} spl-token-metadata-interface = {workspace=true} flatbuffers = {workspace=true} rand = {workspace=true} @@ -76,6 +77,7 @@ solana-program = {workspace=true} blockbuster = {workspace=true} signal-hook = {workspace=true} base64 = {workspace=true} +spl-account-compression = {workspace = true} [[bin]] @@ -118,7 +120,7 @@ mime_guess = "2.0.4" num-derive = "0.3.3" num-traits = "0.2.19" program_transformers = { path = "program_transformers" } -das-bubblegum-backfill = { path = "bubblegum-backfill" } +das-bubblegum = { path = "bubblegum" } das-core = { path = "core" } schemars = "0.8.21" schemars_derive = "0.8.21" @@ -126,7 +128,7 @@ sea-query = "0.28.5" serde = "1.0.203" serde_json = "1.0.118" spl-concurrent-merkle-tree = "0.2.0" -spl-account-compression = "0.3.0" +spl-account-compression = "0.4.2" spl-token = ">= 3.5.0, < 5.0" thiserror = "1.0.61" tracing = "0.1.40" @@ -144,11 +146,10 @@ solana-account-decoder = "~1.17" bytemuck = { version = "1.16.1", features = ["derive"] } mpl-core = { version = "0.7.1", features = ["serde"] } mpl-token-metadata = "4.1.2" +sha3 = "0.10.8" solana-zk-token-sdk = "~1.17" spl-noop = "0.2.0" spl-pod = { version = "0.1.0", features = ["serde-traits"] } - - spl-token-metadata-interface = "0.2.0" flatbuffers = "23.5.26" rand = "0.8.5" diff --git a/blockbuster/Cargo.toml b/blockbuster/Cargo.toml index 00dd4b5..e110009 100644 --- a/blockbuster/Cargo.toml +++ b/blockbuster/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" license = "AGPL-3.0" name = "blockbuster" readme = "../README.md" - +repository = "https://github.com/metaplex-foundation/blockbuster" version = "2.3.0" [dependencies] diff --git a/blockbuster/src/lib.rs b/blockbuster/src/lib.rs index 983c728..e8e6eee 100644 --- a/blockbuster/src/lib.rs +++ b/blockbuster/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(deprecated)] pub mod error; pub mod instruction; pub mod program_handler; diff --git a/bubblegum-backfill/src/lib.rs b/bubblegum-backfill/src/lib.rs deleted file mode 100644 index 3b5495e..0000000 --- a/bubblegum-backfill/src/lib.rs +++ /dev/null @@ -1,69 +0,0 @@ -mod error; -mod gap; -mod tree; -pub mod worker; - -pub use error::ErrorKind; - -use anyhow::Result; -use clap::Parser; -use das_core::Rpc; -use futures::{stream::FuturesUnordered, StreamExt}; -use tree::TreeResponse; -use worker::TreeWorkerArgs; - -#[derive(Clone)] -pub struct BubblegumBackfillContext { - pub database_pool: sqlx::PgPool, - pub solana_rpc: Rpc, -} - -impl BubblegumBackfillContext { - pub const fn new(database_pool: sqlx::PgPool, solana_rpc: Rpc) -> Self { - Self { - database_pool, - solana_rpc, - } - } -} - -#[derive(Debug, Parser, Clone)] -pub struct BubblegumBackfillArgs { - /// Number of tree crawler workers - #[arg(long, env, default_value = "20")] - pub tree_crawler_count: usize, - - /// The list of trees to crawl. If not specified, all trees will be crawled. - #[arg(long, env, use_value_delimiter = true)] - pub only_trees: Option>, - - #[clap(flatten)] - pub tree_worker: TreeWorkerArgs, -} - -pub async fn start_bubblegum_backfill( - context: BubblegumBackfillContext, - args: BubblegumBackfillArgs, -) -> Result<()> { - let trees = if let Some(ref only_trees) = args.only_trees { - TreeResponse::find(&context.solana_rpc, only_trees.clone()).await? - } else { - TreeResponse::all(&context.solana_rpc).await? - }; - - let mut crawl_handles = FuturesUnordered::new(); - - for tree in trees { - if crawl_handles.len() >= args.tree_crawler_count { - crawl_handles.next().await; - } - let context = context.clone(); - let handle = args.tree_worker.start(context, tree); - - crawl_handles.push(handle); - } - - futures::future::try_join_all(crawl_handles).await?; - - Ok(()) -} diff --git a/bubblegum-backfill/src/worker/program_transformer.rs b/bubblegum-backfill/src/worker/program_transformer.rs deleted file mode 100644 index da90182..0000000 --- a/bubblegum-backfill/src/worker/program_transformer.rs +++ /dev/null @@ -1,50 +0,0 @@ -use anyhow::Result; -use clap::Parser; -use das_core::{create_download_metadata_notifier, DownloadMetadataInfo}; -use log::error; -use program_transformers::{ProgramTransformer, TransactionInfo}; -use tokio::sync::mpsc::{channel, Sender, UnboundedSender}; -use tokio::task::JoinHandle; - -use crate::BubblegumBackfillContext; - -#[derive(Parser, Debug, Clone)] -pub struct ProgramTransformerWorkerArgs { - #[arg(long, env, default_value = "100000")] - pub program_transformer_channel_size: usize, -} - -impl ProgramTransformerWorkerArgs { - pub fn start( - &self, - context: BubblegumBackfillContext, - forwarder: UnboundedSender, - ) -> Result<(JoinHandle<()>, Sender)> { - let (sender, mut receiver) = - channel::(self.program_transformer_channel_size); - - let handle = tokio::spawn(async move { - let mut transactions = Vec::new(); - let pool = context.database_pool.clone(); - - let download_metadata_notifier = create_download_metadata_notifier(forwarder).await; - - let program_transformer = - ProgramTransformer::new(pool, download_metadata_notifier, true); - - while let Some(gap) = receiver.recv().await { - transactions.push(gap); - } - - transactions.sort_by(|a, b| b.signature.cmp(&a.signature)); - - for transaction in transactions { - if let Err(e) = program_transformer.handle_transaction(&transaction).await { - error!("handle transaction: {:?}", e) - }; - } - }); - - Ok((handle, sender)) - } -} diff --git a/bubblegum-backfill/Cargo.toml b/bubblegum/Cargo.toml similarity index 95% rename from bubblegum-backfill/Cargo.toml rename to bubblegum/Cargo.toml index 8b49f75..0ecd470 100644 --- a/bubblegum-backfill/Cargo.toml +++ b/bubblegum/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "das-bubblegum-backfill" +name = "das-bubblegum" version = { workspace = true } edition = { workspace = true } repository = { workspace = true } @@ -28,6 +28,7 @@ num-traits = { workspace = true } sea-orm = { workspace = true } serde_json = { workspace = true } solana-sdk = { workspace = true } +sha3 = { workspace = true } solana-transaction-status = { workspace = true } spl-account-compression = { workspace = true, features = ["no-entrypoint"] } spl-token = { workspace = true, features = ["no-entrypoint"] } diff --git a/bubblegum/README.md b/bubblegum/README.md new file mode 100644 index 0000000..e989833 --- /dev/null +++ b/bubblegum/README.md @@ -0,0 +1,24 @@ +## DAS Backfill + +The DAS Backfill library facilitates the initial setup and data backfilling for DAS, focusing on the bubblegum program. This program's indexing heavily relies on transaction data. While the library supports parallel backfilling across different trees, it ensures that transactions within each tree are processed sequentially. This approach guarantees accurate representation of every modification in the merkle tree within DAS. + +## Usage + +```rust +use das_backfill::{ + BubblegumBackfillArgs, + BubblegumBackfillContext, + start_bubblegum_backfill +}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let database_pool = sqlx::PgPool::connect("your_database_url").await?; + let solana_rpc = Rpc::new("your_solana_rpc_url"); + + let context = BubblegumBackfillContext::new(database_pool, solana_rpc); + let args = BubblegumBackfillArgs::parse(); // Parses args from CLI + + start_bubblegum_backfill(context, args).await +} +``` diff --git a/bubblegum-backfill/src/gap.rs b/bubblegum/src/backfill/gap.rs similarity index 98% rename from bubblegum-backfill/src/gap.rs rename to bubblegum/src/backfill/gap.rs index feb523b..e167e88 100644 --- a/bubblegum-backfill/src/gap.rs +++ b/bubblegum/src/backfill/gap.rs @@ -1,5 +1,4 @@ -use super::ErrorKind; -use crate::Rpc; +use crate::{error::ErrorKind, Rpc}; use anyhow::Result; use clap::Args; use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value}; diff --git a/bubblegum/src/backfill/mod.rs b/bubblegum/src/backfill/mod.rs new file mode 100644 index 0000000..5a4a874 --- /dev/null +++ b/bubblegum/src/backfill/mod.rs @@ -0,0 +1,2 @@ +pub mod gap; +pub mod worker; diff --git a/bubblegum-backfill/src/worker/gap.rs b/bubblegum/src/backfill/worker/gap.rs similarity index 94% rename from bubblegum-backfill/src/worker/gap.rs rename to bubblegum/src/backfill/worker/gap.rs index 07b88f4..68523fd 100644 --- a/bubblegum-backfill/src/worker/gap.rs +++ b/bubblegum/src/backfill/worker/gap.rs @@ -9,8 +9,7 @@ use tokio::{ task::JoinHandle, }; -use crate::gap::TreeGapFill; -use crate::BubblegumBackfillContext; +use crate::{backfill::gap::TreeGapFill, BubblegumContext}; #[derive(Parser, Debug, Clone)] pub struct GapWorkerArgs { @@ -26,7 +25,7 @@ pub struct GapWorkerArgs { impl GapWorkerArgs { pub fn start( &self, - context: BubblegumBackfillContext, + context: BubblegumContext, forward: Sender, ) -> Result<(JoinHandle<()>, Sender)> { let (gap_sender, mut gap_receiver) = channel::(self.gap_channel_size); diff --git a/bubblegum-backfill/src/worker/mod.rs b/bubblegum/src/backfill/worker/mod.rs similarity index 67% rename from bubblegum-backfill/src/worker/mod.rs rename to bubblegum/src/backfill/worker/mod.rs index 0bc7f9e..81435b3 100644 --- a/bubblegum-backfill/src/worker/mod.rs +++ b/bubblegum/src/backfill/worker/mod.rs @@ -1,7 +1,7 @@ -mod gap; -mod program_transformer; -mod transaction; -mod tree; +pub mod gap; +pub mod program_transformer; +pub mod transaction; +pub mod tree; pub use gap::GapWorkerArgs; pub use program_transformer::ProgramTransformerWorkerArgs; diff --git a/bubblegum/src/backfill/worker/program_transformer.rs b/bubblegum/src/backfill/worker/program_transformer.rs new file mode 100644 index 0000000..75f75c0 --- /dev/null +++ b/bubblegum/src/backfill/worker/program_transformer.rs @@ -0,0 +1,70 @@ +use anyhow::Result; +use clap::Parser; +use das_core::{create_download_metadata_notifier, DownloadMetadataInfo}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use log::error; +use program_transformers::{ProgramTransformer, TransactionInfo}; +use std::sync::Arc; +use tokio::sync::mpsc::{channel, Sender, UnboundedSender}; +use tokio::task::JoinHandle; + +use crate::BubblegumContext; + +#[derive(Parser, Debug, Clone)] +pub struct ProgramTransformerWorkerArgs { + #[arg(long, env, default_value = "100000")] + pub program_transformer_channel_size: usize, + #[arg(long, env, default_value = "50")] + pub program_transformer_worker_count: usize, +} + +impl ProgramTransformerWorkerArgs { + pub fn start( + &self, + context: BubblegumContext, + forwarder: UnboundedSender, + ) -> Result<(JoinHandle<()>, Sender)> { + let (sender, mut receiver) = + channel::(self.program_transformer_channel_size); + + let worker_forwarder = forwarder.clone(); + let worker_pool = context.database_pool.clone(); + let worker_count = self.program_transformer_worker_count; + let handle = tokio::spawn(async move { + let download_metadata_notifier = + create_download_metadata_notifier(worker_forwarder.clone()).await; + let program_transformer = Arc::new(ProgramTransformer::new( + worker_pool.clone(), + download_metadata_notifier, + )); + + let mut handlers = FuturesUnordered::new(); + + while let Some(transaction) = receiver.recv().await { + if handlers.len() >= worker_count { + handlers.next().await; + } + + let program_transformer_clone = Arc::clone(&program_transformer); + let handle = tokio::spawn(async move { + if let Err(err) = program_transformer_clone + .handle_transaction(&transaction) + .await + { + error!( + "Failed to handle bubblegum instruction for txn {:?}: {:?}", + transaction.signature, err + ); + } + }); + + handlers.push(handle); + } + + futures::future::join_all(handlers).await; + }); + + Ok((handle, sender)) + } +} diff --git a/bubblegum-backfill/src/worker/transaction.rs b/bubblegum/src/backfill/worker/transaction.rs similarity index 78% rename from bubblegum-backfill/src/worker/transaction.rs rename to bubblegum/src/backfill/worker/transaction.rs index 0786b56..910b79a 100644 --- a/bubblegum-backfill/src/worker/transaction.rs +++ b/bubblegum/src/backfill/worker/transaction.rs @@ -1,4 +1,4 @@ -use crate::error::ErrorKind; +use crate::{error::ErrorKind, BubblegumContext}; use anyhow::Result; use clap::Parser; use das_core::Rpc; @@ -33,6 +33,7 @@ impl TryFrom for Pubkey { } } +#[derive(Debug)] pub struct FetchedEncodedTransactionWithStatusMeta(pub EncodedConfirmedTransactionWithStatusMeta); impl TryFrom for TransactionInfo { @@ -60,49 +61,41 @@ impl TryFrom for TransactionInfo { .transaction .meta .ok_or(ErrorKind::Generic( - "unable to get meta from transaction".to_string(), + "transaction metadata is missing".to_string(), ))?; for address in msg.static_account_keys().iter().copied() { account_keys.push(address); } - let ui_loaded_addresses = meta.loaded_addresses; - let message_address_table_lookup = msg.address_table_lookups(); - - if message_address_table_lookup.is_some() { - if let OptionSerializer::Some(ui_lookup_table) = ui_loaded_addresses { - for address in ui_lookup_table.writable { - account_keys.push(PubkeyString(address).try_into()?); - } - - for address in ui_lookup_table.readonly { - account_keys.push(PubkeyString(address).try_into()?); - } + let ui_loaded_addresses = match meta.loaded_addresses { + OptionSerializer::Some(addresses) => addresses, + OptionSerializer::None => { + return Err(ErrorKind::Generic( + "loaded addresses data is missing".to_string(), + )) } - } - - let mut meta_inner_instructions = Vec::new(); + OptionSerializer::Skip => { + return Err(ErrorKind::Generic( + "loaded addresses are skipped".to_string(), + )); + } + }; - let compiled_instruction = msg.instructions().to_vec(); + let writtable_loaded_addresses = ui_loaded_addresses.writable; + let readable_loaded_addresses = ui_loaded_addresses.readonly; - let mut instructions = Vec::new(); + if msg.address_table_lookups().is_some() { + for address in writtable_loaded_addresses { + account_keys.push(PubkeyString(address).try_into()?); + } - for inner in compiled_instruction { - instructions.push(InnerInstruction { - stack_height: Some(0), - instruction: CompiledInstruction { - program_id_index: inner.program_id_index, - accounts: inner.accounts, - data: inner.data, - }, - }); + for address in readable_loaded_addresses { + account_keys.push(PubkeyString(address).try_into()?); + } } - meta_inner_instructions.push(InnerInstructions { - index: 0, - instructions, - }); + let mut meta_inner_instructions = Vec::new(); if let OptionSerializer::Some(inner_instructions) = meta.inner_instructions { for ix in inner_instructions { @@ -115,9 +108,9 @@ impl TryFrom for TransactionInfo { instruction: CompiledInstruction { program_id_index: compiled.program_id_index, accounts: compiled.accounts, - data: bs58::decode(compiled.data) - .into_vec() - .map_err(|e| ErrorKind::Generic(e.to_string()))?, + data: bs58::decode(compiled.data).into_vec().map_err(|e| { + ErrorKind::Generic(format!("Error decoding data: {}", e)) + })?, }, }); } @@ -150,11 +143,13 @@ pub struct SignatureWorkerArgs { pub signature_worker_count: usize, } +type TransactionSender = Sender; + impl SignatureWorkerArgs { pub fn start( &self, - context: crate::BubblegumBackfillContext, - forwarder: Sender, + context: BubblegumContext, + forwarder: TransactionSender, ) -> Result<(JoinHandle<()>, Sender)> { let (sig_sender, mut sig_receiver) = channel::(self.signature_channel_size); let worker_count = self.signature_worker_count; @@ -188,6 +183,7 @@ async fn queue_transaction<'a>( signature: Signature, ) -> Result<(), ErrorKind> { let transaction = client.get_transaction(&signature).await?; + sender .send(FetchedEncodedTransactionWithStatusMeta(transaction).try_into()?) .await diff --git a/bubblegum-backfill/src/worker/tree.rs b/bubblegum/src/backfill/worker/tree.rs similarity index 72% rename from bubblegum-backfill/src/worker/tree.rs rename to bubblegum/src/backfill/worker/tree.rs index f647e63..e02c213 100644 --- a/bubblegum-backfill/src/worker/tree.rs +++ b/bubblegum/src/backfill/worker/tree.rs @@ -1,7 +1,7 @@ use crate::{ - gap::{TreeGapFill, TreeGapModel}, + backfill::gap::{TreeGapFill, TreeGapModel}, tree::TreeResponse, - BubblegumBackfillContext, + BubblegumContext, }; use anyhow::Result; use clap::Parser; @@ -27,13 +27,12 @@ pub struct TreeWorkerArgs { #[clap(flatten)] pub program_transformer_worker: ProgramTransformerWorkerArgs, + + #[clap(long, env, default_value = "false")] + pub force: bool, } impl TreeWorkerArgs { - pub fn start( - &self, - context: BubblegumBackfillContext, - tree: TreeResponse, - ) -> JoinHandle> { + pub fn start(&self, context: BubblegumContext, tree: TreeResponse) -> JoinHandle> { let db_pool = context.database_pool.clone(); let metadata_json_download_db_pool = context.database_pool.clone(); @@ -44,6 +43,7 @@ impl TreeWorkerArgs { let program_transformer_worker_args = self.program_transformer_worker.clone(); let signature_worker_args = self.signature_worker.clone(); let gap_worker_args = self.gap_worker.clone(); + let force = self.force; tokio::spawn(async move { let (metadata_json_download_worker, metadata_json_download_sender) = @@ -67,23 +67,31 @@ impl TreeWorkerArgs { .map(TryInto::try_into) .collect::, _>>()?; - let upper_known_seq = cl_audits_v2::Entity::find() - .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) - .order_by_desc(cl_audits_v2::Column::Seq) - .one(&conn) - .await?; - - let lower_known_seq = cl_audits_v2::Entity::find() - .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) - .order_by_asc(cl_audits_v2::Column::Seq) - .one(&conn) - .await?; + let upper_known_seq = if force { + None + } else { + cl_audits_v2::Entity::find() + .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) + .order_by_desc(cl_audits_v2::Column::Seq) + .one(&conn) + .await? + }; + + let lower_known_seq = if force { + None + } else { + cl_audits_v2::Entity::find() + .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) + .order_by_asc(cl_audits_v2::Column::Seq) + .one(&conn) + .await? + }; if let Some(upper_seq) = upper_known_seq { let signature = Signature::try_from(upper_seq.tx.as_ref())?; - gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature))); - } else if tree.seq > 0 { + // Reprocess the entire tree if force is true or if the tree has a seq of 0 to keep the current behavior + } else if force || tree.seq > 0 { gaps.push(TreeGapFill::new(tree.pubkey, None, None)); } @@ -98,9 +106,10 @@ impl TreeWorkerArgs { error!("send gap: {:?}", e); } } - drop(tree_gap_sender); } + drop(tree_gap_sender); + futures::future::try_join4( gap_worker, signature_worker, diff --git a/bubblegum-backfill/src/error.rs b/bubblegum/src/error.rs similarity index 100% rename from bubblegum-backfill/src/error.rs rename to bubblegum/src/error.rs diff --git a/bubblegum/src/lib.rs b/bubblegum/src/lib.rs new file mode 100644 index 0000000..9050a57 --- /dev/null +++ b/bubblegum/src/lib.rs @@ -0,0 +1,188 @@ +pub mod backfill; +mod error; +pub mod tree; + +use das_core::{MetadataJsonDownloadWorkerArgs, Rpc}; +pub use error::ErrorKind; +mod verify; +pub use verify::ProofReport; + +use anyhow::Result; +use backfill::worker::{ProgramTransformerWorkerArgs, SignatureWorkerArgs, TreeWorkerArgs}; +use clap::Parser; +use digital_asset_types::dao::cl_audits_v2; +use futures::{stream::FuturesUnordered, StreamExt}; +use sea_orm::ColumnTrait; +use sea_orm::QueryOrder; +use sea_orm::SqlxPostgresConnector; +use sea_orm::{EntityTrait, QueryFilter}; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; +use std::str::FromStr; +use tracing::error; +use tree::TreeResponse; + +#[derive(Clone)] +pub struct BubblegumContext { + pub database_pool: sqlx::PgPool, + pub solana_rpc: Rpc, +} + +impl BubblegumContext { + pub const fn new(database_pool: sqlx::PgPool, solana_rpc: Rpc) -> Self { + Self { + database_pool, + solana_rpc, + } + } +} + +#[derive(Debug, Parser, Clone)] +pub struct BackfillArgs { + /// Number of tree crawler workers + #[arg(long, env, default_value = "20")] + pub tree_crawler_count: usize, + + /// The list of trees to crawl. If not specified, all trees will be crawled. + #[arg(long, env, use_value_delimiter = true)] + pub only_trees: Option>, + + #[clap(flatten)] + pub tree_worker: TreeWorkerArgs, +} + +pub async fn start_backfill(context: BubblegumContext, args: BackfillArgs) -> Result<()> { + let trees = if let Some(ref only_trees) = args.only_trees { + TreeResponse::find(&context.solana_rpc, only_trees.clone()).await? + } else { + TreeResponse::all(&context.solana_rpc).await? + }; + + let mut crawl_handles = FuturesUnordered::new(); + + for tree in trees { + if crawl_handles.len() >= args.tree_crawler_count { + crawl_handles.next().await; + } + let context = context.clone(); + let handle = args.tree_worker.start(context, tree); + + crawl_handles.push(handle); + } + + futures::future::try_join_all(crawl_handles).await?; + + Ok(()) +} + +#[derive(Debug, Parser, Clone)] +pub struct BubblegumReplayArgs { + /// The tree to replay. + #[arg(long, env)] + pub tree: String, + + /// The list of sequences to replay. If not specified, all sequences will be replayed. + #[arg(long, env, use_value_delimiter = true)] + pub only_sequences: Option>, + + #[clap(flatten)] + pub signature_worker: SignatureWorkerArgs, + + #[clap(flatten)] + pub program_transformer_worker: ProgramTransformerWorkerArgs, + + #[clap(flatten)] + pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs, +} + +pub async fn start_bubblegum_replay( + context: BubblegumContext, + args: BubblegumReplayArgs, +) -> Result<()> { + let pubkey = Pubkey::from_str(&args.tree) + .map(|pubkey| pubkey.to_bytes().to_vec()) + .map_err(|e| anyhow::anyhow!("Invalid tree pubkey: {:?}", e))?; + + let database_pool = context.database_pool.clone(); + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool); + + let mut query = cl_audits_v2::Entity::find() + .filter(cl_audits_v2::Column::Tree.eq(pubkey)) + .order_by_asc(cl_audits_v2::Column::Seq); + + if let Some(sequences) = args.only_sequences { + query = query.filter(cl_audits_v2::Column::Seq.is_in(sequences)); + } + + let cl_audits = query.all(&conn).await?; + + let metadata_json_download_worker_args = args.metadata_json_download_worker.clone(); + let program_transformer_worker_args = args.program_transformer_worker.clone(); + let signature_worker_args = args.signature_worker.clone(); + + let metadata_json_download_db_pool = context.database_pool.clone(); + let program_transformer_context = context.clone(); + let signature_context = context.clone(); + + let (metadata_json_download_worker, metadata_json_download_sender) = + metadata_json_download_worker_args.start(metadata_json_download_db_pool)?; + + let (program_transformer_worker, transaction_info_sender) = program_transformer_worker_args + .start(program_transformer_context, metadata_json_download_sender)?; + + let (signature_worker, signature_sender) = + signature_worker_args.start(signature_context, transaction_info_sender)?; + + for audit in cl_audits { + let signature = Signature::try_from(audit.tx.as_ref())?; + if let Err(e) = signature_sender.send(signature).await { + error!("send signature: {:?}", e); + } + } + + drop(signature_sender); + + futures::future::try_join3( + signature_worker, + program_transformer_worker, + metadata_json_download_worker, + ) + .await?; + + Ok(()) +} + +#[derive(Debug, Parser, Clone)] +pub struct VerifyArgs { + /// The list of trees to verify. If not specified, all trees will be crawled. + #[arg(long, env, use_value_delimiter = true)] + pub only_trees: Option>, + + #[arg(long, env, default_value = "20")] + pub max_concurrency: usize, +} + +pub async fn verify_bubblegum( + context: BubblegumContext, + args: VerifyArgs, +) -> Result> { + let trees = if let Some(ref only_trees) = args.only_trees { + TreeResponse::find(&context.solana_rpc, only_trees.clone()).await? + } else { + TreeResponse::all(&context.solana_rpc).await? + }; + + let (sender, receiver) = tokio::sync::mpsc::channel(trees.len()); + + tokio::spawn(async move { + for tree in trees { + if let Ok(report) = verify::check(context.clone(), tree, args.max_concurrency).await { + if sender.send(report).await.is_err() { + error!("Failed to send report"); + } + } + } + }); + + Ok(receiver) +} diff --git a/bubblegum-backfill/src/tree.rs b/bubblegum/src/tree.rs similarity index 98% rename from bubblegum-backfill/src/tree.rs rename to bubblegum/src/tree.rs index efb5c1c..405c900 100644 --- a/bubblegum-backfill/src/tree.rs +++ b/bubblegum/src/tree.rs @@ -1,4 +1,4 @@ -use super::ErrorKind; +use super::error::ErrorKind; use anyhow::Result; use borsh::BorshDeserialize; use das_core::Rpc; @@ -10,7 +10,7 @@ use spl_account_compression::state::{ }; use std::str::FromStr; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct TreeHeaderResponse { pub max_depth: u32, pub max_buffer_size: u32, @@ -33,7 +33,6 @@ impl TryFrom for TreeHeaderResponse { } } -#[derive(Debug, Clone)] pub struct TreeResponse { pub pubkey: Pubkey, pub tree_header: TreeHeaderResponse, diff --git a/bubblegum/src/verify.rs b/bubblegum/src/verify.rs new file mode 100644 index 0000000..e99972c --- /dev/null +++ b/bubblegum/src/verify.rs @@ -0,0 +1,182 @@ +use super::BubblegumContext; +use crate::error::ErrorKind; +use crate::tree::TreeResponse; +use anyhow::{anyhow, Result}; +use digital_asset_types::dapi::get_proof_for_asset; +use digital_asset_types::rpc::AssetProof; +use futures::stream::{FuturesUnordered, StreamExt}; +use mpl_bubblegum::accounts::TreeConfig; +use sea_orm::SqlxPostgresConnector; +use sha3::{Digest, Keccak256}; +use solana_sdk::pubkey::Pubkey; +use spl_account_compression::concurrent_tree_wrapper::ProveLeafArgs; +use std::fmt; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::debug; + +trait TryFromAssetProof { + fn try_from_asset_proof(proof: AssetProof) -> Result + where + Self: Sized; +} + +impl TryFromAssetProof for ProveLeafArgs { + fn try_from_asset_proof(proof: AssetProof) -> Result { + Ok(ProveLeafArgs { + current_root: bs58::decode(&proof.root) + .into_vec() + .map_err(|e| anyhow!(e))? + .try_into() + .map_err(|_| anyhow!("Invalid root length"))?, + leaf: bs58::decode(&proof.leaf) + .into_vec() + .map_err(|e| anyhow!(e))? + .try_into() + .map_err(|_| anyhow!("Invalid leaf length"))?, + proof_vec: proof + .proof + .iter() + .map(|p| { + bs58::decode(p) + .into_vec() + .map_err(|e| anyhow!(e)) + .and_then(|v| v.try_into().map_err(|_| anyhow!("Invalid proof length"))) + }) + .collect::>>()?, + index: proof.node_index as u32, + }) + } +} + +fn hash(left: &[u8], right: &[u8]) -> [u8; 32] { + let mut hasher = Keccak256::new(); + hasher.update(left); + hasher.update(right); + let result = hasher.finalize(); + let mut hash = [0u8; 32]; + hash.copy_from_slice(&result); + hash +} + +fn verify_merkle_proof(proof: &ProveLeafArgs) -> bool { + let mut node = proof.leaf; + for (i, sibling) in proof.proof_vec.iter().enumerate() { + if (proof.index >> i) & 1 == 0 { + node = hash(&node, sibling); + } else { + node = hash(sibling, &node); + } + } + node == proof.current_root +} + +fn leaf_proof_result(proof: AssetProof) -> Result { + match ProveLeafArgs::try_from_asset_proof(proof) { + Ok(proof) if verify_merkle_proof(&proof) => Ok(ProofResult::Correct), + Ok(_) => Ok(ProofResult::Incorrect), + Err(_) => Ok(ProofResult::Corrupt), + } +} + +#[derive(Debug, Default)] +pub struct ProofReport { + pub tree_pubkey: Pubkey, + pub total_leaves: usize, + pub incorrect_proofs: usize, + pub not_found_proofs: usize, + pub correct_proofs: usize, + pub corrupt_proofs: usize, +} + +#[derive(Debug)] +enum ProofResult { + Correct, + Incorrect, + NotFound, + Corrupt, +} + +impl fmt::Display for ProofResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ProofResult::Correct => write!(f, "Correct proof found"), + ProofResult::Incorrect => write!(f, "Incorrect proof found"), + ProofResult::NotFound => write!(f, "Proof not found"), + ProofResult::Corrupt => write!(f, "Corrupt proof found"), + } + } +} + +pub async fn check( + context: BubblegumContext, + tree: TreeResponse, + max_concurrency: usize, +) -> Result { + let (tree_config_pubkey, _) = TreeConfig::find_pda(&tree.pubkey); + + let pool = context.database_pool.clone(); + + let account = context.solana_rpc.get_account(&tree_config_pubkey).await?; + let account = account + .value + .ok_or_else(|| ErrorKind::Generic("Account not found".to_string()))?; + + let tree_config = TreeConfig::from_bytes(account.data.as_slice())?; + + let report = Arc::new(Mutex::new(ProofReport { + tree_pubkey: tree.pubkey, + total_leaves: tree_config.num_minted as usize, + ..ProofReport::default() + })); + + let mut tasks = FuturesUnordered::new(); + + for i in 0..tree_config.num_minted { + if tasks.len() >= max_concurrency { + tasks.next().await; + } + + let db = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone()); + let tree_pubkey = tree.pubkey; + let report = Arc::clone(&report); + + tasks.push(tokio::spawn(async move { + let (asset, _) = Pubkey::find_program_address( + &[b"asset", &tree_pubkey.to_bytes(), &i.to_le_bytes()], + &mpl_bubblegum::ID, + ); + let proof_lookup: Result = + get_proof_for_asset(&db, asset.to_bytes().to_vec()) + .await + .map_or_else(|_| Ok(ProofResult::NotFound), leaf_proof_result); + + if let Ok(proof_result) = proof_lookup { + let mut report = report.lock().await; + + match proof_result { + ProofResult::Correct => report.correct_proofs += 1, + ProofResult::Incorrect => report.incorrect_proofs += 1, + ProofResult::NotFound => report.not_found_proofs += 1, + ProofResult::Corrupt => report.corrupt_proofs += 1, + } + + debug!( + tree = %tree_pubkey, + leaf_index = i, + asset = %asset, + result = ?proof_result, + "Proof result for asset" + ); + } + })); + } + + while tasks.next().await.is_some() {} + + let final_report = Arc::try_unwrap(report) + .expect("Failed to unwrap Arc") + .into_inner(); + + Ok(final_report) +} diff --git a/digital_asset_types/src/dao/generated/tokens.rs b/digital_asset_types/src/dao/generated/tokens.rs index 35fcad5..326b8d9 100644 --- a/digital_asset_types/src/dao/generated/tokens.rs +++ b/digital_asset_types/src/dao/generated/tokens.rs @@ -15,7 +15,7 @@ impl EntityName for Entity { #[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Serialize, Deserialize)] pub struct Model { pub mint: Vec, - pub supply: i64, + pub supply: Decimal, pub decimals: i32, pub token_program: Vec, pub mint_authority: Option>, diff --git a/digital_asset_types/src/dapi/change_logs.rs b/digital_asset_types/src/dapi/change_logs.rs index 7023ad1..fdd19dd 100644 --- a/digital_asset_types/src/dapi/change_logs.rs +++ b/digital_asset_types/src/dapi/change_logs.rs @@ -200,7 +200,7 @@ fn build_asset_proof( tree_id: Vec, leaf_node_idx: i64, leaf_hash: Vec, - req_indexes: &Vec, + req_indexes: &[i64], required_nodes: &[SimpleChangeLog], ) -> AssetProof { let mut final_node_list = vec![SimpleChangeLog::default(); req_indexes.len()]; @@ -211,7 +211,7 @@ fn build_asset_proof( } for (i, (n, nin)) in final_node_list .iter_mut() - .zip(req_indexes.clone()) + .zip(req_indexes.to_owned()) .enumerate() { if *n == SimpleChangeLog::default() { diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index f5b4664..0dcad71 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -116,7 +116,7 @@ pub fn create_asset( owner_type: Set(owner_type.clone()), delegate: Set(delegate.clone()), frozen: Set(frozen), - supply: Set(supply), + supply: Set(supply.into()), supply_mint: Set(supply_mint.clone()), compressed: Set(compressed), compressible: Set(compressible), @@ -135,7 +135,7 @@ pub fn create_asset( owner_type, delegate, frozen, - supply, + supply: supply.into(), supply_mint, compressed, compressible, diff --git a/program_transformers/Cargo.toml b/program_transformers/Cargo.toml index 54ea735..da0848c 100644 --- a/program_transformers/Cargo.toml +++ b/program_transformers/Cargo.toml @@ -20,6 +20,7 @@ heck = { workspace = true } mpl-bubblegum = { workspace = true } num-traits = { workspace = true } sea-orm = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } diff --git a/program_transformers/src/asset_upserts.rs b/program_transformers/src/asset_upserts.rs index a82923a..913748d 100644 --- a/program_transformers/src/asset_upserts.rs +++ b/program_transformers/src/asset_upserts.rs @@ -10,6 +10,7 @@ use { TransactionTrait, }, serde_json::value::Value, + sqlx::types::Decimal, }; pub struct AssetTokenAccountColumns { @@ -54,7 +55,7 @@ pub async fn upsert_assets_token_account_columns, - pub supply: u64, + pub supply: i64, pub supply_mint: Option>, pub slot_updated_mint_account: u64, } @@ -65,7 +66,7 @@ pub async fn upsert_assets_mint_account_columns Result<(), DbErr> { let active_model = asset::ActiveModel { id: Set(columns.mint), - supply: Set(columns.supply as i64), + supply: Set(columns.supply), supply_mint: Set(columns.supply_mint), slot_updated_mint_account: Set(Some(columns.slot_updated_mint_account as i64)), ..Default::default() diff --git a/program_transformers/src/bubblegum/burn.rs b/program_transformers/src/bubblegum/burn.rs index d0d45cc..6a18ae5 100644 --- a/program_transformers/src/bubblegum/burn.rs +++ b/program_transformers/src/bubblegum/burn.rs @@ -23,14 +23,12 @@ pub async fn burn<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, { if let Some(cl) = &parsing_result.tree_update { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; let leaf_index = cl.index; let (asset_id, _) = Pubkey::find_program_address( &[ diff --git a/program_transformers/src/bubblegum/cancel_redeem.rs b/program_transformers/src/bubblegum/cancel_redeem.rs index 450bde1..28ea169 100644 --- a/program_transformers/src/bubblegum/cancel_redeem.rs +++ b/program_transformers/src/bubblegum/cancel_redeem.rs @@ -18,14 +18,12 @@ pub async fn cancel_redeem<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; match le.schema { LeafSchema::V1 { id, diff --git a/program_transformers/src/bubblegum/collection_verification.rs b/program_transformers/src/bubblegum/collection_verification.rs index 39284ff..27a3ed4 100644 --- a/program_transformers/src/bubblegum/collection_verification.rs +++ b/program_transformers/src/bubblegum/collection_verification.rs @@ -20,7 +20,6 @@ pub async fn process<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, @@ -44,8 +43,7 @@ where "Handling collection verification event for {} (verify: {}): {}", collection, verify, bundle.txn_id ); - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; let id_bytes = match le.schema { LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), }; diff --git a/program_transformers/src/bubblegum/creator_verification.rs b/program_transformers/src/bubblegum/creator_verification.rs index 22646ec..8281e71 100644 --- a/program_transformers/src/bubblegum/creator_verification.rs +++ b/program_transformers/src/bubblegum/creator_verification.rs @@ -20,7 +20,6 @@ pub async fn process<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, @@ -60,8 +59,7 @@ where "Handling creator verification event for creator {} (verify: {}): {}", creator, verify, bundle.txn_id ); - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; match le.schema { LeafSchema::V1 { diff --git a/program_transformers/src/bubblegum/db.rs b/program_transformers/src/bubblegum/db.rs index ca617fa..1e0f71b 100644 --- a/program_transformers/src/bubblegum/db.rs +++ b/program_transformers/src/bubblegum/db.rs @@ -1,8 +1,8 @@ use { crate::error::{ProgramTransformerError, ProgramTransformerResult}, + das_core::DownloadMetadataInfo, digital_asset_types::dao::{ - asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items, - cl_audits_v2, cl_items, + asset, asset_authority, asset_creators, asset_data, asset_grouping, cl_audits_v2, cl_items, sea_orm_active_enums::{ ChainMutability, Instruction, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, SpecificationVersions, @@ -10,13 +10,14 @@ use { }, mpl_bubblegum::types::{Collection, Creator}, sea_orm::{ - entity::{ActiveValue, ColumnTrait, EntityTrait}, - query::{JsonValue, QueryFilter, QuerySelect, QueryTrait}, + entity::{ActiveValue, EntityTrait}, + prelude::*, + query::{JsonValue, QueryTrait}, sea_query::query::OnConflict, ConnectionTrait, DbBackend, TransactionTrait, }, spl_account_compression::events::ChangeLogEventV1, - tracing::{debug, error, info}, + tracing::{debug, error}, }; pub async fn save_changelog_event<'c, T>( @@ -25,12 +26,11 @@ pub async fn save_changelog_event<'c, T>( txn_id: &str, txn: &T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult where T: ConnectionTrait + TransactionTrait, { - insert_change_log(change_log_event, slot, txn_id, txn, instruction, cl_audits).await?; + insert_change_log(change_log_event, slot, txn_id, txn, instruction).await?; Ok(change_log_event.seq) } @@ -40,19 +40,17 @@ const fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 { pub async fn insert_change_log<'c, T>( change_log_event: &ChangeLogEventV1, - slot: u64, + _slot: u64, txn_id: &str, txn: &T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, { - let mut i: i64 = 0; let depth = change_log_event.path.len() - 1; let tree_id = change_log_event.id.as_ref(); - for p in change_log_event.path.iter() { + for (i, p) in (0_i64..).zip(change_log_event.path.iter()) { let node_idx = p.index as i64; debug!( "seq {}, index {} level {}, node {:?}, txn: {:?}, instruction {}", @@ -79,7 +77,6 @@ where ..Default::default() }; - i += 1; let mut query = cl_items::Entity::insert(item) .on_conflict( OnConflict::columns([cl_items::Column::Tree, cl_items::Column::NodeIdx]) @@ -98,71 +95,38 @@ where .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; } - // Insert the audit item after the insert into cl_items have been completed - if cl_audits { - let tx_id_bytes = bs58::decode(txn_id) - .into_vec() - .map_err(|_e| ProgramTransformerError::ChangeLogEventMalformed)?; - let ix = Instruction::from(instruction); - if ix == Instruction::Unknown { - error!("Unknown instruction: {}", instruction); - } - let audit_item_v2 = cl_audits_v2::ActiveModel { - tree: ActiveValue::Set(tree_id.to_vec()), - leaf_idx: ActiveValue::Set(change_log_event.index as i64), - seq: ActiveValue::Set(change_log_event.seq as i64), - tx: ActiveValue::Set(tx_id_bytes), - instruction: ActiveValue::Set(ix), - ..Default::default() - }; - let query = cl_audits_v2::Entity::insert(audit_item_v2) - .on_conflict( - OnConflict::columns([ - cl_audits_v2::Column::Tree, - cl_audits_v2::Column::LeafIdx, - cl_audits_v2::Column::Seq, - ]) - .do_nothing() - .to_owned(), - ) - .build(DbBackend::Postgres); - match txn.execute(query).await { - Ok(_) => {} - Err(e) => { - error!("Error while inserting into cl_audits_v2: {:?}", e); - } - } + let tx_id_bytes = bs58::decode(txn_id) + .into_vec() + .map_err(|_e| ProgramTransformerError::ChangeLogEventMalformed)?; + let ix = Instruction::from(instruction); + if ix == Instruction::Unknown { + error!("Unknown instruction: {}", instruction); } + let audit_item_v2 = cl_audits_v2::ActiveModel { + tree: ActiveValue::Set(tree_id.to_vec()), + leaf_idx: ActiveValue::Set(change_log_event.index as i64), + seq: ActiveValue::Set(change_log_event.seq as i64), + tx: ActiveValue::Set(tx_id_bytes), + instruction: ActiveValue::Set(ix), + ..Default::default() + }; - // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert - // a single row into the `backfill_items` table. This way if an incomplete path was inserted - // into `cl_items` due to an error, a gap will be created for the tree and the backfiller will - // fix it. - if i - 1 == depth as i64 { - // See if the tree already exists in the `backfill_items` table. - let rows = backfill_items::Entity::find() - .filter(backfill_items::Column::Tree.eq(tree_id)) - .limit(1) - .all(txn) - .await?; - - // If the tree does not exist in `backfill_items` and the sequence number is greater than 1, - // then we know we will need to backfill the tree from sequence number 1 up to the current - // sequence number. So in this case we set at flag to force checking the tree. - let force_chk = rows.is_empty() && change_log_event.seq > 1; - - println!("Adding to backfill_items table at level {}", i - 1); - let item = backfill_items::ActiveModel { - tree: ActiveValue::Set(tree_id.to_vec()), - seq: ActiveValue::Set(change_log_event.seq as i64), - slot: ActiveValue::Set(slot as i64), - force_chk: ActiveValue::Set(force_chk), - backfilled: ActiveValue::Set(false), - failed: ActiveValue::Set(false), - ..Default::default() - }; - - backfill_items::Entity::insert(item).exec(txn).await?; + let query = cl_audits_v2::Entity::insert(audit_item_v2) + .on_conflict( + OnConflict::columns([ + cl_audits_v2::Column::Tree, + cl_audits_v2::Column::LeafIdx, + cl_audits_v2::Column::Seq, + ]) + .do_nothing() + .to_owned(), + ) + .build(DbBackend::Postgres); + match txn.execute(query).await { + Ok(_) => {} + Err(e) => { + error!("Error while inserting into cl_audits_v2: {:?}", e); + } } Ok(()) @@ -273,7 +237,7 @@ pub async fn upsert_asset_with_compression_info( id: Vec, compressed: bool, compressible: bool, - supply: i64, + supply: u64, supply_mint: Option>, ) -> ProgramTransformerResult<()> where @@ -283,7 +247,7 @@ where id: ActiveValue::Set(id), compressed: ActiveValue::Set(compressed), compressible: ActiveValue::Set(compressible), - supply: ActiveValue::Set(supply), + supply: ActiveValue::Set(supply as i64), supply_mint: ActiveValue::Set(supply_mint), ..Default::default() }; @@ -406,13 +370,11 @@ pub async fn upsert_asset_data( chain_data: JsonValue, metadata_url: String, metadata_mutability: Mutability, - metadata: JsonValue, slot_updated: i64, - reindex: Option, raw_name: Vec, raw_symbol: Vec, seq: i64, -) -> ProgramTransformerResult<()> +) -> ProgramTransformerResult> where T: ConnectionTrait + TransactionTrait, { @@ -420,11 +382,11 @@ where id: ActiveValue::Set(id.clone()), chain_data_mutability: ActiveValue::Set(chain_data_mutability), chain_data: ActiveValue::Set(chain_data), - metadata_url: ActiveValue::Set(metadata_url), + metadata_url: ActiveValue::Set(metadata_url.clone()), metadata_mutability: ActiveValue::Set(metadata_mutability), - metadata: ActiveValue::Set(metadata), + metadata: ActiveValue::Set(JsonValue::String("processing".to_string())), slot_updated: ActiveValue::Set(slot_updated), - reindex: ActiveValue::Set(reindex), + reindex: ActiveValue::Set(Some(true)), raw_name: ActiveValue::Set(Some(raw_name)), raw_symbol: ActiveValue::Set(Some(raw_symbol)), base_info_seq: ActiveValue::Set(Some(seq)), @@ -438,9 +400,7 @@ where asset_data::Column::ChainData, asset_data::Column::MetadataUrl, asset_data::Column::MetadataMutability, - // Don't update asset_data::Column::Metadata if it already exists. Even if we - // are indexing `update_metadata`` and there's a new URI, the new background - // task will overwrite it. + asset_data::Column::Metadata, asset_data::Column::SlotUpdated, asset_data::Column::Reindex, asset_data::Column::RawName, @@ -453,15 +413,26 @@ where // Do not overwrite changes that happened after decompression (asset_data.base_info_seq = 0). // Do not overwrite changes from a later Bubblegum instruction. + // Do not update the record if the incoming slot is larger than the current or if it's null. + // Update if the current slot on the record is null. query.sql = format!( - "{} WHERE (asset_data.base_info_seq != 0 AND excluded.base_info_seq >= asset_data.base_info_seq) OR asset_data.base_info_seq IS NULL", + "{} WHERE ((asset_data.base_info_seq != 0 AND excluded.base_info_seq >= asset_data.base_info_seq) OR asset_data.base_info_seq IS NULL) AND (excluded.slot_updated <= asset_data.slot_updated OR asset_data.slot_updated IS NULL)", query.sql ); - txn.execute(query) + + let result = txn + .execute(query) .await .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; - Ok(()) + if result.rows_affected() > 0 { + Ok(Some(DownloadMetadataInfo::new( + id, + metadata_url, + ))) + } else { + Ok(None) + } } #[allow(clippy::too_many_arguments)] diff --git a/program_transformers/src/bubblegum/delegate.rs b/program_transformers/src/bubblegum/delegate.rs index b491ab5..15b50a6 100644 --- a/program_transformers/src/bubblegum/delegate.rs +++ b/program_transformers/src/bubblegum/delegate.rs @@ -18,14 +18,12 @@ pub async fn delegate<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; match le.schema { LeafSchema::V1 { id, diff --git a/program_transformers/src/bubblegum/mint_v1.rs b/program_transformers/src/bubblegum/mint_v1.rs index 641156c..47193ee 100644 --- a/program_transformers/src/bubblegum/mint_v1.rs +++ b/program_transformers/src/bubblegum/mint_v1.rs @@ -24,7 +24,7 @@ use { }, json::ChainDataV1, }, - sea_orm::{query::JsonValue, ConnectionTrait, TransactionTrait}, + sea_orm::{ConnectionTrait, TransactionTrait}, tracing::warn, }; @@ -33,7 +33,6 @@ pub async fn mint_v1<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult> where T: ConnectionTrait + TransactionTrait, @@ -51,8 +50,7 @@ where &parsing_result.tree_update, &parsing_result.payload, ) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; let metadata = args; #[allow(unreachable_patterns)] return match le.schema { @@ -93,16 +91,14 @@ where // automatically rolled back. let multi_txn = txn.begin().await?; - upsert_asset_data( + let download_metadata_info = upsert_asset_data( &multi_txn, id_bytes.to_vec(), chain_mutability, chain_data_json, uri.clone(), Mutability::Mutable, - JsonValue::String("processing".to_string()), slot_i, - Some(true), name.to_vec(), symbol.to_vec(), seq as i64, @@ -209,7 +205,7 @@ where return Ok(None); } - Ok(Some(DownloadMetadataInfo::new(id_bytes.to_vec(), uri))) + Ok(download_metadata_info) } _ => Err(ProgramTransformerError::NotImplemented), }; diff --git a/program_transformers/src/bubblegum/mod.rs b/program_transformers/src/bubblegum/mod.rs index dc108a0..03a8008 100644 --- a/program_transformers/src/bubblegum/mod.rs +++ b/program_transformers/src/bubblegum/mod.rs @@ -11,7 +11,7 @@ use { token_metadata::types::UseMethod as TokenMetadataUseMethod, }, sea_orm::{ConnectionTrait, TransactionTrait}, - tracing::{debug, info}, + tracing::debug, }; mod burn; @@ -30,7 +30,6 @@ pub async fn handle_bubblegum_instruction<'c, T>( bundle: &'c InstructionBundle<'c>, txn: &T, download_metadata_notifier: &DownloadMetadataNotifier, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, @@ -59,50 +58,46 @@ where InstructionName::SetDecompressibleState => "SetDecompressibleState", InstructionName::UpdateMetadata => "UpdateMetadata", }; - println!("BGUM instruction txn={:?}: {:?}", ix_str, bundle.txn_id); + debug!("BGUM instruction txn={:?}: {:?}", ix_str, bundle.txn_id); match ix_type { InstructionName::Transfer => { - transfer::transfer(parsing_result, bundle, txn, ix_str, cl_audits).await?; + transfer::transfer(parsing_result, bundle, txn, ix_str).await?; } InstructionName::Burn => { - burn::burn(parsing_result, bundle, txn, ix_str, cl_audits).await?; + burn::burn(parsing_result, bundle, txn, ix_str).await?; } InstructionName::Delegate => { - delegate::delegate(parsing_result, bundle, txn, ix_str, cl_audits).await?; + delegate::delegate(parsing_result, bundle, txn, ix_str).await?; } InstructionName::MintV1 | InstructionName::MintToCollectionV1 => { - if let Some(info) = - mint_v1::mint_v1(parsing_result, bundle, txn, ix_str, cl_audits).await? - { + if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? { download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; } } InstructionName::Redeem => { - redeem::redeem(parsing_result, bundle, txn, ix_str, cl_audits).await?; + redeem::redeem(parsing_result, bundle, txn, ix_str).await?; } InstructionName::CancelRedeem => { - cancel_redeem::cancel_redeem(parsing_result, bundle, txn, ix_str, cl_audits).await?; + cancel_redeem::cancel_redeem(parsing_result, bundle, txn, ix_str).await?; } InstructionName::DecompressV1 => { debug!("No action necessary for decompression") } InstructionName::VerifyCreator | InstructionName::UnverifyCreator => { - creator_verification::process(parsing_result, bundle, txn, ix_str, cl_audits).await?; + creator_verification::process(parsing_result, bundle, txn, ix_str).await?; } InstructionName::VerifyCollection | InstructionName::UnverifyCollection | InstructionName::SetAndVerifyCollection => { - collection_verification::process(parsing_result, bundle, txn, ix_str, cl_audits) - .await?; + collection_verification::process(parsing_result, bundle, txn, ix_str).await?; } InstructionName::SetDecompressibleState => (), // Nothing to index. InstructionName::UpdateMetadata => { if let Some(info) = - update_metadata::update_metadata(parsing_result, bundle, txn, ix_str, cl_audits) - .await? + update_metadata::update_metadata(parsing_result, bundle, txn, ix_str).await? { download_metadata_notifier(info) .await diff --git a/program_transformers/src/bubblegum/redeem.rs b/program_transformers/src/bubblegum/redeem.rs index e6a6080..22caaf3 100644 --- a/program_transformers/src/bubblegum/redeem.rs +++ b/program_transformers/src/bubblegum/redeem.rs @@ -17,14 +17,12 @@ pub async fn redeem<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, { if let Some(cl) = &parsing_result.tree_update { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; let leaf_index = cl.index; let (asset_id, _) = Pubkey::find_program_address( &[ diff --git a/program_transformers/src/bubblegum/transfer.rs b/program_transformers/src/bubblegum/transfer.rs index 9aa6a33..617efdf 100644 --- a/program_transformers/src/bubblegum/transfer.rs +++ b/program_transformers/src/bubblegum/transfer.rs @@ -18,14 +18,12 @@ pub async fn transfer<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; match le.schema { LeafSchema::V1 { id, @@ -79,6 +77,7 @@ where } } } + Err(ProgramTransformerError::ParsingError( "Ix not parsed correctly".to_string(), )) diff --git a/program_transformers/src/bubblegum/update_metadata.rs b/program_transformers/src/bubblegum/update_metadata.rs index fd01a93..dff719f 100644 --- a/program_transformers/src/bubblegum/update_metadata.rs +++ b/program_transformers/src/bubblegum/update_metadata.rs @@ -22,7 +22,7 @@ use { }, json::ChainDataV1, }, - sea_orm::{query::*, ConnectionTrait, JsonValue}, + sea_orm::{query::*, ConnectionTrait}, tracing::warn, }; @@ -31,7 +31,6 @@ pub async fn update_metadata<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, instruction: &str, - cl_audits: bool, ) -> ProgramTransformerResult> where T: ConnectionTrait + TransactionTrait, @@ -49,8 +48,7 @@ where &parsing_result.tree_update, &parsing_result.payload, ) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) - .await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; #[allow(unreachable_patterns)] return match le.schema { @@ -116,16 +114,14 @@ where // automatically rolled back. let multi_txn = txn.begin().await?; - upsert_asset_data( + let download_metadata_info = upsert_asset_data( &multi_txn, id_bytes.to_vec(), chain_mutability, chain_data_json, uri.clone(), Mutability::Mutable, - JsonValue::String("processing".to_string()), slot_i, - Some(true), name.into_bytes().to_vec(), symbol.into_bytes().to_vec(), seq as i64, @@ -190,7 +186,7 @@ where return Ok(None); } - Ok(Some(DownloadMetadataInfo::new(id_bytes.to_vec(), uri))) + Ok(download_metadata_info) } _ => Err(ProgramTransformerError::NotImplemented), }; diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index 7abdd18..bd774da 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -2,9 +2,6 @@ use { crate::{ bubblegum::handle_bubblegum_instruction, error::{ProgramTransformerError, ProgramTransformerResult}, - mpl_core_program::handle_mpl_core_account, - token::handle_token_program_account, - token_metadata::handle_token_metadata_account, }, blockbuster::{ instruction::{order_instructions, InstructionBundle, IxPair}, @@ -20,6 +17,7 @@ use { entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr, SqlxPostgresConnector, TransactionTrait, }, + serde::Deserialize, solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature}, solana_transaction_status::InnerInstructions, sqlx::PgPool, @@ -29,13 +27,10 @@ use { }; mod asset_upserts; -mod bubblegum; +pub mod bubblegum; pub mod error; -mod mpl_core_program; -mod token; -mod token_metadata; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] pub struct AccountInfo { pub slot: u64, pub pubkey: Pubkey, @@ -43,7 +38,7 @@ pub struct AccountInfo { pub data: Vec, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] pub struct TransactionInfo { pub slot: u64, pub signature: Signature, @@ -57,15 +52,10 @@ pub struct ProgramTransformer { download_metadata_notifier: DownloadMetadataNotifier, parsers: HashMap>, key_set: HashSet, - cl_audits: bool, } impl ProgramTransformer { - pub fn new( - pool: PgPool, - download_metadata_notifier: DownloadMetadataNotifier, - cl_audits: bool, - ) -> Self { + pub fn new(pool: PgPool, download_metadata_notifier: DownloadMetadataNotifier) -> Self { let mut parsers: HashMap> = HashMap::with_capacity(3); let bgum = BubblegumParser {}; let token_metadata = TokenMetadataParser {}; @@ -85,7 +75,6 @@ impl ProgramTransformer { download_metadata_notifier, parsers, key_set: hs, - cl_audits, } } @@ -110,7 +99,7 @@ impl ProgramTransformer { &self, tx_info: &TransactionInfo, ) -> ProgramTransformerResult<()> { - println!("Handling Transaction: {:?}", tx_info.signature); + info!("Handling Transaction: {:?}", tx_info.signature); let instructions = self.break_transaction(tx_info); let mut not_impl = 0; let ixlen = instructions.len(); @@ -159,7 +148,6 @@ impl ProgramTransformer { &ix, &self.storage, &self.download_metadata_notifier, - self.cl_audits, ) .await .map_err(|err| { @@ -178,7 +166,10 @@ impl ProgramTransformer { } if not_impl == ixlen { - debug!("Not imple"); + debug!( + "Not implemented for transaction signature: {:?}", + tx_info.signature + ); return Err(ProgramTransformerError::NotImplemented); } Ok(()) @@ -191,33 +182,6 @@ impl ProgramTransformer { if let Some(program) = self.match_program(&account_info.owner) { let result = program.handle_account(&account_info.data)?; match result.result_type() { - ProgramParseResult::TokenMetadata(parsing_result) => { - handle_token_metadata_account( - account_info, - parsing_result, - &self.storage, - &self.download_metadata_notifier, - ) - .await - } - ProgramParseResult::TokenProgramAccount(parsing_result) => { - handle_token_program_account( - account_info, - parsing_result, - &self.storage, - &self.download_metadata_notifier, - ) - .await - } - ProgramParseResult::MplCore(parsing_result) => { - handle_mpl_core_account( - account_info, - parsing_result, - &self.storage, - &self.download_metadata_notifier, - ) - .await - } _ => Err(ProgramTransformerError::NotImplemented), }?; } diff --git a/program_transformers/src/mpl_core_program/mod.rs b/program_transformers/src/mpl_core_program/mod.rs deleted file mode 100644 index b01252d..0000000 --- a/program_transformers/src/mpl_core_program/mod.rs +++ /dev/null @@ -1,42 +0,0 @@ -use { - crate::{ - error::{ProgramTransformerError, ProgramTransformerResult}, - mpl_core_program::v1_asset::{burn_v1_asset, save_v1_asset}, - AccountInfo, DownloadMetadataNotifier, - }, - blockbuster::programs::mpl_core_program::{MplCoreAccountData, MplCoreAccountState}, - sea_orm::DatabaseConnection, -}; - -mod v1_asset; - -pub async fn handle_mpl_core_account<'a, 'b, 'c>( - account_info: &AccountInfo, - parsing_result: &'a MplCoreAccountState, - db: &'b DatabaseConnection, - download_metadata_notifier: &DownloadMetadataNotifier, -) -> ProgramTransformerResult<()> { - match &parsing_result.data { - MplCoreAccountData::EmptyAccount => { - burn_v1_asset(db, account_info.pubkey, account_info.slot).await?; - Ok(()) - } - MplCoreAccountData::Asset(_) | MplCoreAccountData::Collection(_) => { - if let Some(info) = save_v1_asset( - db, - account_info.pubkey, - &parsing_result.data, - account_info.slot, - ) - .await? - { - download_metadata_notifier(info) - .await - .map_err(ProgramTransformerError::DownloadMetadataNotify)?; - } - Ok(()) - } - _ => Err(ProgramTransformerError::NotImplemented), - }?; - Ok(()) -} diff --git a/program_transformers/src/mpl_core_program/v1_asset.rs b/program_transformers/src/mpl_core_program/v1_asset.rs deleted file mode 100644 index 6714508..0000000 --- a/program_transformers/src/mpl_core_program/v1_asset.rs +++ /dev/null @@ -1,642 +0,0 @@ -use { - crate::{ - asset_upserts::{ - upsert_assets_metadata_account_columns, upsert_assets_mint_account_columns, - upsert_assets_token_account_columns, AssetMetadataAccountColumns, - AssetMintAccountColumns, AssetTokenAccountColumns, - }, - error::{ProgramTransformerError, ProgramTransformerResult}, - find_model_with_retry, DownloadMetadataInfo, - }, - blockbuster::{ - mpl_core::types::{Plugin, PluginAuthority, PluginType, UpdateAuthority}, - programs::mpl_core_program::MplCoreAccountData, - }, - digital_asset_types::{ - dao::{ - asset, asset_authority, asset_creators, asset_data, asset_grouping, - sea_orm_active_enums::{ - ChainMutability, Mutability, OwnerType, SpecificationAssetClass, - }, - }, - json::ChainDataV1, - }, - heck::ToSnakeCase, - sea_orm::{ - entity::{ActiveValue, ColumnTrait, EntityTrait}, - query::{JsonValue, QueryFilter, QueryTrait}, - sea_query::query::OnConflict, - sea_query::Expr, - ConnectionTrait, CursorTrait, DbBackend, TransactionTrait, - }, - serde_json::{value::Value, Map}, - solana_sdk::pubkey::Pubkey, - tracing::warn, -}; - -pub async fn burn_v1_asset( - conn: &T, - id: Pubkey, - slot: u64, -) -> ProgramTransformerResult<()> { - let slot_i = slot as i64; - let model = asset::ActiveModel { - id: ActiveValue::Set(id.to_bytes().to_vec()), - slot_updated: ActiveValue::Set(Some(slot_i)), - burnt: ActiveValue::Set(true), - ..Default::default() - }; - let mut query = asset::Entity::insert(model) - .on_conflict( - OnConflict::columns([asset::Column::Id]) - .update_columns([asset::Column::SlotUpdated, asset::Column::Burnt]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated", - query.sql - ); - conn.execute(query).await?; - Ok(()) -} - -const RETRY_INTERVALS: &[u64] = &[0, 5, 10]; - -pub async fn save_v1_asset( - conn: &T, - id: Pubkey, - account_data: &MplCoreAccountData, - slot: u64, -) -> ProgramTransformerResult> { - // Notes: - // The address of the Core asset is used for Core Asset ID. There are no token or mint accounts. - // There are no `MasterEdition` or `Edition` accounts associated with Core assets. - let id_array = id.to_bytes(); - let id_vec = id_array.to_vec(); - - // Note: This indexes both Core Assets and Core Collections. - let asset = match account_data { - MplCoreAccountData::Asset(indexable_asset) - | MplCoreAccountData::Collection(indexable_asset) => indexable_asset, - _ => return Err(ProgramTransformerError::NotImplemented), - }; - - //----------------------- - // Asset authority table - //----------------------- - - // If it is an `Address` type, use the value directly. If it is a `Collection`, search for and - // use the collection's authority. - let update_authority = match asset.update_authority { - UpdateAuthority::Address(address) => address.to_bytes().to_vec(), - UpdateAuthority::Collection(address) => find_model_with_retry( - conn, - "mpl_core", - &asset_authority::Entity::find() - .filter(asset_authority::Column::AssetId.eq(address.to_bytes().to_vec())), - RETRY_INTERVALS, - ) - .await? - .map(|model| model.authority) - .unwrap_or_default(), - UpdateAuthority::None => Pubkey::default().to_bytes().to_vec(), - }; - - let slot_i = slot as i64; - - let txn = conn.begin().await?; - - let model = asset_authority::ActiveModel { - asset_id: ActiveValue::Set(id_vec.clone()), - authority: ActiveValue::Set(update_authority.clone()), - seq: ActiveValue::Set(0), - slot_updated: ActiveValue::Set(slot_i), - ..Default::default() - }; - - let mut query = asset_authority::Entity::insert(model) - .on_conflict( - OnConflict::columns([asset_authority::Column::AssetId]) - .update_columns([ - asset_authority::Column::Authority, - asset_authority::Column::Seq, - asset_authority::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_authority.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - - if matches!(account_data, MplCoreAccountData::Collection(_)) { - update_group_asset_authorities(conn, id_vec.clone(), update_authority.clone(), slot_i) - .await?; - } - - //----------------------- - // asset_data table - //----------------------- - - let name = asset.name.clone().into_bytes(); - let uri = asset.uri.trim().replace('\0', ""); - - // Notes: - // There is no symbol for a Core asset. - // Edition nonce hardcoded to `None`. - // There is no primary sale concept for Core Assets, hardcoded to `false`. - // Token standard is hardcoded to `None`. - let mut chain_data = ChainDataV1 { - name: asset.name.clone(), - symbol: "".to_string(), - edition_nonce: None, - primary_sale_happened: false, - token_standard: None, - uses: None, - }; - - chain_data.sanitize(); - let chain_data_json = serde_json::to_value(chain_data) - .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; - - // Note: - // Mutability set based on core asset data having an update authority. - // Individual plugins could have some or no authority giving them individual mutability status. - let chain_mutability = match asset.update_authority { - UpdateAuthority::None => ChainMutability::Immutable, - _ => ChainMutability::Mutable, - }; - - let asset_data_model = asset_data::ActiveModel { - chain_data_mutability: ActiveValue::Set(chain_mutability), - chain_data: ActiveValue::Set(chain_data_json), - metadata_url: ActiveValue::Set(uri.clone()), - metadata: ActiveValue::Set(JsonValue::String("processing".to_string())), - metadata_mutability: ActiveValue::Set(Mutability::Mutable), - slot_updated: ActiveValue::Set(slot_i), - reindex: ActiveValue::Set(Some(true)), - id: ActiveValue::Set(id_vec.clone()), - raw_name: ActiveValue::Set(Some(name.to_vec())), - raw_symbol: ActiveValue::Set(None), - base_info_seq: ActiveValue::Set(Some(0)), - }; - - let mut query = asset_data::Entity::insert(asset_data_model) - .on_conflict( - OnConflict::columns([asset_data::Column::Id]) - .update_columns([ - asset_data::Column::ChainDataMutability, - asset_data::Column::ChainData, - asset_data::Column::MetadataUrl, - asset_data::Column::MetadataMutability, - asset_data::Column::SlotUpdated, - asset_data::Column::Reindex, - asset_data::Column::RawName, - asset_data::Column::RawSymbol, - asset_data::Column::BaseInfoSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_data.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - - //----------------------- - // asset table - //----------------------- - - let ownership_type = OwnerType::Single; - let (owner, class) = match account_data { - MplCoreAccountData::Asset(_) => ( - asset.owner.map(|owner| owner.to_bytes().to_vec()), - SpecificationAssetClass::MplCoreAsset, - ), - MplCoreAccountData::Collection(_) => ( - Some(update_authority.clone()), - SpecificationAssetClass::MplCoreCollection, - ), - _ => return Err(ProgramTransformerError::NotImplemented), - }; - - // Get royalty amount and creators from `Royalties` plugin if available. - let default_creators = Vec::new(); - let (royalty_amount, creators) = asset - .plugins - .get(&PluginType::Royalties) - .and_then(|plugin_schema| { - if let Plugin::Royalties(royalties) = &plugin_schema.data { - Some((royalties.basis_points, &royalties.creators)) - } else { - None - } - }) - .unwrap_or((0, &default_creators)); - - // Serialize known plugins into JSON. - let mut plugins_json = serde_json::to_value(&asset.plugins) - .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; - - // Improve JSON output. - remove_plugins_nesting(&mut plugins_json, "data"); - transform_plugins_authority(&mut plugins_json); - convert_keys_to_snake_case(&mut plugins_json); - - // Serialize any unknown plugins into JSON. - let unknown_plugins_json = if !asset.unknown_plugins.is_empty() { - let mut unknown_plugins_json = serde_json::to_value(&asset.unknown_plugins) - .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; - - // Improve JSON output. - transform_plugins_authority(&mut unknown_plugins_json); - convert_keys_to_snake_case(&mut unknown_plugins_json); - - Some(unknown_plugins_json) - } else { - None - }; - - // Serialize known external plugins into JSON. - let mut external_plugins_json = serde_json::to_value(&asset.external_plugins) - .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; - - // Improve JSON output. - remove_plugins_nesting(&mut external_plugins_json, "adapter_config"); - transform_plugins_authority(&mut external_plugins_json); - convert_keys_to_snake_case(&mut external_plugins_json); - - // Serialize any unknown external plugins into JSON. - let unknown_external_plugins_json = if !asset.unknown_external_plugins.is_empty() { - let mut unknown_external_plugins_json = - serde_json::to_value(&asset.unknown_external_plugins) - .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; - - // Improve JSON output. - transform_plugins_authority(&mut unknown_external_plugins_json); - convert_keys_to_snake_case(&mut unknown_external_plugins_json); - - Some(unknown_external_plugins_json) - } else { - None - }; - - upsert_assets_metadata_account_columns( - AssetMetadataAccountColumns { - mint: id_vec.clone(), - owner_type: ownership_type, - specification_asset_class: Some(class), - royalty_amount: royalty_amount as i32, - asset_data: Some(id_vec.clone()), - slot_updated_metadata_account: slot, - mpl_core_plugins: Some(plugins_json), - mpl_core_unknown_plugins: unknown_plugins_json, - mpl_core_collection_num_minted: asset.num_minted.map(|val| val as i32), - mpl_core_collection_current_size: asset.current_size.map(|val| val as i32), - mpl_core_plugins_json_version: Some(1), - mpl_core_external_plugins: Some(external_plugins_json), - mpl_core_unknown_external_plugins: unknown_external_plugins_json, - }, - &txn, - ) - .await?; - - let supply = 1; - - // Note: these need to be separate for Token Metadata but here could be one upsert. - upsert_assets_mint_account_columns( - AssetMintAccountColumns { - mint: id_vec.clone(), - supply_mint: None, - supply, - slot_updated_mint_account: slot, - }, - &txn, - ) - .await?; - - // Get transfer delegate from `TransferDelegate` plugin if available. - let transfer_delegate = - asset - .plugins - .get(&PluginType::TransferDelegate) - .and_then(|plugin_schema| match &plugin_schema.authority { - PluginAuthority::Owner => owner.clone(), - PluginAuthority::UpdateAuthority => Some(update_authority.clone()), - PluginAuthority::Address { address } => Some(address.to_bytes().to_vec()), - PluginAuthority::None => None, - }); - - // Get frozen status from `FreezeDelegate` plugin if available. - let frozen = asset - .plugins - .get(&PluginType::FreezeDelegate) - .and_then(|plugin_schema| { - if let Plugin::FreezeDelegate(freeze_delegate) = &plugin_schema.data { - Some(freeze_delegate.frozen) - } else { - None - } - }) - .unwrap_or(false); - - // TODO: these upserts needed to be separate for Token Metadata but here could be one upsert. - upsert_assets_token_account_columns( - AssetTokenAccountColumns { - mint: id_vec.clone(), - owner, - frozen, - // Note use transfer delegate for the existing delegate field. - delegate: transfer_delegate.clone(), - slot_updated_token_account: Some(slot_i), - }, - &txn, - ) - .await?; - - //----------------------- - // asset_grouping table - //----------------------- - - if let UpdateAuthority::Collection(address) = asset.update_authority { - let model = asset_grouping::ActiveModel { - asset_id: ActiveValue::Set(id_vec.clone()), - group_key: ActiveValue::Set("collection".to_string()), - group_value: ActiveValue::Set(Some(address.to_string())), - // Note all Core assets in a collection are verified. - verified: ActiveValue::Set(true), - group_info_seq: ActiveValue::Set(Some(0)), - slot_updated: ActiveValue::Set(Some(slot_i)), - ..Default::default() - }; - let mut query = asset_grouping::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .update_columns([ - asset_grouping::Column::GroupValue, - asset_grouping::Column::Verified, - asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_grouping.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - } - - //----------------------- - // creators table - //----------------------- - - let creators = creators - .iter() - .enumerate() - .map(|(i, creator)| asset_creators::ActiveModel { - asset_id: ActiveValue::Set(id_vec.clone()), - position: ActiveValue::Set(i as i16), - creator: ActiveValue::Set(creator.address.to_bytes().to_vec()), - share: ActiveValue::Set(creator.percentage as i32), - // Note all creators are verified for Core Assets. - verified: ActiveValue::Set(true), - slot_updated: ActiveValue::Set(Some(slot_i)), - seq: ActiveValue::Set(Some(0)), - ..Default::default() - }) - .collect::>(); - - if !creators.is_empty() { - let mut query = asset_creators::Entity::insert_many(creators) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Position, - ]) - .update_columns([ - asset_creators::Column::Creator, - asset_creators::Column::Share, - asset_creators::Column::Verified, - asset_creators::Column::Seq, - asset_creators::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - } - - // Commit the database transaction. - txn.commit().await?; - - // Return early if there is no URI. - if uri.is_empty() { - warn!( - "URI is empty for mint {}. Skipping background task.", - bs58::encode(id_vec.clone()).into_string() - ); - return Ok(None); - } - - // Otherwise return with info for background downloading. - Ok(Some(DownloadMetadataInfo::new(id_vec.clone(), uri))) -} - -// Modify the JSON structure to remove the `Plugin` name and just display its data. -// For example, this will transform `FreezeDelegate` JSON from: -// "data":{"freeze_delegate":{"frozen":false}}} -// to: -// "data":{"frozen":false} -fn remove_plugins_nesting(plugins_json: &mut Value, nested_key: &str) { - match plugins_json { - Value::Object(plugins) => { - // Handle the case where plugins_json is an object. - for (_, plugin) in plugins.iter_mut() { - remove_nesting_from_plugin(plugin, nested_key); - } - } - Value::Array(plugins_array) => { - // Handle the case where plugins_json is an array. - for plugin in plugins_array.iter_mut() { - remove_nesting_from_plugin(plugin, nested_key); - } - } - _ => {} - } -} - -fn remove_nesting_from_plugin(plugin: &mut Value, nested_key: &str) { - if let Some(Value::Object(nested_key)) = plugin.get_mut(nested_key) { - // Extract the plugin data and remove it. - if let Some((_, inner_plugin_data)) = nested_key.iter().next() { - let inner_plugin_data_clone = inner_plugin_data.clone(); - // Clear the `nested_key` object. - nested_key.clear(); - // Move the plugin data fields to the top level of `nested_key`. - if let Value::Object(inner_plugin_data) = inner_plugin_data_clone { - for (field_name, field_value) in inner_plugin_data.iter() { - nested_key.insert(field_name.clone(), field_value.clone()); - } - } - } - } -} - -// Modify the JSON for `PluginAuthority` to have consistent output no matter the enum type. -// For example, from: -// "authority":{"Address":{"address":"D7whDWAP5gN9x4Ff6T9MyQEkotyzmNWtfYhCEWjbUDBM"}} -// to: -// "authority":{"address":"4dGxsCAwSCopxjEYY7sFShFUkfKC6vzsNEXJDzFYYFXh","type":"Address"} -// and from: -// "authority":"UpdateAuthority" -// to: -// "authority":{"address":null,"type":"UpdateAuthority"} -fn transform_plugins_authority(plugins_json: &mut Value) { - match plugins_json { - Value::Object(plugins) => { - // Transform plugins in an object - for (_, plugin) in plugins.iter_mut() { - if let Some(plugin_obj) = plugin.as_object_mut() { - transform_authority_in_object(plugin_obj); - } - } - } - Value::Array(plugins_array) => { - // Transform plugins in an array - for plugin in plugins_array.iter_mut() { - if let Some(plugin_obj) = plugin.as_object_mut() { - transform_authority_in_object(plugin_obj); - } - } - } - _ => {} - } -} - -// Helper for `transform_plugins_authority` logic. -fn transform_authority_in_object(plugin: &mut Map) { - match plugin.get_mut("authority") { - Some(Value::Object(authority)) => { - if let Some(authority_type) = authority.keys().next().cloned() { - // Replace the nested JSON objects with desired format. - if let Some(Value::Object(pubkey_obj)) = authority.remove(&authority_type) { - if let Some(address_value) = pubkey_obj.get("address") { - authority.insert("type".to_string(), Value::from(authority_type)); - authority.insert("address".to_string(), address_value.clone()); - } - } - } - } - Some(Value::String(authority_type)) => { - // Handle the case where authority is a string. - let mut authority_obj = Map::new(); - authority_obj.insert("type".to_string(), Value::String(authority_type.clone())); - authority_obj.insert("address".to_string(), Value::Null); - plugin.insert("authority".to_string(), Value::Object(authority_obj)); - } - _ => {} - } -} - -// Convert all keys to snake case. Ignore values that aren't JSON objects themselves. -fn convert_keys_to_snake_case(plugins_json: &mut Value) { - match plugins_json { - Value::Object(obj) => { - let keys = obj.keys().cloned().collect::>(); - for key in keys { - let snake_case_key = key.to_snake_case(); - if let Some(val) = obj.remove(&key) { - obj.insert(snake_case_key, val); - } - } - for (_, val) in obj.iter_mut() { - convert_keys_to_snake_case(val); - } - } - Value::Array(arr) => { - for val in arr { - convert_keys_to_snake_case(val); - } - } - _ => {} - } -} - -/// Updates the `asset_authority` for all assets that are part of a collection in a batch. -/// This function performs a cursor-based paginated read and batch update. -async fn update_group_asset_authorities( - conn: &T, - group_value: Vec, - authority: Vec, - slot: i64, -) -> ProgramTransformerResult<()> { - let mut after = None; - - let group_key = "collection".to_string(); - let group_value = bs58::encode(group_value).into_string(); - - let mut query = asset_grouping::Entity::find() - .filter(asset_grouping::Column::GroupKey.eq(group_key)) - .filter(asset_grouping::Column::GroupValue.eq(group_value)) - .cursor_by(asset_grouping::Column::AssetId); - let mut query = query.first(1_000); - - loop { - if let Some(after) = after.clone() { - query = query.after(after); - } - - let entries = query.all(conn).await?; - - if entries.is_empty() { - break; - } - - let asset_ids = entries - .clone() - .into_iter() - .map(|entry| entry.asset_id) - .collect::>(); - - asset_authority::Entity::update_many() - .col_expr( - asset_authority::Column::Authority, - Expr::value(authority.clone()), - ) - .col_expr(asset_authority::Column::SlotUpdated, Expr::value(slot)) - .filter(asset_authority::Column::AssetId.is_in(asset_ids)) - .filter(asset_authority::Column::Authority.ne(authority.clone())) - .filter(Expr::cust_with_values( - "asset_authority.slot_updated < $1", - vec![slot], - )) - .exec(conn) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - - after = entries.last().map(|entry| entry.asset_id.clone()); - } - - Ok(()) -} diff --git a/program_transformers/src/token/mod.rs b/program_transformers/src/token/mod.rs deleted file mode 100644 index 7cb70c4..0000000 --- a/program_transformers/src/token/mod.rs +++ /dev/null @@ -1,169 +0,0 @@ -use { - crate::{ - asset_upserts::{ - upsert_assets_mint_account_columns, upsert_assets_token_account_columns, - AssetMintAccountColumns, AssetTokenAccountColumns, - }, - error::ProgramTransformerResult, - AccountInfo, DownloadMetadataNotifier, - }, - blockbuster::programs::token_account::TokenProgramAccount, - digital_asset_types::dao::{asset, sea_orm_active_enums::OwnerType, token_accounts, tokens}, - sea_orm::{ - entity::{ActiveValue, ColumnTrait}, - query::{QueryFilter, QueryTrait}, - sea_query::query::OnConflict, - ConnectionTrait, DatabaseConnection, DbBackend, EntityTrait, TransactionTrait, - }, - solana_sdk::program_option::COption, - spl_token::state::AccountState, -}; - -pub async fn handle_token_program_account<'a, 'b>( - account_info: &AccountInfo, - parsing_result: &'a TokenProgramAccount, - db: &'b DatabaseConnection, - _download_metadata_notifier: &DownloadMetadataNotifier, -) -> ProgramTransformerResult<()> { - let account_key = account_info.pubkey.to_bytes().to_vec(); - let account_owner = account_info.owner.to_bytes().to_vec(); - match &parsing_result { - TokenProgramAccount::TokenAccount(ta) => { - let mint = ta.mint.to_bytes().to_vec(); - let delegate: Option> = match ta.delegate { - COption::Some(d) => Some(d.to_bytes().to_vec()), - COption::None => None, - }; - let frozen = matches!(ta.state, AccountState::Frozen); - let owner = ta.owner.to_bytes().to_vec(); - let model = token_accounts::ActiveModel { - pubkey: ActiveValue::Set(account_key.clone()), - mint: ActiveValue::Set(mint.clone()), - delegate: ActiveValue::Set(delegate.clone()), - owner: ActiveValue::Set(owner.clone()), - frozen: ActiveValue::Set(frozen), - delegated_amount: ActiveValue::Set(ta.delegated_amount as i64), - token_program: ActiveValue::Set(account_owner.clone()), - slot_updated: ActiveValue::Set(account_info.slot as i64), - amount: ActiveValue::Set(ta.amount as i64), - close_authority: ActiveValue::Set(None), - }; - - let mut query = token_accounts::Entity::insert(model) - .on_conflict( - OnConflict::columns([token_accounts::Column::Pubkey]) - .update_columns([ - token_accounts::Column::Mint, - token_accounts::Column::DelegatedAmount, - token_accounts::Column::Delegate, - token_accounts::Column::Amount, - token_accounts::Column::Frozen, - token_accounts::Column::TokenProgram, - token_accounts::Column::Owner, - token_accounts::Column::CloseAuthority, - token_accounts::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > token_accounts.slot_updated", - query.sql - ); - db.execute(query).await?; - let txn = db.begin().await?; - let asset_update: Option = asset::Entity::find_by_id(mint.clone()) - .filter(asset::Column::OwnerType.eq("single")) - .one(&txn) - .await?; - if let Some(_asset) = asset_update { - // will only update owner if token account balance is non-zero - // since the asset is marked as single then the token account balance can only be 1. Greater implies a fungible token in which case no si - // TODO: this does not guarantee in case when wallet receives an amount of 1 for a token but its supply is more. is unlikely since mints often have a decimal - if ta.amount == 1 { - upsert_assets_token_account_columns( - AssetTokenAccountColumns { - mint: mint.clone(), - owner: Some(owner.clone()), - frozen, - delegate, - slot_updated_token_account: Some(account_info.slot as i64), - }, - &txn, - ) - .await?; - } - } - txn.commit().await?; - Ok(()) - } - TokenProgramAccount::Mint(m) => { - let freeze_auth: Option> = match m.freeze_authority { - COption::Some(d) => Some(d.to_bytes().to_vec()), - COption::None => None, - }; - let mint_auth: Option> = match m.mint_authority { - COption::Some(d) => Some(d.to_bytes().to_vec()), - COption::None => None, - }; - let model = tokens::ActiveModel { - mint: ActiveValue::Set(account_key.clone()), - token_program: ActiveValue::Set(account_owner), - slot_updated: ActiveValue::Set(account_info.slot as i64), - supply: ActiveValue::Set(m.supply as i64), - decimals: ActiveValue::Set(m.decimals as i32), - close_authority: ActiveValue::Set(None), - extension_data: ActiveValue::Set(None), - mint_authority: ActiveValue::Set(mint_auth), - freeze_authority: ActiveValue::Set(freeze_auth), - }; - - let mut query = tokens::Entity::insert(model) - .on_conflict( - OnConflict::columns([tokens::Column::Mint]) - .update_columns([ - tokens::Column::Supply, - tokens::Column::TokenProgram, - tokens::Column::MintAuthority, - tokens::Column::CloseAuthority, - tokens::Column::ExtensionData, - tokens::Column::SlotUpdated, - tokens::Column::Decimals, - tokens::Column::FreezeAuthority, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= tokens.slot_updated", - query.sql - ); - db.execute(query).await?; - - let asset_update: Option = asset::Entity::find_by_id(account_key.clone()) - .filter( - asset::Column::OwnerType - .eq(OwnerType::Single) - .or(asset::Column::OwnerType - .eq(OwnerType::Unknown) - .and(asset::Column::Supply.eq(1))), - ) - .one(db) - .await?; - if let Some(_asset) = asset_update { - upsert_assets_mint_account_columns( - AssetMintAccountColumns { - mint: account_key.clone(), - supply_mint: Some(account_key), - supply: m.supply, - slot_updated_mint_account: account_info.slot, - }, - db, - ) - .await?; - } - - Ok(()) - } - } -} diff --git a/program_transformers/src/token_metadata/master_edition.rs b/program_transformers/src/token_metadata/master_edition.rs deleted file mode 100644 index 7f76fa4..0000000 --- a/program_transformers/src/token_metadata/master_edition.rs +++ /dev/null @@ -1,107 +0,0 @@ -use { - crate::error::{ProgramTransformerError, ProgramTransformerResult}, - blockbuster::token_metadata::{ - accounts::{DeprecatedMasterEditionV1, MasterEdition}, - types::Key, - }, - digital_asset_types::dao::{ - asset, asset_v1_account_attachments, extensions, - sea_orm_active_enums::{SpecificationAssetClass, V1AccountAttachments}, - }, - sea_orm::{ - entity::{ActiveModelTrait, ActiveValue, EntityTrait, RelationTrait}, - query::{JoinType, QuerySelect, QueryTrait}, - sea_query::query::OnConflict, - ConnectionTrait, DatabaseTransaction, DbBackend, - }, - solana_sdk::pubkey::Pubkey, -}; - -pub async fn save_v2_master_edition( - id: Pubkey, - slot: u64, - me_data: &MasterEdition, - txn: &DatabaseTransaction, -) -> ProgramTransformerResult<()> { - save_master_edition( - V1AccountAttachments::MasterEditionV2, - id, - slot, - me_data, - txn, - ) - .await -} - -pub async fn save_v1_master_edition( - id: Pubkey, - slot: u64, - me_data: &DeprecatedMasterEditionV1, - txn: &DatabaseTransaction, -) -> ProgramTransformerResult<()> { - // This discards the deprecated `MasterEditionV1` fields - // but sets the `Key`` as `MasterEditionV1`. - let bridge = MasterEdition { - supply: me_data.supply, - max_supply: me_data.max_supply, - key: Key::MasterEditionV1, - }; - save_master_edition( - V1AccountAttachments::MasterEditionV1, - id, - slot, - &bridge, - txn, - ) - .await -} - -pub async fn save_master_edition( - version: V1AccountAttachments, - id: Pubkey, - slot: u64, - me_data: &MasterEdition, - txn: &DatabaseTransaction, -) -> ProgramTransformerResult<()> { - let id_bytes = id.to_bytes().to_vec(); - let master_edition: Option<(asset_v1_account_attachments::Model, Option)> = - asset_v1_account_attachments::Entity::find_by_id(id.to_bytes().to_vec()) - .find_also_related(asset::Entity) - .join( - JoinType::InnerJoin, - extensions::asset::Relation::AssetData.def(), - ) - .one(txn) - .await?; - let ser = serde_json::to_value(me_data) - .map_err(|e| ProgramTransformerError::SerializatonError(e.to_string()))?; - - let model = asset_v1_account_attachments::ActiveModel { - id: ActiveValue::Set(id_bytes), - attachment_type: ActiveValue::Set(version), - data: ActiveValue::Set(Some(ser)), - slot_updated: ActiveValue::Set(slot as i64), - ..Default::default() - }; - - if let Some((_me, Some(asset))) = master_edition { - let mut updatable: asset::ActiveModel = asset.into(); - updatable.supply = ActiveValue::Set(1); - updatable.specification_asset_class = ActiveValue::Set(Some(SpecificationAssetClass::Nft)); - updatable.update(txn).await?; - } - - let query = asset_v1_account_attachments::Entity::insert(model) - .on_conflict( - OnConflict::columns([asset_v1_account_attachments::Column::Id]) - .update_columns([ - asset_v1_account_attachments::Column::AttachmentType, - asset_v1_account_attachments::Column::Data, - asset_v1_account_attachments::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - Ok(()) -} diff --git a/program_transformers/src/token_metadata/mod.rs b/program_transformers/src/token_metadata/mod.rs deleted file mode 100644 index cbeb941..0000000 --- a/program_transformers/src/token_metadata/mod.rs +++ /dev/null @@ -1,53 +0,0 @@ -use { - crate::{ - error::{ProgramTransformerError, ProgramTransformerResult}, - token_metadata::{ - master_edition::{save_v1_master_edition, save_v2_master_edition}, - v1_asset::{burn_v1_asset, save_v1_asset}, - }, - AccountInfo, DownloadMetadataNotifier, - }, - blockbuster::programs::token_metadata::{TokenMetadataAccountData, TokenMetadataAccountState}, - sea_orm::{DatabaseConnection, TransactionTrait}, -}; - -mod master_edition; -mod v1_asset; - -pub async fn handle_token_metadata_account<'a, 'b>( - account_info: &AccountInfo, - parsing_result: &'a TokenMetadataAccountState, - db: &'b DatabaseConnection, - download_metadata_notifier: &DownloadMetadataNotifier, -) -> ProgramTransformerResult<()> { - match &parsing_result.data { - TokenMetadataAccountData::EmptyAccount => { - burn_v1_asset(db, account_info.pubkey, account_info.slot).await?; - Ok(()) - } - TokenMetadataAccountData::MasterEditionV1(m) => { - let txn = db.begin().await?; - save_v1_master_edition(account_info.pubkey, account_info.slot, m, &txn).await?; - txn.commit().await?; - Ok(()) - } - TokenMetadataAccountData::MetadataV1(m) => { - if let Some(info) = save_v1_asset(db, m, account_info.slot).await? { - download_metadata_notifier(info) - .await - .map_err(ProgramTransformerError::DownloadMetadataNotify)?; - } - Ok(()) - } - TokenMetadataAccountData::MasterEditionV2(m) => { - let txn = db.begin().await?; - save_v2_master_edition(account_info.pubkey, account_info.slot, m, &txn).await?; - txn.commit().await?; - Ok(()) - } - // TokenMetadataAccountData::EditionMarker(_) => {} - // TokenMetadataAccountData::UseAuthorityRecord(_) => {} - // TokenMetadataAccountData::CollectionAuthorityRecord(_) => {} - _ => Err(ProgramTransformerError::NotImplemented), - } -} diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs deleted file mode 100644 index 59f4edd..0000000 --- a/program_transformers/src/token_metadata/v1_asset.rs +++ /dev/null @@ -1,413 +0,0 @@ -use { - crate::{ - asset_upserts::{ - upsert_assets_metadata_account_columns, upsert_assets_mint_account_columns, - upsert_assets_token_account_columns, AssetMetadataAccountColumns, - AssetMintAccountColumns, AssetTokenAccountColumns, - }, - error::{ProgramTransformerError, ProgramTransformerResult}, - find_model_with_retry, DownloadMetadataInfo, - }, - blockbuster::token_metadata::{ - accounts::{MasterEdition, Metadata}, - types::TokenStandard, - }, - digital_asset_types::{ - dao::{ - asset, asset_authority, asset_creators, asset_data, asset_grouping, - asset_v1_account_attachments, - sea_orm_active_enums::{ - ChainMutability, Mutability, OwnerType, SpecificationAssetClass, - SpecificationVersions, V1AccountAttachments, - }, - token_accounts, tokens, - }, - json::ChainDataV1, - }, - sea_orm::{ - entity::{ActiveValue, ColumnTrait, EntityTrait}, - query::{JsonValue, Order, QueryFilter, QueryOrder, QueryTrait}, - sea_query::query::OnConflict, - ConnectionTrait, DbBackend, DbErr, TransactionTrait, - }, - solana_sdk::{pubkey, pubkey::Pubkey}, - tracing::warn, -}; - -pub async fn burn_v1_asset( - conn: &T, - id: Pubkey, - slot: u64, -) -> ProgramTransformerResult<()> { - let slot_i = slot as i64; - let model = asset::ActiveModel { - id: ActiveValue::Set(id.to_bytes().to_vec()), - slot_updated: ActiveValue::Set(Some(slot_i)), - burnt: ActiveValue::Set(true), - ..Default::default() - }; - let mut query = asset::Entity::insert(model) - .on_conflict( - OnConflict::columns([asset::Column::Id]) - .update_columns([asset::Column::SlotUpdated, asset::Column::Burnt]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated", - query.sql - ); - conn.execute(query).await?; - Ok(()) -} - -const RETRY_INTERVALS: &[u64] = &[0, 5, 10]; -static WSOL_PUBKEY: Pubkey = pubkey!("So11111111111111111111111111111111111111112"); - -pub async fn index_and_fetch_mint_data( - conn: &T, - mint_pubkey_vec: Vec, -) -> ProgramTransformerResult> { - // Gets the token and token account for the mint to populate the asset. - // This is required when the token and token account are indexed, but not the metadata account. - // If the metadata account is indexed, then the token and ta ingester will update the asset with the correct data. - let token: Option = find_model_with_retry( - conn, - "token", - &tokens::Entity::find_by_id(mint_pubkey_vec.clone()), - RETRY_INTERVALS, - ) - .await?; - - if let Some(token) = token { - upsert_assets_mint_account_columns( - AssetMintAccountColumns { - mint: mint_pubkey_vec.clone(), - supply_mint: Some(token.mint.clone()), - supply: token.supply as u64, - slot_updated_mint_account: token.slot_updated as u64, - }, - conn, - ) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - Ok(Some(token)) - } else { - warn!( - target: "Mint not found", - "Mint not found in 'tokens' table for mint {}", - bs58::encode(&mint_pubkey_vec).into_string() - ); - Ok(None) - } -} - -async fn index_token_account_data( - conn: &T, - mint_pubkey_vec: Vec, -) -> ProgramTransformerResult<()> { - let token_account: Option = find_model_with_retry( - conn, - "owners", - &token_accounts::Entity::find() - .filter(token_accounts::Column::Mint.eq(mint_pubkey_vec.clone())) - .filter(token_accounts::Column::Amount.gt(0)) - .order_by(token_accounts::Column::SlotUpdated, Order::Desc), - RETRY_INTERVALS, - ) - .await - .map_err(|e: DbErr| ProgramTransformerError::DatabaseError(e.to_string()))?; - - if let Some(token_account) = token_account { - upsert_assets_token_account_columns( - AssetTokenAccountColumns { - mint: mint_pubkey_vec.clone(), - owner: Some(token_account.owner), - delegate: token_account.delegate, - frozen: token_account.frozen, - slot_updated_token_account: Some(token_account.slot_updated), - }, - conn, - ) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - } else { - warn!( - target: "Account not found", - "Token acc not found in 'owners' table for mint {}", - bs58::encode(&mint_pubkey_vec).into_string() - ); - } - - Ok(()) -} - -pub async fn save_v1_asset( - conn: &T, - metadata: &Metadata, - slot: u64, -) -> ProgramTransformerResult> { - let metadata = metadata.clone(); - let mint_pubkey = metadata.mint; - let mint_pubkey_array = mint_pubkey.to_bytes(); - let mint_pubkey_vec = mint_pubkey_array.to_vec(); - - let (edition_attachment_address, _) = MasterEdition::find_pda(&mint_pubkey); - - let authority = metadata.update_authority.to_bytes().to_vec(); - let slot_i = slot as i64; - let uri = metadata.uri.trim().replace('\0', ""); - let _spec = SpecificationVersions::V1; - let mut class = match metadata.token_standard { - Some(TokenStandard::NonFungible) => SpecificationAssetClass::Nft, - Some(TokenStandard::FungibleAsset) => SpecificationAssetClass::FungibleAsset, - Some(TokenStandard::Fungible) => SpecificationAssetClass::FungibleToken, - Some(TokenStandard::NonFungibleEdition) => SpecificationAssetClass::Nft, - Some(TokenStandard::ProgrammableNonFungible) => SpecificationAssetClass::ProgrammableNft, - Some(TokenStandard::ProgrammableNonFungibleEdition) => { - SpecificationAssetClass::ProgrammableNft - } - _ => SpecificationAssetClass::Unknown, - }; - let mut ownership_type = match class { - SpecificationAssetClass::FungibleAsset => OwnerType::Token, - SpecificationAssetClass::FungibleToken => OwnerType::Token, - SpecificationAssetClass::Nft | SpecificationAssetClass::ProgrammableNft => { - OwnerType::Single - } - _ => OwnerType::Unknown, - }; - - // Wrapped Solana is a special token that has supply 0 (infinite). - // It's a fungible token with a metadata account, but without any token standard, meaning the code above will misabel it as an NFT. - if mint_pubkey == WSOL_PUBKEY { - ownership_type = OwnerType::Token; - class = SpecificationAssetClass::FungibleToken; - } - - let token: Option = - index_and_fetch_mint_data(conn, mint_pubkey_vec.clone()).await?; - - // get supply of token, default to 1 since most cases will be NFTs. Token mint ingester will properly set supply if token_result is None - let supply = token.map(|t| t.supply).unwrap_or(1); - - // Map unknown ownership types based on the supply. - if ownership_type == OwnerType::Unknown { - ownership_type = match supply.cmp(&1) { - std::cmp::Ordering::Equal => OwnerType::Single, - std::cmp::Ordering::Greater => OwnerType::Token, - _ => OwnerType::Unknown, - }; - }; - - if (ownership_type == OwnerType::Single) | (ownership_type == OwnerType::Unknown) { - index_token_account_data(conn, mint_pubkey_vec.clone()).await?; - } - - let name = metadata.name.clone().into_bytes(); - let symbol = metadata.symbol.clone().into_bytes(); - let mut chain_data = ChainDataV1 { - name: metadata.name.clone(), - symbol: metadata.symbol.clone(), - edition_nonce: metadata.edition_nonce, - primary_sale_happened: metadata.primary_sale_happened, - token_standard: metadata.token_standard, - uses: metadata.uses, - }; - chain_data.sanitize(); - let chain_data_json = serde_json::to_value(chain_data) - .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; - let chain_mutability = match metadata.is_mutable { - true => ChainMutability::Mutable, - false => ChainMutability::Immutable, - }; - let asset_data_model = asset_data::ActiveModel { - chain_data_mutability: ActiveValue::Set(chain_mutability), - chain_data: ActiveValue::Set(chain_data_json), - metadata_url: ActiveValue::Set(uri.clone()), - metadata: ActiveValue::Set(JsonValue::String("processing".to_string())), - metadata_mutability: ActiveValue::Set(Mutability::Mutable), - slot_updated: ActiveValue::Set(slot_i), - reindex: ActiveValue::Set(Some(true)), - id: ActiveValue::Set(mint_pubkey_vec.clone()), - raw_name: ActiveValue::Set(Some(name.to_vec())), - raw_symbol: ActiveValue::Set(Some(symbol.to_vec())), - base_info_seq: ActiveValue::Set(Some(0)), - }; - let txn = conn.begin().await?; - let mut query = asset_data::Entity::insert(asset_data_model) - .on_conflict( - OnConflict::columns([asset_data::Column::Id]) - .update_columns([ - asset_data::Column::ChainDataMutability, - asset_data::Column::ChainData, - asset_data::Column::MetadataUrl, - asset_data::Column::MetadataMutability, - asset_data::Column::SlotUpdated, - asset_data::Column::Reindex, - asset_data::Column::RawName, - asset_data::Column::RawSymbol, - asset_data::Column::BaseInfoSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_data.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - - upsert_assets_metadata_account_columns( - AssetMetadataAccountColumns { - mint: mint_pubkey_vec.clone(), - owner_type: ownership_type, - specification_asset_class: Some(class), - royalty_amount: metadata.seller_fee_basis_points as i32, - asset_data: Some(mint_pubkey_vec.clone()), - slot_updated_metadata_account: slot_i as u64, - mpl_core_plugins: None, - mpl_core_unknown_plugins: None, - mpl_core_collection_num_minted: None, - mpl_core_collection_current_size: None, - mpl_core_plugins_json_version: None, - mpl_core_external_plugins: None, - mpl_core_unknown_external_plugins: None, - }, - &txn, - ) - .await?; - - let attachment = asset_v1_account_attachments::ActiveModel { - id: ActiveValue::Set(edition_attachment_address.to_bytes().to_vec()), - slot_updated: ActiveValue::Set(slot_i), - attachment_type: ActiveValue::Set(V1AccountAttachments::MasterEditionV2), - ..Default::default() - }; - let query = asset_v1_account_attachments::Entity::insert(attachment) - .on_conflict( - OnConflict::columns([asset_v1_account_attachments::Column::Id]) - .do_nothing() - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - - let model = asset_authority::ActiveModel { - asset_id: ActiveValue::Set(mint_pubkey_vec.clone()), - authority: ActiveValue::Set(authority), - seq: ActiveValue::Set(0), - slot_updated: ActiveValue::Set(slot_i), - ..Default::default() - }; - let mut query = asset_authority::Entity::insert(model) - .on_conflict( - OnConflict::columns([asset_authority::Column::AssetId]) - .update_columns([ - asset_authority::Column::Authority, - asset_authority::Column::Seq, - asset_authority::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_authority.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - - if let Some(c) = &metadata.collection { - let model = asset_grouping::ActiveModel { - asset_id: ActiveValue::Set(mint_pubkey_vec.clone()), - group_key: ActiveValue::Set("collection".to_string()), - group_value: ActiveValue::Set(Some(c.key.to_string())), - verified: ActiveValue::Set(c.verified), - group_info_seq: ActiveValue::Set(Some(0)), - slot_updated: ActiveValue::Set(Some(slot_i)), - ..Default::default() - }; - let mut query = asset_grouping::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .update_columns([ - asset_grouping::Column::GroupValue, - asset_grouping::Column::Verified, - asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_grouping.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - } - - let creators = metadata - .creators - .unwrap_or_default() - .iter() - .enumerate() - .map(|(i, creator)| asset_creators::ActiveModel { - asset_id: ActiveValue::Set(mint_pubkey_vec.clone()), - position: ActiveValue::Set(i as i16), - creator: ActiveValue::Set(creator.address.to_bytes().to_vec()), - share: ActiveValue::Set(creator.share as i32), - verified: ActiveValue::Set(creator.verified), - slot_updated: ActiveValue::Set(Some(slot_i)), - seq: ActiveValue::Set(Some(0)), - ..Default::default() - }) - .collect::>(); - - if !creators.is_empty() { - let mut query = asset_creators::Entity::insert_many(creators) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Position, - ]) - .update_columns([ - asset_creators::Column::Creator, - asset_creators::Column::Share, - asset_creators::Column::Verified, - asset_creators::Column::Seq, - asset_creators::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; - } - txn.commit().await?; - - if uri.is_empty() { - warn!( - "URI is empty for mint {}. Skipping background task.", - bs58::encode(mint_pubkey_vec).into_string() - ); - return Ok(None); - } - - Ok(Some(DownloadMetadataInfo::new(mint_pubkey_vec, uri))) -} diff --git a/src/main.rs b/src/main.rs index c791a1e..b58edec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,12 +9,14 @@ use crate::config::database::setup_database_config; use crate::config::env_config::{setup_env_config, EnvConfig}; use anyhow::Result; use config::rpc_config::{get_pubsub_client, setup_rpc_clients}; -use das_bubblegum_backfill::worker::{ - GapWorkerArgs, ProgramTransformerWorkerArgs, SignatureWorkerArgs, -}; -use das_bubblegum_backfill::{ - start_bubblegum_backfill, BubblegumBackfillArgs, BubblegumBackfillContext, +use das_bubblegum::backfill::worker::{ + GapWorkerArgs, ProgramTransformerWorkerArgs, SignatureWorkerArgs, TreeWorkerArgs, }; +use das_bubblegum::{start_backfill, BubblegumContext}; + +// use das_bubblegum::{ +// start_backfill, BackfillArgs, BackfillContext, +// }; use das_core::{MetadataJsonDownloadWorkerArgs, Rpc, SolanaRpcArgs}; use dotenv::dotenv; @@ -186,7 +188,7 @@ fn reload_tasks(state: &mut State, database_pool: Pool, env_config: En } }); - let context = BubblegumBackfillContext::new( + let context = BubblegumContext::new( database_pool.clone(), Rpc::from_config(&SolanaRpcArgs { solana_rpc_url: env_config.get_rpc_url().to_string(), @@ -201,15 +203,14 @@ fn reload_tasks(state: &mut State, database_pool: Pool, env_config: En let program_transformer = ProgramTransformer::new( database_pool.clone(), Box::new(|_info| futures::future::ready(Ok(())).boxed()), - false, ); let context = context.clone(); - let args = BubblegumBackfillArgs { + let args = das_bubblegum::BackfillArgs { only_trees: Some(vec![address.clone().to_string()]), tree_crawler_count: 4, - tree_worker: das_bubblegum_backfill::worker::TreeWorkerArgs { + tree_worker: TreeWorkerArgs { metadata_json_download_worker: MetadataJsonDownloadWorkerArgs { metadata_json_download_worker_count: 100, metadata_json_download_worker_request_timeout: 200, @@ -224,7 +225,9 @@ fn reload_tasks(state: &mut State, database_pool: Pool, env_config: En }, program_transformer_worker: ProgramTransformerWorkerArgs { program_transformer_channel_size: 100, + program_transformer_worker_count: 100, }, + force: true, }, }; @@ -250,7 +253,11 @@ fn reload_tasks(state: &mut State, database_pool: Pool, env_config: En println!("Backfill started for tree: {:}", address); - if let Err(e) = start_bubblegum_backfill(context.clone(), args).await { + // if let Err(e) = start_bubblegum_backfill(context.clone(), args).await { + // eprintln!("Error backfilling tree {:?}: {:?}", address.clone(), e); + // } + + if let Err(e) = start_backfill(context.clone(), args).await { eprintln!("Error backfilling tree {:?}: {:?}", address.clone(), e); }