Skip to content

Commit

Permalink
fix: increase seek timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Dec 13, 2023
1 parent 4a698f2 commit d4cf164
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
30 changes: 21 additions & 9 deletions packages/transport/stream/src/__tests__/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ class TestDirectStream extends DirectStream {
options: {
id?: string;
connectionManager?: ConnectionManagerArguments;
seekTimeout?: number;
} = {}
) {
super(components, [options.id || "/test/0.0.0"], {
canRelayMessage: true,
connectionManager: options.connectionManager,
seekTimeout: options.seekTimeout,
...options
});
}
Expand Down Expand Up @@ -1668,7 +1670,10 @@ describe("join/leave", () => {
session = await disconnected(4, {
services: {
directstream: (c) =>
new TestDirectStream(c, { connectionManager: false })
new TestDirectStream(c, {
connectionManager: false,
seekTimeout: 5000
})
}
});

Expand Down Expand Up @@ -1818,14 +1823,21 @@ describe("join/leave", () => {

await session.peers[2].stop();

await waitForResolved(() => {
expect(
streams[3].stream.routes.isReachable(
streams[3].stream.publicKeyHash,
streams[2].stream.publicKeyHash
)
).toEqual(false);
});
try {
await waitForResolved(
() => {
expect(
streams[3].stream.routes.isReachable(
streams[3].stream.publicKeyHash,
streams[2].stream.publicKeyHash
)
).toEqual(false);
},
{ timeout: 20 * 1000 }
);
} catch (error) {
throw error;
}

await waitForResolved(() => {
expect(
Expand Down
19 changes: 13 additions & 6 deletions packages/transport/stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ export interface PeerStreamEvents {
close: CustomEvent<never>;
}

const SEEK_DELIVERY_TIMEOUT = 9e3;
const SEEK_DELIVERY_TIMEOUT = 15e3;
const MAX_DATA_LENGTH = 1e7 + 1000; // 10 mb and some metadata
const MAX_QUEUED_BYTES = MAX_DATA_LENGTH * 50;

const DEFAULT_PRUNE_CONNECTIONS_INTERVAL = 1e4;
const DEFAULT_PRUNE_CONNECTIONS_INTERVAL = 2e4;
const DEFAULT_MIN_CONNECTIONS = 2;
const DEFAULT_MAX_CONNECTIONS = 300;

Expand Down Expand Up @@ -354,6 +354,7 @@ export type DirectStreamOptions = {
signaturePolicy?: SignaturePolicy;
connectionManager?: ConnectionManagerArguments;
routeSeekInterval?: number;
seekTimeout?: number;
};

export interface DirectStreamComponents extends Components {
Expand Down Expand Up @@ -412,6 +413,7 @@ export abstract class DirectStream<
private pruneConnectionsTimeout: ReturnType<typeof setInterval>;
private prunedConnectionsCache?: Cache<string>;
routeSeekInterval: number;
seekTimeout: number;
closeController: AbortController;
private _ackCallbacks: Map<
string,
Expand Down Expand Up @@ -440,10 +442,12 @@ export abstract class DirectStream<
maxOutboundStreams,
signaturePolicy = "StictSign",
connectionManager,
routeSeekInterval = ROUTE_UPDATE_DELAY_FACTOR
routeSeekInterval = ROUTE_UPDATE_DELAY_FACTOR,
seekTimeout = SEEK_DELIVERY_TIMEOUT
} = options || {};

const signKey = getKeypairFromPeerId(components.peerId);
this.seekTimeout = seekTimeout;
this.sign = signKey.sign.bind(signKey);
this.peerId = components.peerId;
this.publicKey = signKey.publicKey;
Expand Down Expand Up @@ -1292,7 +1296,10 @@ export abstract class DirectStream<

if (filteredLeaving.length > 0) {
this.publish(new Uint8Array(0), {
mode: new SeekDelivery({ to: filteredLeaving, redundancy: 2 })
mode: new SeekDelivery({
to: filteredLeaving,
redundancy: DEFAULT_SEEK_MESSAGE_REDUDANCY
})
}).catch((e) => {
if (e instanceof TimeoutError || e instanceof AbortError) {
// peer left or closed
Expand Down Expand Up @@ -1446,7 +1453,7 @@ export abstract class DirectStream<
to,
setTimeout(() => {
this.removeRouteConnection(to, false);
}, SEEK_DELIVERY_TIMEOUT)
}, this.seekTimeout + 5000)
);
}
}
Expand Down Expand Up @@ -1507,7 +1514,7 @@ export abstract class DirectStream<
} else {
deliveryDeferredPromise.resolve();
}
}, SEEK_DELIVERY_TIMEOUT);
}, this.seekTimeout);

this._ackCallbacks.set(idString, {
promise: deliveryDeferredPromise.promise,
Expand Down

0 comments on commit d4cf164

Please sign in to comment.