diff --git a/packages/programs/data/shared-log/src/index.ts b/packages/programs/data/shared-log/src/index.ts
index da66fe0cb..398bdf9ea 100644
--- a/packages/programs/data/shared-log/src/index.ts
+++ b/packages/programs/data/shared-log/src/index.ts
@@ -2037,7 +2037,7 @@ export class SharedLog<
     if (isLeader) {
       for (const entry of entries) {
         this.pruneDebouncedFn.delete(entry.entry.hash);
-        this._requestIPruneSent.delete(entry.entry.hash);
+        this.removePruneRequestSent(entry.entry.hash);
         this._requestIPruneResponseReplicatorSet.delete(
           entry.entry.hash,
         );
@@ -2130,13 +2130,17 @@ export class SharedLog<
       }
     } else if (msg instanceof RequestIPrune) {
       const hasAndIsLeader: string[] = [];
+      const from = context.from.hashcode();
+
       for (const hash of msg.hashes) {
+        this.removePruneRequestSent(hash, from);
+
         // if we expect the remote to be owner of this entry because we are to prune ourselves, then we need to remove the remote
         // this is due to that the remote has previously indicated to be a replicator to help us prune but now has changed their mind
         const outGoingPrunes =
           this._requestIPruneResponseReplicatorSet.get(hash);
         if (outGoingPrunes) {
-          outGoingPrunes.delete(context.from.hashcode());
+          outGoingPrunes.delete(from);
         }
 
         const indexedEntry = await this.log.entryIndex.getShallow(hash);
@@ -2170,6 +2174,8 @@ export class SharedLog<
         }
 
         if (isLeader) {
+          //console.log("IS LEADER", this.node.identity.publicKey.hashcode(), hash);
+
           hasAndIsLeader.push(hash);
 
           hasAndIsLeader.length > 0 &&
@@ -2205,13 +2211,20 @@ export class SharedLog<
           context.from!.hashcode(),
           entry.meta.gid,
         );
+        this.removePruneRequestSent(entry.hash, from);
         let isLeader = false;
-        await this.findLeaders(
+        await this._waitForReplicators(
          await this.createCoordinates(
            entry,
            decodeReplicas(entry).getValue(this),
          ),
          entry,
+          [
+            {
+              key: this.node.identity.publicKey.hashcode(),
+              replicator: true,
+            },
+          ],
          {
            onLeader: (key) => {
              isLeader =
@@ -3171,6 +3184,20 @@ export class SharedLog<
     return new AbsoluteReplicas(maxValue);
   }
 
+  private removePruneRequestSent(hash: string, to?: string) {
+    if (!to) {
+      this._requestIPruneSent.delete(hash);
+    } else {
+      let set = this._requestIPruneSent.get(hash);
+      if (set) {
+        set.delete(to);
+        if (set.size === 0) {
+          this._requestIPruneSent.delete(hash);
+        }
+      }
+    }
+  }
+
   prune(
     entries: Map<
       string,
@@ -3184,7 +3211,7 @@ export class SharedLog<
     if (options?.unchecked) {
       return [...entries.values()].map((x) => {
         this._gidPeersHistory.delete(x.entry.meta.gid);
-        this._requestIPruneSent.delete(x.entry.hash);
+        this.removePruneRequestSent(x.entry.hash);
         this._requestIPruneResponseReplicatorSet.delete(x.entry.hash);
         return this.log.remove(x.entry, {
           recursively: true,
         });
       });
     }
 
+    if (this.closed) {
+      return [];
+    }
+
     // ask network if they have they entry,
     // so I can delete it
@@ -3227,7 +3258,6 @@ export class SharedLog<
     const deferredPromise: DeferredPromise = pDefer();
 
     const clear = () => {
-      //pendingPrev?.clear();
       const pending = this._pendingDeletes.get(entry.hash);
       if (pending?.promise === deferredPromise) {
         this._pendingDeletes.delete(entry.hash);
       }
     };
@@ -3239,6 +3269,10 @@ export class SharedLog<
       clear();
       cleanupTimer.push(
         setTimeout(async () => {
+          this._gidPeersHistory.delete(entry.meta.gid);
+          this.removePruneRequestSent(entry.hash);
+          this._requestIPruneResponseReplicatorSet.delete(entry.hash);
+
           if (
             await this.isLeader({
               entry,
@@ -3251,10 +3285,6 @@ export class SharedLog<
             })
           ) {
             return;
           }
 
-          this._gidPeersHistory.delete(entry.meta.gid);
-          this._requestIPruneSent.delete(entry.hash);
-          this._requestIPruneResponseReplicatorSet.delete(entry.hash);
-
           return this.log
             .remove(entry, {
               recursively: true,
@@ -3267,7 +3297,7 @@ export class SharedLog<
             })
             .finally(async () => {
               this._gidPeersHistory.delete(entry.meta.gid);
-              this._requestIPruneSent.delete(entry.hash);
+              this.removePruneRequestSent(entry.hash);
               this._requestIPruneResponseReplicatorSet.delete(entry.hash);
 
               // TODO in the case we become leader again here we need to re-add the entry
@@ -3286,7 +3316,7 @@ export class SharedLog<
 
     const reject = (e: any) => {
       clear();
-      this._requestIPruneSent.delete(entry.hash);
+      this.removePruneRequestSent(entry.hash);
       this._requestIPruneResponseReplicatorSet.delete(entry.hash);
       deferredPromise.reject(e);
     };
@@ -3301,9 +3331,7 @@ export class SharedLog<
 
     this._pendingDeletes.set(entry.hash, {
       promise: deferredPromise,
-      clear: () => {
-        clear();
-      },
+      clear,
       reject,
       resolve: async (publicKeyHash: string) => {
         const minReplicasObj = this.getClampedReplicas(minReplicas);
@@ -3363,10 +3391,9 @@ export class SharedLog<
         set = new Set();
         this._requestIPruneSent.set(entry, set);
       }
-
-      /* if (set.has(to)) {
-        continue;
-      } */
+      /* if (set.has(to)) {
+        continue;
+      } */
       set.add(to);
       filteredSet.push(entry);
     }
@@ -3390,64 +3417,10 @@ export class SharedLog<
       emitMessages(v, k);
     }
 
-    /* const fn = async () => {
-      this.rpc.send(
-        new RequestIPrune({
-          hashes: filteredEntries.map(x => x.hash),
-        }),
-        {
-          mode: new SilentDelivery({
-            to: [...await this.getReplicators()],
-            redundancy: 1,
-          }),
-          priority: 1,
-        },
-      )
-    };
-    fn() */
-
-    /* const onPeersChange = async (
-      e?: CustomEvent,
-      reason?: string,
-    ) => {
-      if (
-        true // e.detail.publicKey.equals(this.node.identity.publicKey) === false // TODO proper condition
-      ) {
-
-        const peerToEntryMap = await this.groupByLeaders(
-          filteredEntries
-            .filter((x) => !readyToDelete.has(x.hash))
-            .map((x) => {
-              return { entry: x, replicas: maxReplicasValue }; // TODO choose right maxReplicasValue, should it really be for all entries combined?
-            }),
-        );
-        for (const receiver of peerToEntryMap.keys()) {
-          if (receiver === this.node.identity.publicKey.hashcode()) {
-            continue;
-          }
-          const peerEntries = peerToEntryMap.get(receiver);
-          if (peerEntries && peerEntries.length > 0) {
-            emitMessages(
-              peerEntries.map((x) => filteredEntries[x].hash),
-              receiver,
-            );
-          }
-        }
-      }
-    }; */
-
-    // check joining peers
-    /* this.events.addEventListener("replication:change", onPeersChange);
-    this.events.addEventListener("replicator:mature", onPeersChange);
-    this.events.addEventListener("replicator:join", onPeersChange); */
-
     let cleanup = () => {
       for (const timer of cleanupTimer) {
         clearTimeout(timer);
       }
-      /* this.events.removeEventListener("replication:change", onPeersChange);
-      this.events.removeEventListener("replicator:mature", onPeersChange);
-      this.events.removeEventListener("replicator:join", onPeersChange); */
       this._closeController.signal.removeEventListener("abort", cleanup);
     };
@@ -3565,15 +3538,14 @@ export class SharedLog<
           if (!set.has(entryReplicated.hash)) {
             set.set(entryReplicated.hash, entryReplicated);
           }
+        }
+      }
 
-        /* for (const entry of coordinates) {
-          let arr = set.get(entry.hash);
-          if (!arr) {
-            arr = [];
-            set.set(entry.hash, arr);
-          }
-          arr.push(entry);
-        } */
+      if (oldPeersSet) {
+        for (const oldPeer of oldPeersSet) {
+          if (!currentPeers.has(oldPeer)) {
+            this.removePruneRequestSent(entryReplicated.hash);
+          }
         }
       }
@@ -3591,13 +3563,14 @@ export class SharedLog<
         });
       }
 
+      // console.log("DELETE RESPONSE AS LEADER", this.node.identity.publicKey.hashcode(), entryReplicated.hash)
       this.responseToPruneDebouncedFn.delete(entryReplicated.hash); // don't allow others to prune because of expecting me to replicating this entry
     } else {
       this.pruneDebouncedFn.delete(entryReplicated.hash);
       await this._pendingDeletes
         .get(entryReplicated.hash)
         ?.reject(new Error("Failed to delete, is leader again"));
-      this._requestIPruneSent.delete(entryReplicated.hash);
+      this.removePruneRequestSent(entryReplicated.hash);
     }
   }
   for (const [target, entries] of uncheckedDeliver) {
diff --git a/packages/programs/data/shared-log/test/lifecycle.spec.ts b/packages/programs/data/shared-log/test/lifecycle.spec.ts
index c67597f00..1586e5f7c 100644
--- a/packages/programs/data/shared-log/test/lifecycle.spec.ts
+++ b/packages/programs/data/shared-log/test/lifecycle.spec.ts
@@ -260,4 +260,19 @@ describe("lifecycle", () => {
       abortController.abort("Done");
     });
   });
+
+  describe("prune", () => {
+    it("prune after close is no-op", async () => {
+      session = await TestSession.connected(1);
+      const store = new EventStore();
+      const db = await session.peers[0].open(store);
+      const { entry } = await db.add("hello");
+      await db.close();
+      let pruneMap = new Map([[entry.hash, { entry, leaders: new Map() }]]);
+      let t0 = +new Date();
+      await Promise.all(db.log.prune(pruneMap));
+      let t1 = +new Date();
+      expect(t1 - t0).to.be.lessThan(100);
+    });
+  });
 });
diff --git a/packages/programs/data/shared-log/test/sharding.spec.ts b/packages/programs/data/shared-log/test/sharding.spec.ts
index 3ef3dcc58..68b601754 100644
--- a/packages/programs/data/shared-log/test/sharding.spec.ts
+++ b/packages/programs/data/shared-log/test/sharding.spec.ts
@@ -231,6 +231,7 @@ testSetups.forEach((setup) => {
       }
 
       await Promise.all(promises);
+      await checkBounded(entryCount, 0.35, 0.65, db1, db2);
     });
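
Aside (illustrative only, not part of the patch): a minimal standalone sketch of the bookkeeping that the new removePruneRequestSent helper performs, assuming _requestIPruneSent maps an entry hash to the set of peer hashcodes a RequestIPrune was sent to. The PruneRequestBook class and its method names below are hypothetical and exist only to make the sketch self-contained.

// Hypothetical standalone model of the _requestIPruneSent bookkeeping (TypeScript).
class PruneRequestBook {
  private sent: Map<string, Set<string>> = new Map();

  // Record that a prune request for `hash` was sent to peer `to`.
  markSent(hash: string, to: string): void {
    let set = this.sent.get(hash);
    if (!set) {
      set = new Set();
      this.sent.set(hash, set);
    }
    set.add(to);
  }

  // Mirrors removePruneRequestSent(hash, to?): with no peer given the whole
  // record is dropped; otherwise only that peer is removed, and the record
  // is garbage-collected once its set becomes empty.
  remove(hash: string, to?: string): void {
    if (!to) {
      this.sent.delete(hash);
      return;
    }
    const set = this.sent.get(hash);
    if (set) {
      set.delete(to);
      if (set.size === 0) {
        this.sent.delete(hash);
      }
    }
  }
}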