Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 88 additions & 3 deletions lib/db/sqlite/firo_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,95 @@ abstract class _FiroCache {
sparkUsedTagsCacheFile.path,
mode: OpenMode.readWrite,
);

_migrateSparkSetCacheDb(_setCacheDB[network]!);
}
}

/// Brings a Spark anon-set cache database up to the current schema.
///
/// Invoked on every open and safe to call any number of times: each
/// migration step first probes for the column/index it would create and
/// does nothing when it is already present, so an up-to-date database is
/// a no-op.
///
/// Steps, in order:
///
/// 1. `SparkSet.complete` (ADD COLUMN, DEFAULT 1) — sync-visibility flag.
///    A row is written with 0 while a sync is in flight and flipped to 1
///    once every sector has committed and the finalize-time integrity
///    check passes; readers filter on it so partial state stays hidden.
///    The DEFAULT of 1 marks all pre-existing rows complete, which is
///    sound because the pre-fix writer committed all-or-nothing: any row
///    already in a legacy DB came from a fully finalized sync.
///
/// 2. `SparkSetCoins.orderKey` (ADD COLUMN, DEFAULT 0) — the server-side
///    delta index of the referenced coin, used by the reader's ORDER BY
///    to reproduce the server's newest-first ordering end-to-end. Legacy
///    rows all default to 0, so the reader's `ssc.id ASC` tiebreaker
///    orders them by primary key — exactly the layout the pre-fix writer
///    produced (globally-reversed RPC insert order, i.e. PK-ASC is
///    oldest-first, which the coordinator's Dart `.reversed` flips back
///    to newest-first).
///
/// 3. UNIQUE INDEX `idx_sparksetcoins_set_coin` on
///    `SparkSetCoins(setId, coinId)` — required so the writer's
///    INSERT OR IGNORE on the link table is idempotent under
///    crash-recovery replay of a sector. Duplicate (setId, coinId) rows
///    are scrubbed first (lowest PK wins) so a legacy DB that somehow
///    contains duplicates can still upgrade; the pre-fix writer should
///    never have produced any (its plain INSERT would have thrown on a
///    collision), but a one-time scrub beats refusing to open the DB.
///
/// Presence is probed explicitly instead of wrapping DDL in try/catch so
/// that unrelated SQLite failures surface rather than being swallowed.
static void _migrateSparkSetCacheDb(Database db) {
  final needsCompleteColumn = !_columnExists(db, "SparkSet", "complete");
  final needsOrderKeyColumn =
      !_columnExists(db, "SparkSetCoins", "orderKey");
  final needsLinkIndex = !_indexExists(db, "idx_sparksetcoins_set_coin");

  if (needsCompleteColumn) {
    db.execute("""
      ALTER TABLE SparkSet
      ADD COLUMN complete INTEGER NOT NULL DEFAULT 1;
    """);
  }

  if (needsOrderKeyColumn) {
    db.execute("""
      ALTER TABLE SparkSetCoins
      ADD COLUMN orderKey INTEGER NOT NULL DEFAULT 0;
    """);
  }

  if (needsLinkIndex) {
    // Scrub duplicate (setId, coinId) link rows — keeping the lowest PK —
    // so the unique index creation below cannot fail on legacy data. On
    // a clean database this DELETE matches nothing.
    db.execute("""
      DELETE FROM SparkSetCoins
      WHERE id NOT IN (
        SELECT MIN(id) FROM SparkSetCoins
        GROUP BY setId, coinId
      );
    """);
    db.execute("""
      CREATE UNIQUE INDEX idx_sparksetcoins_set_coin
      ON SparkSetCoins(setId, coinId);
    """);
  }
}

/// Whether [table] declares a column named [column].
///
/// Probes the schema via `PRAGMA table_info`, which yields one row per
/// declared column with its name under the "name" key.
static bool _columnExists(Database db, String table, String column) {
  for (final info in db.select("PRAGMA table_info($table);")) {
    if (info["name"] == column) {
      return true;
    }
  }
  return false;
}

/// Whether an index named [indexName] exists in this database.
///
/// Looks the name up in the `sqlite_master` schema catalog, restricted
/// to entries of type 'index'. The name is passed as a bound parameter
/// rather than interpolated into the SQL.
static bool _indexExists(Database db, String indexName) {
  final matches = db.select(
    "SELECT 1 FROM sqlite_master WHERE type = 'index' AND name = ?;",
    [indexName],
  );
  return matches.isNotEmpty;
}

