fix: more sparse handling of RequestIPrune
marcus-pousette committed Dec 30, 2024
1 parent 3934c1c commit 39e9da6
Showing 3 changed files with 70 additions and 81 deletions.
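
The core of the change in src/index.ts is a new `removePruneRequestSent(hash, to?)` helper that replaces direct `this._requestIPruneSent.delete(hash)` calls, so the "RequestIPrune sent" bookkeeping can be cleared either for a whole entry or only for a single recipient peer. Below is a minimal standalone sketch of that pattern, assuming `_requestIPruneSent` is a `Map<string, Set<string>>` keyed by entry hash (as the diff suggests); the `PruneRequestTracker` name and its methods are illustrative only, not part of the shared-log API.

// Illustrative sketch (not shared-log code): per-recipient tracking of
// RequestIPrune messages, keyed by entry hash.
class PruneRequestTracker {
    private sent: Map<string, Set<string>> = new Map();

    // Record that a RequestIPrune for `hash` was sent to peer `to`.
    markSent(hash: string, to: string): void {
        let set = this.sent.get(hash);
        if (!set) {
            set = new Set();
            this.sent.set(hash, set);
        }
        set.add(to);
    }

    // Without `to`: forget the entry entirely (e.g. once it is pruned or we become leader again).
    // With `to`: forget only that recipient, dropping the entry once no recipients remain.
    remove(hash: string, to?: string): void {
        if (!to) {
            this.sent.delete(hash);
            return;
        }
        const set = this.sent.get(hash);
        if (set) {
            set.delete(to);
            if (set.size === 0) {
                this.sent.delete(hash);
            }
        }
    }
}
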
135 changes: 54 additions & 81 deletions packages/programs/data/shared-log/src/index.ts
@@ -2037,7 +2037,7 @@ export class SharedLog<
if (isLeader) {
for (const entry of entries) {
this.pruneDebouncedFn.delete(entry.entry.hash);
this._requestIPruneSent.delete(entry.entry.hash);
this.removePruneRequestSent(entry.entry.hash);
this._requestIPruneResponseReplicatorSet.delete(
entry.entry.hash,
);
@@ -2130,13 +2130,17 @@ export class SharedLog<
}
} else if (msg instanceof RequestIPrune) {
const hasAndIsLeader: string[] = [];
const from = context.from.hashcode();

for (const hash of msg.hashes) {
this.removePruneRequestSent(hash, from);

// if we expect the remote to own this entry because we intend to prune it ourselves, we need to remove the remote here:
// the remote previously indicated it would act as a replicator to help us prune, but has now changed its mind
const outGoingPrunes =
this._requestIPruneResponseReplicatorSet.get(hash);
if (outGoingPrunes) {
outGoingPrunes.delete(context.from.hashcode());
outGoingPrunes.delete(from);
}

const indexedEntry = await this.log.entryIndex.getShallow(hash);
@@ -2170,6 +2174,8 @@ export class SharedLog<
}

if (isLeader) {
//console.log("IS LEADER", this.node.identity.publicKey.hashcode(), hash);

hasAndIsLeader.push(hash);

hasAndIsLeader.length > 0 &&
@@ -2205,13 +2211,20 @@ export class SharedLog<
context.from!.hashcode(),
entry.meta.gid,
);
this.removePruneRequestSent(entry.hash, from);
let isLeader = false;
await this.findLeaders(
await this._waitForReplicators(
await this.createCoordinates(
entry,
decodeReplicas(entry).getValue(this),
),
entry,
[
{
key: this.node.identity.publicKey.hashcode(),
replicator: true,
},
],
{
onLeader: (key) => {
isLeader =
@@ -3171,6 +3184,20 @@ export class SharedLog<
return new AbsoluteReplicas(maxValue);
}

private removePruneRequestSent(hash: string, to?: string) {
if (!to) {
this._requestIPruneSent.delete(hash);
} else {
let set = this._requestIPruneSent.get(hash);
if (set) {
set.delete(to);
if (set.size === 0) {
this._requestIPruneSent.delete(hash);
}
}
}
}

prune(
entries: Map<
string,
@@ -3184,14 +3211,18 @@
if (options?.unchecked) {
return [...entries.values()].map((x) => {
this._gidPeersHistory.delete(x.entry.meta.gid);
this._requestIPruneSent.delete(x.entry.hash);
this.removePruneRequestSent(x.entry.hash);
this._requestIPruneResponseReplicatorSet.delete(x.entry.hash);
return this.log.remove(x.entry, {
recursively: true,
});
});
}

if (this.closed) {
return [];
}

// ask the network if they have the entry,
// so I can delete it

@@ -3227,7 +3258,6 @@ export class SharedLog<
const deferredPromise: DeferredPromise<void> = pDefer();

const clear = () => {
//pendingPrev?.clear();
const pending = this._pendingDeletes.get(entry.hash);
if (pending?.promise === deferredPromise) {
this._pendingDeletes.delete(entry.hash);
@@ -3239,6 +3269,10 @@
clear();
cleanupTimer.push(
setTimeout(async () => {
this._gidPeersHistory.delete(entry.meta.gid);
this.removePruneRequestSent(entry.hash);
this._requestIPruneResponseReplicatorSet.delete(entry.hash);

if (
await this.isLeader({
entry,
@@ -3251,10 +3285,6 @@
return;
}

this._gidPeersHistory.delete(entry.meta.gid);
this._requestIPruneSent.delete(entry.hash);
this._requestIPruneResponseReplicatorSet.delete(entry.hash);

return this.log
.remove(entry, {
recursively: true,
@@ -3267,7 +3297,7 @@
})
.finally(async () => {
this._gidPeersHistory.delete(entry.meta.gid);
this._requestIPruneSent.delete(entry.hash);
this.removePruneRequestSent(entry.hash);
this._requestIPruneResponseReplicatorSet.delete(entry.hash);
// TODO in the case we become leader again here we need to re-add the entry

@@ -3286,7 +3316,7 @@

const reject = (e: any) => {
clear();
this._requestIPruneSent.delete(entry.hash);
this.removePruneRequestSent(entry.hash);
this._requestIPruneResponseReplicatorSet.delete(entry.hash);
deferredPromise.reject(e);
};
@@ -3301,9 +3331,7 @@

this._pendingDeletes.set(entry.hash, {
promise: deferredPromise,
clear: () => {
clear();
},
clear,
reject,
resolve: async (publicKeyHash: string) => {
const minReplicasObj = this.getClampedReplicas(minReplicas);
@@ -3363,10 +3391,9 @@
set = new Set();
this._requestIPruneSent.set(entry, set);
}

/* if (set.has(to)) {
continue;
} */
/* if (set.has(to)) {
continue;
} */
set.add(to);
filteredSet.push(entry);
}
@@ -3390,64 +3417,10 @@
emitMessages(v, k);
}

/* const fn = async () => {
this.rpc.send(
new RequestIPrune({
hashes: filteredEntries.map(x => x.hash),
}),
{
mode: new SilentDelivery({
to: [...await this.getReplicators()],
redundancy: 1,
}),
priority: 1,
},
)
};
fn() */

/* const onPeersChange = async (
e?: CustomEvent<ReplicatorJoinEvent>,
reason?: string,
) => {
if (
true // e.detail.publicKey.equals(this.node.identity.publicKey) === false // TODO proper condition
) {
const peerToEntryMap = await this.groupByLeaders(
filteredEntries
.filter((x) => !readyToDelete.has(x.hash))
.map((x) => {
return { entry: x, replicas: maxReplicasValue }; // TODO choose right maxReplicasValue, should it really be for all entries combined?
}),
);
for (const receiver of peerToEntryMap.keys()) {
if (receiver === this.node.identity.publicKey.hashcode()) {
continue;
}
const peerEntries = peerToEntryMap.get(receiver);
if (peerEntries && peerEntries.length > 0) {
emitMessages(
peerEntries.map((x) => filteredEntries[x].hash),
receiver,
);
}
}
}
}; */

// check joining peers
/* this.events.addEventListener("replication:change", onPeersChange);
this.events.addEventListener("replicator:mature", onPeersChange);
this.events.addEventListener("replicator:join", onPeersChange); */

let cleanup = () => {
for (const timer of cleanupTimer) {
clearTimeout(timer);
}
/* this.events.removeEventListener("replication:change", onPeersChange);
this.events.removeEventListener("replicator:mature", onPeersChange);
this.events.removeEventListener("replicator:join", onPeersChange); */
this._closeController.signal.removeEventListener("abort", cleanup);
};

@@ -3565,15 +3538,14 @@ export class SharedLog<
if (!set.has(entryReplicated.hash)) {
set.set(entryReplicated.hash, entryReplicated);
}
}
}

/* for (const entry of coordinates) {
let arr = set.get(entry.hash);
if (!arr) {
arr = [];
set.set(entry.hash, arr);
}
arr.push(entry);
} */
if (oldPeersSet) {
for (const oldPeer of oldPeersSet) {
if (!currentPeers.has(oldPeer)) {
this.removePruneRequestSent(entryReplicated.hash);
}
}
}

@@ -3591,13 +3563,14 @@
});
}

// console.log("DELETE RESPONSE AS LEADER", this.node.identity.publicKey.hashcode(), entryReplicated.hash)
this.responseToPruneDebouncedFn.delete(entryReplicated.hash); // don't allow others to prune on the expectation that I am replicating this entry
} else {
this.pruneDebouncedFn.delete(entryReplicated.hash);
await this._pendingDeletes
.get(entryReplicated.hash)
?.reject(new Error("Failed to delete, is leader again"));
this._requestIPruneSent.delete(entryReplicated.hash);
this.removePruneRequestSent(entryReplicated.hash);
}
}
for (const [target, entries] of uncheckedDeliver) {
15 changes: 15 additions & 0 deletions packages/programs/data/shared-log/test/lifecycle.spec.ts
@@ -260,4 +260,19 @@ describe("lifecycle", () => {
abortController.abort("Done");
});
});

describe("prune", () => {
it("prune after close is no-op", async () => {
session = await TestSession.connected(1);
const store = new EventStore();
const db = await session.peers[0].open(store);
const { entry } = await db.add("hello");
await db.close();
let pruneMap = new Map([[entry.hash, { entry, leaders: new Map() }]]);
let t0 = +new Date();
await Promise.all(db.log.prune(pruneMap));
let t1 = +new Date();
expect(t1 - t0).to.be.lessThan(100);
});
});
});
1 change: 1 addition & 0 deletions packages/programs/data/shared-log/test/sharding.spec.ts
@@ -231,6 +231,7 @@ testSetups.forEach((setup) => {
}

await Promise.all(promises);

await checkBounded(entryCount, 0.35, 0.65, db1, db2);
});

