Skip to content

Commit

Permalink
fix: improve waitForIsLeader performance
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Oct 25, 2023
1 parent d12eb28 commit 08469c0
Showing 1 changed file with 48 additions and 11 deletions.
59 changes: 48 additions & 11 deletions packages/programs/data/shared-log/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {
SubscriptionEvent,
UnsubcriptionEvent
} from "@peerbit/pubsub-interface";
import { AbortError, TimeoutError, waitFor } from "@peerbit/time";
import { AbortError, delay, TimeoutError, waitFor } from "@peerbit/time";
import { Observer, Replicator, Role } from "./role.js";
import {
AbsoluteReplicas,
Expand Down Expand Up @@ -90,6 +90,7 @@ export interface SharedLogOptions {
}

export const DEFAULT_MIN_REPLICAS = 2;
export const WAIT_FOR_REPLICATOR_TIMEOUT = 500;

export type Args<T> = LogProperties<T> & LogEvents<T> & SharedLogOptions;
export type SharedAppendOptions<T> = AppendOptions<T> & {
Expand Down Expand Up @@ -169,6 +170,12 @@ export class SharedLog<T = Uint8Array> extends Program<
if (wasReplicator) {
await this.replicationReorganization();
}

this.events.dispatchEvent(
new CustomEvent<UpdateRoleEvent>("role", {
detail: { publicKey: this.node.identity.publicKey, role: this._role }
})
);
}

private async initializeWithRole() {
Expand Down Expand Up @@ -605,7 +612,7 @@ export class SharedLog<T = Uint8Array> extends Program<

this.waitFor(context.from, {
signal: this._closeController.signal,
timeout: 3000
timeout: WAIT_FOR_REPLICATOR_TIMEOUT
})
.then(async () => {
const change = await this.modifyReplicatorsCache(
Expand Down Expand Up @@ -686,14 +693,49 @@ export class SharedLog<T = Uint8Array> extends Program<
slot: { toString(): string },
numberOfLeaders: number
): Promise<boolean> {
try {
return new Promise((res, rej) => {
const removeListeners = () => {
this.events.removeEventListener("role", roleListener);
this._closeController.signal.addEventListener("abort", abortListener);
};
const abortListener = () => {
removeListeners();
clearTimeout(timeout);
res(false);
};

const timeout = setTimeout(() => {
removeListeners();
res(false);
}, WAIT_FOR_REPLICATOR_TIMEOUT);

const check = () =>
this.isLeader(slot, numberOfLeaders).then((isLeader) => {
if (isLeader) {
removeListeners();
clearTimeout(timeout);
res(isLeader);
}
});

const roleListener = () => {
check();
};
this.events.addEventListener("role", roleListener);
this._closeController.signal.addEventListener("abort", abortListener);

check();
});
/* */

/* try {
const result =
(await waitFor(
async () =>
(await this.isLeader(slot, numberOfLeaders)) == true
? true
: undefined,
{ timeout: 3000, signal: this._closeController.signal }
{ timeout: WAIT_FOR_REPLICATOR_TIMEOUT, signal: this._closeController.signal }
)) == true;
return result;
} catch (error: any) {
Expand All @@ -702,7 +744,7 @@ export class SharedLog<T = Uint8Array> extends Program<
}
throw error;
}
} */
}

async findLeaders(
Expand Down Expand Up @@ -961,16 +1003,11 @@ export class SharedLog<T = Uint8Array> extends Program<
}
*/
async replicationReorganization() {
/* console.log(
"???",
this.node.identity.publicKey.hashcode(),
this.getReplicatorsSorted()
); */

const changed = false;
const heads = await this.log.getHeads();
const groupedByGid = await groupByGid(heads);
let storeChanged = false;

for (const [gid, entries] of groupedByGid) {
if (this.closed) {
break;
Expand Down

0 comments on commit 08469c0

Please sign in to comment.