diff --git a/lib/db/sqlite/firo_cache.dart b/lib/db/sqlite/firo_cache.dart index 9a0d83f6a..0b45f92cb 100644 --- a/lib/db/sqlite/firo_cache.dart +++ b/lib/db/sqlite/firo_cache.dart @@ -88,15 +88,95 @@ abstract class _FiroCache { sparkUsedTagsCacheFile.path, mode: OpenMode.readWrite, ); + + _migrateSparkSetCacheDb(_setCacheDB[network]!); + } + } + + /// Idempotent migration for Spark anon-set cache databases. Runs on every + /// open. Safe to invoke repeatedly: each step is gated on a presence + /// check so subsequent startups are no-ops. + /// + /// Migrations: + /// + /// 1. SparkSet.complete (ADD COLUMN, DEFAULT 1): 0 while a sync is in + /// flight, 1 once every sector has committed and the finalize-time + /// integrity check has passed. Readers filter on this; partial + /// state is invisible. Existing rows are assumed complete — the + /// pre-fix writer was all-or-nothing so any row in a legacy DB + /// represents a successfully-finalized sync. + /// + /// 2. SparkSetCoins.orderKey (ADD COLUMN, DEFAULT 0): the server-side + /// delta index of the coin this link-row references. Used by the + /// reader's ORDER BY to reconstruct server newest-first order + /// end-to-end. Pre-migration rows default to 0; the reader's + /// `ssc.id ASC` tiebreaker then sorts them in PK order, which is + /// exactly the layout the pre-fix writer produced (it inserted + /// coins in globally-reversed RPC order, so PK-ASC = oldest-first, + /// and the coordinator's Dart `.reversed` flips to newest-first). + /// + /// 3. UNIQUE INDEX idx_sparksetcoins_set_coin ON SparkSetCoins(setId, + /// coinId): required for INSERT OR IGNORE on the link table during + /// resumable per-sector writes (idempotent under crash-recovery + /// replay). Before creating it, any pre-existing duplicate + /// (setId, coinId) rows are removed — keeping the lowest PK — so + /// a legacy DB with unexpected duplicates can still upgrade. The + /// pre-fix writer shouldn't have produced duplicates (its INSERT + /// was not OR IGNORE and would have thrown on collision), but + /// scrubbing once is cheaper than failing to open the DB. + /// + /// Column and index presence are checked explicitly rather than wrapped + /// in try/catch, so unrelated SQLite errors don't get silently swallowed. + static void _migrateSparkSetCacheDb(Database db) { + if (!_columnExists(db, "SparkSet", "complete")) { + db.execute(""" + ALTER TABLE SparkSet + ADD COLUMN complete INTEGER NOT NULL DEFAULT 1; + """); } + if (!_columnExists(db, "SparkSetCoins", "orderKey")) { + db.execute(""" + ALTER TABLE SparkSetCoins + ADD COLUMN orderKey INTEGER NOT NULL DEFAULT 0; + """); + } + if (!_indexExists(db, "idx_sparksetcoins_set_coin")) { + // Defensive: drop any pre-existing duplicate (setId, coinId) rows + // before the index creation would fail on them. On a clean legacy + // DB this DELETE matches zero rows. + db.execute(""" + DELETE FROM SparkSetCoins + WHERE id NOT IN ( + SELECT MIN(id) FROM SparkSetCoins + GROUP BY setId, coinId + ); + """); + db.execute(""" + CREATE UNIQUE INDEX idx_sparksetcoins_set_coin + ON SparkSetCoins(setId, coinId); + """); + } + } + + static bool _columnExists(Database db, String table, String column) { + final rows = db.select("PRAGMA table_info($table);"); + return rows.any((row) => row["name"] == column); + } + + static bool _indexExists(Database db, String indexName) { + final rows = db.select( + "SELECT 1 FROM sqlite_master WHERE type = 'index' AND name = ?;", + [indexName], + ); + return rows.isNotEmpty; } static Future _deleteAllCache(CryptoCurrencyNetwork network) async { final start = DateTime.now(); setCacheDB(network).execute(""" + DELETE FROM SparkSetCoins; DELETE FROM SparkSet; DELETE FROM SparkCoin; - DELETE FROM SparkSetCoins; VACUUM; """); await _deleteUsedTagsCache(network); @@ -133,9 +213,10 @@ abstract class _FiroCache { setHash TEXT NOT NULL, groupId INTEGER NOT NULL, size INTEGER NOT NULL, + complete INTEGER NOT NULL DEFAULT 0, UNIQUE (blockHash, setHash, groupId) ); - + CREATE TABLE SparkCoin ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, serialized TEXT NOT NULL, @@ -144,14 +225,18 @@ abstract class _FiroCache { groupId INTEGER NOT NULL, UNIQUE(serialized, txHash, context, groupId) ); - + CREATE TABLE SparkSetCoins ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, setId INTEGER NOT NULL, coinId INTEGER NOT NULL, + orderKey INTEGER NOT NULL DEFAULT 0, FOREIGN KEY (setId) REFERENCES SparkSet(id), FOREIGN KEY (coinId) REFERENCES SparkCoin(id) ); + + CREATE UNIQUE INDEX idx_sparksetcoins_set_coin + ON SparkSetCoins(setId, coinId); """); db.dispose(); diff --git a/lib/db/sqlite/firo_cache_coordinator.dart b/lib/db/sqlite/firo_cache_coordinator.dart index 5ecd7534b..978b1fe43 100644 --- a/lib/db/sqlite/firo_cache_coordinator.dart +++ b/lib/db/sqlite/firo_cache_coordinator.dart @@ -84,6 +84,24 @@ abstract class FiroCacheCoordinator { }); } + /// Sync the Spark anonymity set cache for `groupId` from the node. + /// + /// Each sector the server returns is persisted to disk in its own SQLite + /// transaction against an in-progress SparkSet row (`complete = 0`). The + /// row and its coins are invisible to readers until + /// [_markSparkAnonSetComplete] flips the flag after a strict integrity + /// check (linked coin count == expected delta) passes. + /// + /// Resumability: if a prior sync crashed mid-download this function + /// picks up at the count of coins already linked to the in-progress + /// row, never re-downloading sectors that committed. If the server's + /// blockHash has shifted between attempts the partial row is discarded + /// (orderKey indices no longer align at the new blockHash) and the + /// delta is fetched fresh. + /// + /// Integrity: the finalize step rolls back and leaves `complete = 0` + /// if the link count doesn't match the expected delta — a partial or + /// over-full cache never becomes the current set. static Future runFetchAndUpdateSparkAnonSetCacheForGroupId( int groupId, ElectrumXClient client, @@ -91,65 +109,186 @@ abstract class FiroCacheCoordinator { void Function(int countFetched, int totalCount)? progressUpdated, ) async { await _setLocks[network]!.protect(() async { - const sectorSize = - 1500; // chosen as a somewhat decent value. Could be changed in the future if wanted/needed + const sectorSize = 1500; + final prevMeta = await FiroCacheCoordinator.getLatestSetInfoForGroupId( groupId, network, ); - final prevSize = prevMeta?.size ?? 0; final meta = await client.getSparkAnonymitySetMeta(coinGroupId: groupId); + // Preserve the pre-fix "initial call with totals" callback shape so + // existing UI callers don't see a progress-bar regression. progressUpdated?.call(prevSize, meta.size); if (prevMeta?.blockHash == meta.blockHash) { - Logging.instance.d("prevMeta?.blockHash == meta.blockHash"); + if (prevMeta?.size == meta.size) { + Logging.instance.d( + "Spark anon set groupId=$groupId already up to date " + "(blockHash=${meta.blockHash}, size=${meta.size})", + ); + return; + } + // Server reports a different size for the same blockHash. On + // Firo's server this should be impossible (blockHash advances + // whenever a coin is added), so this is treated as an anomaly. + // Refuse to sync: appending coins to the existing finalized row + // (what INSERT OR IGNORE in the writer would produce) would leak + // unverified coins into a set whose setHash we've already + // committed to. + Logging.instance.w( + "Spark anon set groupId=$groupId server reports different size " + "(${prevMeta!.size} -> ${meta.size}) at same blockHash " + "${meta.blockHash}; skipping sync to preserve cached state.", + ); return; } final numberOfCoinsToFetch = meta.size - prevSize; + if (numberOfCoinsToFetch < 0) { + // Reorg-style shrink: server has fewer coins than our last + // finalized set. Refuse to sync rather than invalidate cached data. + Logging.instance.w( + "Spark anon set groupId=$groupId appears to have shrunk " + "($prevSize -> ${meta.size}); skipping sync to preserve " + "cached data.", + ); + return; + } + if (numberOfCoinsToFetch == 0) { + // blockHash advanced but no new coins in this group's set. We do + // not materialise a new SparkSet row for an empty delta — a + // same-size row would create a tiebreaker ambiguity in + // _getLatestSetInfoForGroupId. Drop any stray in-progress row so + // it doesn't confuse the next resume attempt. + final stale = await _Reader._getIncompleteSetForGroupId( + groupId, + db: _FiroCache.setCacheDB(network), + ); + if (stale.isNotEmpty) { + await _workers[network]!.runTask( + FCTask( + func: FCFuncName._deleteIncompleteSparkSetsForGroup, + data: groupId, + ), + ); + } + return; + } + + // Decide whether to resume an existing in-progress row or start + // fresh. Cases: + // * no in-progress row -> cursor = 0 + // * in-progress blockHash differs -> discard, cursor = 0 + // * in-progress linked > expected delta -> corrupt, discard, + // cursor = 0 + // * in-progress blockHash matches -> cursor = linkedSoFar + final incomplete = await _Reader._getIncompleteSetForGroupId( + groupId, + db: _FiroCache.setCacheDB(network), + ); - final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize; - final remainder = numberOfCoinsToFetch % sectorSize; + int cursor; + if (incomplete.isEmpty) { + cursor = 0; + } else { + final incBlockHash = incomplete.first["blockHash"] as String; + final incSetHash = incomplete.first["setHash"] as String; + final incSetId = incomplete.first["id"] as int; + + // Discard the in-progress row if either blockHash or setHash + // disagrees with the current meta. blockHash disagreement means + // the server's indexing has shifted; setHash disagreement at the + // same blockHash would indicate the in-progress row targets a + // different set snapshot and resuming would mix coin contexts. + if (incBlockHash != meta.blockHash || + incSetHash != meta.setHash) { + Logging.instance.i( + "Spark anon set groupId=$groupId in-progress " + "(blockHash=$incBlockHash, setHash=$incSetHash) does not " + "match meta (blockHash=${meta.blockHash}, " + "setHash=${meta.setHash}); discarding in-flight row.", + ); + await _workers[network]!.runTask( + FCTask( + func: FCFuncName._deleteIncompleteSparkSetsForGroup, + data: groupId, + ), + ); + cursor = 0; + } else { + final linked = await _Reader._countSetCoins( + incSetId, + db: _FiroCache.setCacheDB(network), + ); + if (linked > numberOfCoinsToFetch) { + Logging.instance.w( + "Spark anon set groupId=$groupId in-progress row has " + "$linked linked coins but delta is only " + "$numberOfCoinsToFetch; discarding in-flight row.", + ); + await _workers[network]!.runTask( + FCTask( + func: FCFuncName._deleteIncompleteSparkSetsForGroup, + data: groupId, + ), + ); + cursor = 0; + } else { + cursor = linked; + } + } + } - final List coins = []; + while (cursor < numberOfCoinsToFetch) { + final endIndex = cursor + sectorSize <= numberOfCoinsToFetch + ? cursor + sectorSize + : numberOfCoinsToFetch; + final expected = endIndex - cursor; - for (int i = 0; i < fullSectorCount; i++) { - final start = (i * sectorSize); final data = await client.getSparkAnonymitySetBySector( coinGroupId: groupId, latestBlock: meta.blockHash, - startIndex: start, - endIndex: start + sectorSize, + startIndex: cursor, + endIndex: endIndex, ); - progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch); - coins.addAll(data); - } + // Refuse to persist a sector whose size doesn't match the request: + // a partial or over-full server response would break the finalize- + // time integrity check and potentially skew the resume cursor. + if (data.length != expected) { + throw Exception( + "Spark anon set sector size mismatch for groupId=$groupId: " + "requested $expected coins in range [$cursor, $endIndex), " + "server returned ${data.length}", + ); + } + + final sectorCoins = data + .map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId)) + .toList(); - if (remainder > 0) { - final data = await client.getSparkAnonymitySetBySector( - coinGroupId: groupId, - latestBlock: meta.blockHash, - startIndex: numberOfCoinsToFetch - remainder, - endIndex: numberOfCoinsToFetch, + await _workers[network]!.runTask( + FCTask( + func: FCFuncName._insertSparkAnonSetCoinsIncremental, + data: (meta, sectorCoins, cursor), + ), ); - progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch); - coins.addAll(data); + cursor = endIndex; + progressUpdated?.call(cursor, numberOfCoinsToFetch); } - final result = - coins - .map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId)) - .toList(); - + // All sectors persisted. Flip `complete = 1` iff the link count + // matches the expected delta. On failure the in-progress row stays + // hidden and the error propagates; the next sync will observe the + // over-linked row and reset. await _workers[network]!.runTask( FCTask( - func: FCFuncName._updateSparkAnonSetCoinsWith, - data: (meta, result), + func: FCFuncName._markSparkAnonSetComplete, + data: (meta, numberOfCoinsToFetch), ), ); }); diff --git a/lib/db/sqlite/firo_cache_reader.dart b/lib/db/sqlite/firo_cache_reader.dart index 67fea7764..2ac679753 100644 --- a/lib/db/sqlite/firo_cache_reader.dart +++ b/lib/db/sqlite/firo_cache_reader.dart @@ -4,6 +4,19 @@ part of 'firo_cache.dart'; abstract class _Reader { // =========================================================================== // =============== Spark anonymity set queries =============================== + // + // All anon-set reads filter `ss.complete = 1` so in-flight syncs cannot + // leak partial coin sets to callers. This is especially load-bearing for + // libspark membership-proof construction: a spend initiated while a sync + // is running must not see half-populated SparkSetCoins. + // + // Coin ordering is reconstructed via `orderKey DESC`. The writer stores + // orderKey = server-side delta index (0 = newest). Sorting DESC within a + // set, then Dart-reversing in the coordinator, yields server newest-first + // order end-to-end — matching the pre-fix behavior. The `ssc.id ASC` + // tiebreaker preserves pre-migration rows (where all orderKey == 0): the + // old writer inserted coins in globally-reversed RPC order, so PK-ASC is + // oldest-first, which Dart's `.reversed` flips back to newest-first. static Future _getSetCoinsForGroupId( int groupId, { @@ -14,7 +27,8 @@ abstract class _Reader { FROM SparkSet AS ss JOIN SparkSetCoins AS ssc ON ss.id = ssc.setId JOIN SparkCoin AS sc ON ssc.coinId = sc.id - WHERE ss.groupId = $groupId; + WHERE ss.groupId = $groupId AND ss.complete = 1 + ORDER BY ss.id ASC, ssc.orderKey DESC, ssc.id ASC; """; return db.select("$query;"); @@ -27,8 +41,8 @@ abstract class _Reader { final query = """ SELECT ss.blockHash, ss.setHash, ss.size FROM SparkSet ss - WHERE ss.groupId = $groupId - ORDER BY ss.size DESC + WHERE ss.groupId = $groupId AND ss.complete = 1 + ORDER BY ss.size DESC, ss.id DESC LIMIT 1; """; @@ -44,21 +58,26 @@ abstract class _Reader { WITH TargetBlock AS ( SELECT id FROM SparkSet - WHERE blockHash = ? + WHERE blockHash = ? AND complete = 1 ), TargetSets AS ( SELECT id AS setId FROM SparkSet - WHERE groupId = ? AND id > (SELECT id FROM TargetBlock) + WHERE groupId = ? + AND complete = 1 + AND id > (SELECT id FROM TargetBlock) ) - SELECT + SELECT SparkCoin.serialized, SparkCoin.txHash, SparkCoin.context, SparkCoin.groupId FROM SparkSetCoins JOIN SparkCoin ON SparkSetCoins.coinId = SparkCoin.id - WHERE SparkSetCoins.setId IN (SELECT setId FROM TargetSets); + WHERE SparkSetCoins.setId IN (SELECT setId FROM TargetSets) + ORDER BY SparkSetCoins.setId ASC, + SparkSetCoins.orderKey DESC, + SparkSetCoins.id ASC; """; return db.select("$query;", [blockHash, groupId]); @@ -72,13 +91,49 @@ abstract class _Reader { SELECT EXISTS ( SELECT 1 FROM SparkSet - WHERE groupId = $groupId + WHERE groupId = $groupId AND complete = 1 ) AS setExists; """; return db.select("$query;").first["setExists"] == 1; } + /// In-progress (complete=0) SparkSet row for this group, if any. Used by + /// the coordinator's resume logic to decide whether to continue a partial + /// sync or discard it. + /// + /// In normal flow only one incomplete row can exist per group at a time, + /// but in pathological scenarios (aborted syncs across blockHashes that + /// never got cleaned up) multiple rows might remain. We pick the newest + /// (`id DESC`) deterministically; any older ones get cleaned up the next + /// time the coordinator calls [_deleteIncompleteSparkSetsForGroup]. + static Future _getIncompleteSetForGroupId( + int groupId, { + required Database db, + }) async { + final query = """ + SELECT id, blockHash, setHash, size + FROM SparkSet + WHERE groupId = $groupId AND complete = 0 + ORDER BY id DESC + LIMIT 1; + """; + return db.select("$query;"); + } + + /// Count of coins currently linked to the given SparkSet. Used by the + /// coordinator to compute the resume cursor. + static Future _countSetCoins( + int setId, { + required Database db, + }) async { + final rows = db.select( + "SELECT COUNT(*) AS c FROM SparkSetCoins WHERE setId = ?;", + [setId], + ); + return rows.first["c"] as int; + } + // =========================================================================== // =============== Spark used coin tags queries ============================== @@ -118,7 +173,7 @@ abstract class _Reader { final query = """ SELECT tag, GROUP_CONCAT(txid) AS txids FROM SparkUsedCoinTags - WHERE tag IN ('$tagsConcat') + WHERE tag IN ('$tagsConcat') GROUP BY tag; """; diff --git a/lib/db/sqlite/firo_cache_worker.dart b/lib/db/sqlite/firo_cache_worker.dart index abaceb288..5a4b6d8ae 100644 --- a/lib/db/sqlite/firo_cache_worker.dart +++ b/lib/db/sqlite/firo_cache_worker.dart @@ -1,7 +1,9 @@ part of 'firo_cache.dart'; enum FCFuncName { - _updateSparkAnonSetCoinsWith, + _insertSparkAnonSetCoinsIncremental, + _markSparkAnonSetComplete, + _deleteIncompleteSparkSetsForGroup, _updateSparkUsedTagsWith, } @@ -93,13 +95,28 @@ class _FiroCacheWorker { try { final FCResult result; switch (task.func) { - case FCFuncName._updateSparkAnonSetCoinsWith: - final data = - task.data as (SparkAnonymitySetMeta, List); - result = _updateSparkAnonSetCoinsWith( + case FCFuncName._insertSparkAnonSetCoinsIncremental: + result = _insertSparkAnonSetCoinsIncremental( setCacheDb, - data.$2, - data.$1, + task.data as ( + SparkAnonymitySetMeta, + List, + int, + ), + ); + break; + + case FCFuncName._markSparkAnonSetComplete: + result = _markSparkAnonSetComplete( + setCacheDb, + task.data as (SparkAnonymitySetMeta, int), + ); + break; + + case FCFuncName._deleteIncompleteSparkSetsForGroup: + result = _deleteIncompleteSparkSetsForGroup( + setCacheDb, + task.data as int, ); break; diff --git a/lib/db/sqlite/firo_cache_writer.dart b/lib/db/sqlite/firo_cache_writer.dart index fadc3eb91..5a8c57708 100644 --- a/lib/db/sqlite/firo_cache_writer.dart +++ b/lib/db/sqlite/firo_cache_writer.dart @@ -47,68 +47,240 @@ FCResult _updateSparkUsedTagsWith(Database db, List> tags) { // =========================================================================== // ================== write to spark anon set cache ========================== -/// update the sqlite cache +/// Persist a single sector of a resumable Spark anon-set sync. +/// +/// The sector tuple carries `(meta, coins, firstOrderKey)`: +/// - `meta` identifies the SparkSet row this sector contributes to. On +/// first sector for a sync the row is created with `complete = 0`; +/// subsequent sectors find the existing row via INSERT OR IGNORE. +/// - `coins` is the server's newest-first response for this sector. +/// - `firstOrderKey` is the server-side delta index of `coins[0]` +/// (equivalent to the `startIndex` of the `getSparkAnonymitySetBySector` +/// request that produced the sector). Successive coins get +/// `firstOrderKey + i`. /// -/// returns true if successful, otherwise false -FCResult _updateSparkAnonSetCoinsWith( +/// The row stays invisible to readers (`complete = 0`) until +/// [_markSparkAnonSetComplete] flips the flag. Each sector runs in its own +/// SQLite transaction, so a crash at any point leaves the cache consistent +/// with the last sector that committed. Replay is safe because: +/// - SparkSet: INSERT OR IGNORE against UNIQUE(blockHash, setHash, groupId). +/// - SparkCoin: INSERT OR IGNORE against UNIQUE(serialized, txHash, context, +/// groupId). +/// - SparkSetCoins: INSERT OR IGNORE against UNIQUE(setId, coinId). +FCResult _insertSparkAnonSetCoinsIncremental( Database db, - final List coinsRaw, - SparkAnonymitySetMeta meta, + (SparkAnonymitySetMeta, List, int) sector, ) { - if (coinsRaw.isEmpty) { - // no coins to actually insert - return FCResult(success: true); - } + final (meta, coinsRaw, firstOrderKey) = sector; - final checkResult = db.select( - """ - SELECT * - FROM SparkSet - WHERE blockHash = ? AND setHash = ? AND groupId = ?; - """, - [meta.blockHash, meta.setHash, meta.coinGroupId], - ); - - if (checkResult.isNotEmpty) { - // already up to date + if (coinsRaw.isEmpty) { return FCResult(success: true); } - final coins = coinsRaw.reversed; - db.execute("BEGIN;"); try { + // Create (or find) the in-progress SparkSet row. size is locked to + // meta.size at creation; we never UPDATE it per-sector. The row + // represents "the anon-set meta the coordinator is targeting" — it + // becomes visible to readers only after complete is flipped to 1. db.execute( """ - INSERT INTO SparkSet (blockHash, setHash, groupId, size) - VALUES (?, ?, ?, ?); + INSERT OR IGNORE INTO SparkSet + (blockHash, setHash, groupId, size, complete) + VALUES (?, ?, ?, ?, 0); """, [meta.blockHash, meta.setHash, meta.coinGroupId, meta.size], ); - final setId = db.lastInsertRowId; + final setIdRow = db.select( + """ + SELECT id, complete FROM SparkSet + WHERE blockHash = ? AND setHash = ? AND groupId = ?; + """, + [meta.blockHash, meta.setHash, meta.coinGroupId], + ); + if (setIdRow.isEmpty) { + throw StateError( + "Failed to locate SparkSet row after INSERT OR IGNORE for " + "groupId=${meta.coinGroupId}", + ); + } + final setId = setIdRow.first["id"] as int; + final existingComplete = (setIdRow.first["complete"] as int) == 1; + // Defensive: if the target row is already marked complete, the + // coordinator either has a bug or the server is reporting + // inconsistent state for (blockHash, setHash). Either way, appending + // would corrupt a set whose setHash we've committed to. Refuse. + if (existingComplete) { + throw StateError( + "Refusing to append coins to already-finalized SparkSet " + "setId=$setId (groupId=${meta.coinGroupId}, " + "blockHash=${meta.blockHash}, setHash=${meta.setHash}).", + ); + } + + for (int i = 0; i < coinsRaw.length; i++) { + final coin = coinsRaw[i]; + + // Defensive: a mixed-group sector would skew the per-set coin count + // and break the finalize-time integrity check. + if (coin.groupId != meta.coinGroupId) { + throw StateError( + "Spark anon set sector coin groupId mismatch: " + "expected ${meta.coinGroupId}, got ${coin.groupId}", + ); + } - for (final coin in coins) { db.execute( """ - INSERT INTO SparkCoin (serialized, txHash, context, groupId) - VALUES (?, ?, ?, ?); - """, + INSERT OR IGNORE INTO SparkCoin (serialized, txHash, context, groupId) + VALUES (?, ?, ?, ?); + """, [coin.serialized, coin.txHash, coin.context, coin.groupId], ); - final coinId = db.lastInsertRowId; + // lastInsertRowId is 0 when INSERT OR IGNORE skipped a duplicate, so + // always SELECT the id explicitly rather than relying on it. + final coinIdRow = db.select( + """ + SELECT id FROM SparkCoin + WHERE serialized = ? AND txHash = ? AND context = ? AND groupId = ?; + """, + [coin.serialized, coin.txHash, coin.context, coin.groupId], + ); + if (coinIdRow.isEmpty) { + throw StateError( + "Failed to locate SparkCoin row after INSERT OR IGNORE " + "(groupId=${meta.coinGroupId})", + ); + } + final coinId = coinIdRow.first["id"] as int; + + // orderKey = server-side delta index. Used by the reader's ORDER BY + // to emit coins in the same newest-first order as the server. + final orderKey = firstOrderKey + i; - // finally add the row id to the newly added set db.execute( """ - INSERT INTO SparkSetCoins (setId, coinId) - VALUES (?, ?); + INSERT OR IGNORE INTO SparkSetCoins (setId, coinId, orderKey) + VALUES (?, ?, ?); """, - [setId, coinId], + [setId, coinId, orderKey], + ); + } + + db.execute("COMMIT;"); + return FCResult(success: true); + } catch (e) { + db.execute("ROLLBACK;"); + return FCResult(success: false, error: e); + } +} + +/// Finalize a resumable sync by flipping the SparkSet row's `complete` flag +/// to 1, gated on a strict integrity check. +/// +/// The `(meta, expectedLinkedCount)` tuple carries: +/// - `meta`: identifies the row to finalize. +/// - `expectedLinkedCount`: the number of coins the coordinator believes +/// it appended for this sync. For first-sync of a group this is +/// meta.size; for an incremental sync it is meta.size minus the +/// previously-finalized size for this group. +/// +/// If the actual linked count in SparkSetCoins does not match the expected +/// value the transaction rolls back and the row stays `complete = 0`. +/// Coordinator's next sync will observe the over-linked row and reset it. +/// A partial, over-full, or corrupted cache therefore never becomes the +/// current set. +FCResult _markSparkAnonSetComplete( + Database db, + (SparkAnonymitySetMeta, int) payload, +) { + final (meta, expectedLinkedCount) = payload; + + db.execute("BEGIN;"); + try { + final setIdRow = db.select( + """ + SELECT id, complete FROM SparkSet + WHERE blockHash = ? AND setHash = ? AND groupId = ?; + """, + [meta.blockHash, meta.setHash, meta.coinGroupId], + ); + if (setIdRow.isEmpty) { + // No row to finalize. Only reachable for the empty-delta edge case + // (blockHash advanced without new coins), and the coordinator already + // handles that case without calling us. Return success so an + // accidental call is an idempotent no-op rather than an error. + db.execute("ROLLBACK;"); + return FCResult(success: true); + } + final setId = setIdRow.first["id"] as int; + final alreadyComplete = (setIdRow.first["complete"] as int) == 1; + + if (alreadyComplete) { + // Idempotent replay: the row was finalized in a prior call. + db.execute("COMMIT;"); + return FCResult(success: true); + } + + final linkedRow = db.select( + """ + SELECT COUNT(*) AS c FROM SparkSetCoins WHERE setId = ?; + """, + [setId], + ); + final linked = linkedRow.first["c"] as int; + if (linked != expectedLinkedCount) { + db.execute("ROLLBACK;"); + return FCResult( + success: false, + error: StateError( + "Cannot finalize SparkSet setId=$setId " + "(groupId=${meta.coinGroupId}): linked $linked coins " + "but expected $expectedLinkedCount", + ), ); } + db.execute( + "UPDATE SparkSet SET complete = 1 WHERE id = ?;", + [setId], + ); + db.execute("COMMIT;"); + return FCResult(success: true); + } catch (e) { + db.execute("ROLLBACK;"); + return FCResult(success: false, error: e); + } +} +/// Delete every incomplete SparkSet row (and its SparkSetCoins links) for +/// this group. Used when: +/// * The server's blockHash has shifted between resume attempts — at the +/// new blockHash, the partial row's orderKeys no longer align with +/// server indices. +/// * The in-progress row's linked count somehow exceeds the expected +/// delta (corruption guard). +/// * An empty-delta sync needs to clear a stray partial before returning. +/// +/// Finalized rows (`complete = 1`) are never touched. +FCResult _deleteIncompleteSparkSetsForGroup(Database db, int groupId) { + db.execute("BEGIN;"); + try { + db.execute( + """ + DELETE FROM SparkSetCoins + WHERE setId IN ( + SELECT id FROM SparkSet WHERE groupId = ? AND complete = 0 + ); + """, + [groupId], + ); + db.execute( + "DELETE FROM SparkSet WHERE groupId = ? AND complete = 0;", + [groupId], + ); + db.execute("COMMIT;"); return FCResult(success: true); } catch (e) { db.execute("ROLLBACK;");