From 08469c0a57d2c6236fd6e2ac8587c43610ed08ea Mon Sep 17 00:00:00 2001 From: Marcus Pousette Date: Wed, 25 Oct 2023 10:38:02 +0200 Subject: [PATCH] fix: improve waitForIsLeader performance --- .../programs/data/shared-log/src/index.ts | 59 +++++++++++++++---- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/packages/programs/data/shared-log/src/index.ts b/packages/programs/data/shared-log/src/index.ts index 63aab2c24..305c59570 100644 --- a/packages/programs/data/shared-log/src/index.ts +++ b/packages/programs/data/shared-log/src/index.ts @@ -34,7 +34,7 @@ import { SubscriptionEvent, UnsubcriptionEvent } from "@peerbit/pubsub-interface"; -import { AbortError, TimeoutError, waitFor } from "@peerbit/time"; +import { AbortError, delay, TimeoutError, waitFor } from "@peerbit/time"; import { Observer, Replicator, Role } from "./role.js"; import { AbsoluteReplicas, @@ -90,6 +90,7 @@ export interface SharedLogOptions { } export const DEFAULT_MIN_REPLICAS = 2; +export const WAIT_FOR_REPLICATOR_TIMEOUT = 500; export type Args = LogProperties & LogEvents & SharedLogOptions; export type SharedAppendOptions = AppendOptions & { @@ -169,6 +170,12 @@ export class SharedLog extends Program< if (wasReplicator) { await this.replicationReorganization(); } + + this.events.dispatchEvent( + new CustomEvent("role", { + detail: { publicKey: this.node.identity.publicKey, role: this._role } + }) + ); } private async initializeWithRole() { @@ -605,7 +612,7 @@ export class SharedLog extends Program< this.waitFor(context.from, { signal: this._closeController.signal, - timeout: 3000 + timeout: WAIT_FOR_REPLICATOR_TIMEOUT }) .then(async () => { const change = await this.modifyReplicatorsCache( @@ -686,14 +693,49 @@ export class SharedLog extends Program< slot: { toString(): string }, numberOfLeaders: number ): Promise { - try { + return new Promise((res, rej) => { + const removeListeners = () => { + this.events.removeEventListener("role", roleListener); + this._closeController.signal.addEventListener("abort", abortListener); + }; + const abortListener = () => { + removeListeners(); + clearTimeout(timeout); + res(false); + }; + + const timeout = setTimeout(() => { + removeListeners(); + res(false); + }, WAIT_FOR_REPLICATOR_TIMEOUT); + + const check = () => + this.isLeader(slot, numberOfLeaders).then((isLeader) => { + if (isLeader) { + removeListeners(); + clearTimeout(timeout); + res(isLeader); + } + }); + + const roleListener = () => { + check(); + }; + this.events.addEventListener("role", roleListener); + this._closeController.signal.addEventListener("abort", abortListener); + + check(); + }); + /* */ + + /* try { const result = (await waitFor( async () => (await this.isLeader(slot, numberOfLeaders)) == true ? true : undefined, - { timeout: 3000, signal: this._closeController.signal } + { timeout: WAIT_FOR_REPLICATOR_TIMEOUT, signal: this._closeController.signal } )) == true; return result; } catch (error: any) { @@ -702,7 +744,7 @@ export class SharedLog extends Program< } throw error; - } + } */ } async findLeaders( @@ -961,16 +1003,11 @@ export class SharedLog extends Program< } */ async replicationReorganization() { - /* console.log( - "???", - this.node.identity.publicKey.hashcode(), - this.getReplicatorsSorted() - ); */ - const changed = false; const heads = await this.log.getHeads(); const groupedByGid = await groupByGid(heads); let storeChanged = false; + for (const [gid, entries] of groupedByGid) { if (this.closed) { break;