static Future<void> _deleteAllCache(CryptoCurrencyNetwork network) async {
final start = DateTime.now();
setCacheDB(network).execute("""
DELETE FROM SparkSetCoins;
DELETE FROM SparkSet;
DELETE FROM SparkCoin;
DELETE FROM SparkSetCoins;
VACUUM;
""");
await _deleteUsedTagsCache(network);
Expand Down Expand Up @@ -133,9 +213,10 @@ abstract class _FiroCache {
setHash TEXT NOT NULL,
groupId INTEGER NOT NULL,
size INTEGER NOT NULL,
complete INTEGER NOT NULL DEFAULT 0,
UNIQUE (blockHash, setHash, groupId)
);

CREATE TABLE SparkCoin (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
serialized TEXT NOT NULL,
Expand All @@ -144,14 +225,18 @@ abstract class _FiroCache {
groupId INTEGER NOT NULL,
UNIQUE(serialized, txHash, context, groupId)
);

CREATE TABLE SparkSetCoins (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
setId INTEGER NOT NULL,
coinId INTEGER NOT NULL,
orderKey INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY (setId) REFERENCES SparkSet(id),
FOREIGN KEY (coinId) REFERENCES SparkCoin(id)
);

CREATE UNIQUE INDEX idx_sparksetcoins_set_coin
ON SparkSetCoins(setId, coinId);
""");

db.dispose();
Expand Down
197 changes: 168 additions & 29 deletions lib/db/sqlite/firo_cache_coordinator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -84,72 +84,211 @@ abstract class FiroCacheCoordinator {
});
}

/// Sync the Spark anonymity set cache for `groupId` from the node.
///
/// Each sector the server returns is persisted to disk in its own SQLite
/// transaction against an in-progress SparkSet row (`complete = 0`). The
/// row and its coins are invisible to readers until
/// [_markSparkAnonSetComplete] flips the flag after a strict integrity
/// check (linked coin count == expected delta) passes.
///
/// Resumability: if a prior sync crashed mid-download this function
/// picks up at the count of coins already linked to the in-progress
/// row, never re-downloading sectors that committed. If the server's
/// blockHash has shifted between attempts the partial row is discarded
/// (orderKey indices no longer align at the new blockHash) and the
/// delta is fetched fresh.
///
/// Integrity: the finalize step rolls back and leaves `complete = 0`
/// if the link count doesn't match the expected delta — a partial or
/// over-full cache never becomes the current set.
static Future<void> runFetchAndUpdateSparkAnonSetCacheForGroupId(
int groupId,
ElectrumXClient client,
CryptoCurrencyNetwork network,
void Function(int countFetched, int totalCount)? progressUpdated,
) async {
await _setLocks[network]!.protect(() async {
const sectorSize =
1500; // chosen as a somewhat decent value. Could be changed in the future if wanted/needed
const sectorSize = 1500;

final prevMeta = await FiroCacheCoordinator.getLatestSetInfoForGroupId(
groupId,
network,
);

final prevSize = prevMeta?.size ?? 0;

final meta = await client.getSparkAnonymitySetMeta(coinGroupId: groupId);

// Preserve the pre-fix "initial call with totals" callback shape so
// existing UI callers don't see a progress-bar regression.
progressUpdated?.call(prevSize, meta.size);

if (prevMeta?.blockHash == meta.blockHash) {
Logging.instance.d("prevMeta?.blockHash == meta.blockHash");
if (prevMeta?.size == meta.size) {
Logging.instance.d(
"Spark anon set groupId=$groupId already up to date "
"(blockHash=${meta.blockHash}, size=${meta.size})",
);
return;
}
// Server reports a different size for the same blockHash. On
// Firo's server this should be impossible (blockHash advances
// whenever a coin is added), so this is treated as an anomaly.
// Refuse to sync: appending coins to the existing finalized row
// (what INSERT OR IGNORE in the writer would produce) would leak
// unverified coins into a set whose setHash we've already
// committed to.
Logging.instance.w(
"Spark anon set groupId=$groupId server reports different size "
"(${prevMeta!.size} -> ${meta.size}) at same blockHash "
"${meta.blockHash}; skipping sync to preserve cached state.",
);
return;
}

final numberOfCoinsToFetch = meta.size - prevSize;
if (numberOfCoinsToFetch < 0) {
// Reorg-style shrink: server has fewer coins than our last
// finalized set. Refuse to sync rather than invalidate cached data.
Logging.instance.w(
"Spark anon set groupId=$groupId appears to have shrunk "
"($prevSize -> ${meta.size}); skipping sync to preserve "
"cached data.",
);
return;
}
if (numberOfCoinsToFetch == 0) {
// blockHash advanced but no new coins in this group's set. We do
// not materialise a new SparkSet row for an empty delta — a
// same-size row would create a tiebreaker ambiguity in
// _getLatestSetInfoForGroupId. Drop any stray in-progress row so
// it doesn't confuse the next resume attempt.
final stale = await _Reader._getIncompleteSetForGroupId(
groupId,
db: _FiroCache.setCacheDB(network),
);
if (stale.isNotEmpty) {
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._deleteIncompleteSparkSetsForGroup,
data: groupId,
),
);
}
return;
}

// Decide whether to resume an existing in-progress row or start
// fresh. Cases:
// * no in-progress row -> cursor = 0
// * in-progress blockHash differs -> discard, cursor = 0
// * in-progress linked > expected delta -> corrupt, discard,
// cursor = 0
// * in-progress blockHash matches -> cursor = linkedSoFar
final incomplete = await _Reader._getIncompleteSetForGroupId(
groupId,
db: _FiroCache.setCacheDB(network),
);

final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize;
final remainder = numberOfCoinsToFetch % sectorSize;
int cursor;
if (incomplete.isEmpty) {
cursor = 0;
} else {
final incBlockHash = incomplete.first["blockHash"] as String;
final incSetHash = incomplete.first["setHash"] as String;
final incSetId = incomplete.first["id"] as int;

// Discard the in-progress row if either blockHash or setHash
// disagrees with the current meta. blockHash disagreement means
// the server's indexing has shifted; setHash disagreement at the
// same blockHash would indicate the in-progress row targets a
// different set snapshot and resuming would mix coin contexts.
if (incBlockHash != meta.blockHash ||
incSetHash != meta.setHash) {
Logging.instance.i(
"Spark anon set groupId=$groupId in-progress "
"(blockHash=$incBlockHash, setHash=$incSetHash) does not "
"match meta (blockHash=${meta.blockHash}, "
"setHash=${meta.setHash}); discarding in-flight row.",
);
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._deleteIncompleteSparkSetsForGroup,
data: groupId,
),
);
cursor = 0;
} else {
final linked = await _Reader._countSetCoins(
incSetId,
db: _FiroCache.setCacheDB(network),
);
if (linked > numberOfCoinsToFetch) {
Logging.instance.w(
"Spark anon set groupId=$groupId in-progress row has "
"$linked linked coins but delta is only "
"$numberOfCoinsToFetch; discarding in-flight row.",
);
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._deleteIncompleteSparkSetsForGroup,
data: groupId,
),
);
cursor = 0;
} else {
cursor = linked;
}
}
}

final List<dynamic> coins = [];
while (cursor < numberOfCoinsToFetch) {
final endIndex = cursor + sectorSize <= numberOfCoinsToFetch
? cursor + sectorSize
: numberOfCoinsToFetch;
final expected = endIndex - cursor;

for (int i = 0; i < fullSectorCount; i++) {
final start = (i * sectorSize);
final data = await client.getSparkAnonymitySetBySector(
coinGroupId: groupId,
latestBlock: meta.blockHash,
startIndex: start,
endIndex: start + sectorSize,
startIndex: cursor,
endIndex: endIndex,
);
progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch);

coins.addAll(data);
}
// Refuse to persist a sector whose size doesn't match the request:
// a partial or over-full server response would break the finalize-
// time integrity check and potentially skew the resume cursor.
if (data.length != expected) {
throw Exception(
"Spark anon set sector size mismatch for groupId=$groupId: "
"requested $expected coins in range [$cursor, $endIndex), "
"server returned ${data.length}",
);
}

final sectorCoins = data
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
.toList();

if (remainder > 0) {
final data = await client.getSparkAnonymitySetBySector(
coinGroupId: groupId,
latestBlock: meta.blockHash,
startIndex: numberOfCoinsToFetch - remainder,
endIndex: numberOfCoinsToFetch,
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._insertSparkAnonSetCoinsIncremental,
data: (meta, sectorCoins, cursor),
),
);
progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch);

coins.addAll(data);
cursor = endIndex;
progressUpdated?.call(cursor, numberOfCoinsToFetch);
}

final result =
coins
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
.toList();

// All sectors persisted. Flip `complete = 1` iff the link count
// matches the expected delta. On failure the in-progress row stays
// hidden and the error propagates; the next sync will observe the
// over-linked row and reset.
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._updateSparkAnonSetCoinsWith,
data: (meta, result),
func: FCFuncName._markSparkAnonSetComplete,
data: (meta, numberOfCoinsToFetch),
),
);
});
Expand Down
Loading