diff --git a/packages/clients/peerbit/src/__tests__/dial.test.ts b/packages/clients/peerbit/src/__tests__/dial.test.ts index a85198f5a..cd077f51f 100644 --- a/packages/clients/peerbit/src/__tests__/dial.test.ts +++ b/packages/clients/peerbit/src/__tests__/dial.test.ts @@ -36,10 +36,19 @@ describe(`dial`, function () { it("autodials by default", async () => { expect( - clients[0].services.pubsub["connectionManagerOptions"].autoDial - ).toBeTrue(); + clients[0].services.pubsub.connectionManagerOptions.dialer + ).toBeDefined(); expect( - clients[1].services.pubsub["connectionManagerOptions"].autoDial - ).toBeTrue(); + clients[1].services.blocks.connectionManagerOptions.dialer + ).toBeDefined(); + }); + + it("autoprunes by default", async () => { + expect( + clients[0].services.pubsub.connectionManagerOptions.pruner + ).toBeDefined(); + expect( + clients[1].services.blocks.connectionManagerOptions.pruner + ).toBeDefined(); }); }); diff --git a/packages/clients/peerbit/src/libp2p.ts b/packages/clients/peerbit/src/libp2p.ts index 986987102..3fe3e9459 100644 --- a/packages/clients/peerbit/src/libp2p.ts +++ b/packages/clients/peerbit/src/libp2p.ts @@ -64,10 +64,9 @@ export const createLibp2pExtended = ( ((c) => new DirectSub(c, { canRelayMessage: true, - signaturePolicy: "StrictNoSign", - connectionManager: { - autoDial: true - } + signaturePolicy: "StrictNoSign" + // auto dial true + // auto prune true })), blocks: opts.services?.blocks || ((c) => new DirectBlock(c)), ...opts.services diff --git a/packages/programs/data/document/src/__benchmark__/replication.ts b/packages/programs/data/document/src/__benchmark__/replication.ts index 4971b7350..c54f9c2fa 100644 --- a/packages/programs/data/document/src/__benchmark__/replication.ts +++ b/packages/programs/data/document/src/__benchmark__/replication.ts @@ -8,6 +8,7 @@ import { Documents, SetupOptions } from "../document-store.js"; import { Replicator } from "@peerbit/shared-log"; import { DirectSub } from "@peerbit/pubsub"; import { mplex } from "@libp2p/mplex"; + // Run with "node --loader ts-node/esm ./src/__benchmark__/replication.ts" // put x 1,009 ops/sec ±2.57% (80 runs sampled) @@ -55,18 +56,19 @@ const peers = await Promise.all( pubsub: (sub) => new DirectSub(sub, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }), await createLibp2pExtended({ + connectionManager: {}, transports: [tcp()], streamMuxers: [mplex()], services: { pubsub: (sub) => new DirectSub(sub, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }), @@ -77,7 +79,7 @@ const peers = await Promise.all( pubsub: (sub) => new DirectSub(sub, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }) diff --git a/packages/programs/data/shared-log/src/__tests__/leader.test.ts b/packages/programs/data/shared-log/src/__tests__/leader.test.ts index fcdf050c6..005fcdc54 100644 --- a/packages/programs/data/shared-log/src/__tests__/leader.test.ts +++ b/packages/programs/data/shared-log/src/__tests__/leader.test.ts @@ -26,7 +26,7 @@ describe(`leaders`, function () { pubsub: (c) => new DirectSub(c, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } } diff --git a/packages/transport/libp2p-test-utils/src/session.ts b/packages/transport/libp2p-test-utils/src/session.ts index a395bbcb4..579d7826a 100644 --- a/packages/transport/libp2p-test-utils/src/session.ts +++ b/packages/transport/libp2p-test-utils/src/session.ts @@ -83,6 +83,7 @@ export class TestSession { connectionManager: (options?.[i] || options)?.connectionManager ?? { minConnections: 0 }, + datastore: (options?.[i] || options)?.datastore, transports: (options?.[i] || options)?.transports ?? diff --git a/packages/transport/pubsub/src/__benchmark__/index.ts b/packages/transport/pubsub/src/__benchmark__/index.ts index 86d629c91..4f8ff086e 100644 --- a/packages/transport/pubsub/src/__benchmark__/index.ts +++ b/packages/transport/pubsub/src/__benchmark__/index.ts @@ -17,9 +17,7 @@ const session = await TestSession.disconnected(4, { new DirectSub(c, { canRelayMessage: true, emitSelf: true, - connectionManager: { - autoDial: false - } + connectionManager: false }) } }); diff --git a/packages/transport/pubsub/src/__tests__/index.test.ts b/packages/transport/pubsub/src/__tests__/index.test.ts index 94edaacb9..77ef1e832 100644 --- a/packages/transport/pubsub/src/__tests__/index.test.ts +++ b/packages/transport/pubsub/src/__tests__/index.test.ts @@ -160,7 +160,7 @@ describe("pubsub", function () { pubsub: (c) => new DirectSub(c, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -217,7 +217,7 @@ describe("pubsub", function () { pubsub: (c) => new DirectSub(c, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -369,7 +369,7 @@ describe("pubsub", function () { pubsub: (c) => new DirectSub(c, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -523,7 +523,7 @@ describe("pubsub", function () { new DirectSub(c, { emitSelf: true, canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -579,7 +579,7 @@ describe("pubsub", function () { pubsub: (c) => new DirectSub(c, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -680,7 +680,7 @@ describe("pubsub", function () { pubsub: (c) => new DirectSub(c, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -752,17 +752,14 @@ describe("pubsub", function () { let session: TestSession<{ pubsub: DirectSub }>; let streams: ReturnType[]; - const data = new Uint8Array([1, 2, 3]); const TOPIC = "topic"; - const PING_INTERVAL = 1000; beforeEach(async () => { session = await TestSession.disconnected(4, { services: { pubsub: (c) => new DirectSub(c, { - pingInterval: PING_INTERVAL, canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -894,7 +891,7 @@ describe("pubsub", function () { pubsub: (c) => new DirectSub(c, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -983,7 +980,7 @@ describe("pubsub", function () { pubsub: (c) => new DirectSub(c, { canRelayMessage: true, - connectionManager: { autoDial: false } + connectionManager: false }) } }); diff --git a/packages/transport/stream/src/__benchmark__/transfer.ts b/packages/transport/stream/src/__benchmark__/transfer.ts index 4b19f4e10..59474150f 100644 --- a/packages/transport/stream/src/__benchmark__/transfer.ts +++ b/packages/transport/stream/src/__benchmark__/transfer.ts @@ -20,9 +20,7 @@ class TestStreamImpl extends DirectStream { super(c, ["bench/0.0.0"], { canRelayMessage: true, emitSelf: true, - connectionManager: { - autoDial: false - } + connectionManager: false }); } } diff --git a/packages/transport/stream/src/__tests__/stream.test.ts b/packages/transport/stream/src/__tests__/stream.test.ts index 4c33ca372..8cc5774de 100644 --- a/packages/transport/stream/src/__tests__/stream.test.ts +++ b/packages/transport/stream/src/__tests__/stream.test.ts @@ -3,9 +3,10 @@ import crypto from "crypto"; import { waitForPeers as waitForPeerStreams, DirectStream, - ConnectionManagerOptions, - DirectStreamComponents + DirectStreamComponents, + ConnectionManagerArguments } from ".."; +import { Cache } from "@peerbit/cache"; import { ACK, AcknowledgeDelivery, @@ -113,15 +114,13 @@ class TestDirectStream extends DirectStream { components: DirectStreamComponents, options: { id?: string; - connectionManager?: ConnectionManagerOptions; + connectionManager?: ConnectionManagerArguments; } = {} ) { - super(components, [options.id || "test/0.0.0"], { + super(components, [options.id || "/test/0.0.0"], { canRelayMessage: true, emitSelf: true, - connectionManager: options.connectionManager || { - autoDial: false - }, + connectionManager: options.connectionManager, ...options }); } @@ -184,7 +183,7 @@ describe("streams", function () { services: { directstream: (c) => new TestDirectStream(c, { - connectionManager: { autoDial: false } + connectionManager: false }) } }); @@ -464,437 +463,433 @@ describe("streams", function () { expect(streams[1].stream.routes.count()).toEqual(2); }); }); + }); - describe("fanout", () => { - describe("basic", () => { - let session: TestSessionStream; - let streams: ReturnType[]; + describe("fanout", () => { + describe("basic", () => { + let session: TestSessionStream; + let streams: ReturnType[]; - beforeAll(async () => {}); + beforeAll(async () => {}); - beforeEach(async () => { - session = await connected(3, { - services: { - directstream: (c) => - new TestDirectStream(c, { - connectionManager: { autoDial: false } - }) - } - }); - streams = []; - for (const peer of session.peers) { - streams.push(createMetrics(peer.services.directstream)); + beforeEach(async () => { + session = await connected(3, { + services: { + directstream: (c) => + new TestDirectStream(c, { connectionManager: false }) } - - await waitForPeerStreams(streams[0].stream, streams[1].stream); }); + streams = []; + for (const peer of session.peers) { + streams.push(createMetrics(peer.services.directstream)); + } + + await waitForPeerStreams(streams[0].stream, streams[1].stream); + }); + + afterEach(async () => { + await session.stop(); + }); - afterEach(async () => { - await session.stop(); + it("will not publish to 'from' when explicitly providing to", async () => { + const msg = new DataMessage({ + data: new Uint8Array([0]), + deliveryMode: new SeekDelivery(1) }); + streams[2].stream.canRelayMessage = false; // so that 2 does not relay to 0 + + await streams[1].stream.publishMessage( + session.peers[0].services.directstream.publicKey, + await msg.sign(streams[1].stream.sign), + [ + //streams[1].stream.peers.get(streams[0].stream.publicKeyHash)!, + streams[1].stream.peers.get(streams[2].stream.publicKeyHash)! + ] + ); + const msgId = await getMsgId(msg.bytes()); + await waitForResolved(() => + expect(streams[2].processed.get(msgId)).toEqual(1) + ); - it("will not publish to 'from' when explicitly providing to", async () => { - const msg = new DataMessage({ - data: new Uint8Array([0]), - deliveryMode: new SeekDelivery(1) - }); - streams[2].stream.canRelayMessage = false; // so that 2 does not relay to 0 + await delay(1000); // wait for more messages eventually propagate + expect(streams[0].processed.get(msgId)).toBeUndefined(); + expect(streams[1].processed.get(msgId)).toBeUndefined(); + }); - await streams[1].stream.publishMessage( - session.peers[0].services.directstream.publicKey, - await msg.sign(streams[1].stream.sign), - [ - //streams[1].stream.peers.get(streams[0].stream.publicKeyHash)!, - streams[1].stream.peers.get(streams[2].stream.publicKeyHash)! + /** + * If tests below fails, dead-locks can apphear in unpredictable ways + */ + it("to in message will not send back", async () => { + const msg = new DataMessage({ + data: new Uint8Array([0]), + header: new MessageHeader({ + to: [ + streams[0].stream.publicKeyHash, + streams[2].stream.publicKeyHash ] - ); - const msgId = await getMsgId(msg.bytes()); - await waitForResolved(() => - expect(streams[2].processed.get(msgId)).toEqual(1) - ); - - await delay(1000); // wait for more messages eventually propagate - expect(streams[0].processed.get(msgId)).toBeUndefined(); - expect(streams[1].processed.get(msgId)).toBeUndefined(); + }), + deliveryMode: new SeekDelivery(1) }); + streams[2].stream.canRelayMessage = false; // so that 2 does not relay to 0 + + await msg.sign(streams[1].stream.sign); + await streams[1].stream.publishMessage( + session.peers[0].services.directstream.publicKey, + msg, + undefined, + true + ); + await delay(1000); + const msgId = await getMsgId(msg.bytes()); + expect(streams[0].processed.get(msgId)).toBeUndefined(); + expect(streams[1].processed.get(msgId)).toBeUndefined(); + expect(streams[2].processed.get(msgId)).toEqual(1); + }); - /** - * If tests below fails, dead-locks can apphear in unpredictable ways - */ - it("to in message will not send back", async () => { - const msg = new DataMessage({ - data: new Uint8Array([0]), - header: new MessageHeader({ - to: [ - streams[0].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ] - }), - deliveryMode: new SeekDelivery(1) - }); - streams[2].stream.canRelayMessage = false; // so that 2 does not relay to 0 - - await msg.sign(streams[1].stream.sign); - await streams[1].stream.publishMessage( + it("rejects when to peers is from", async () => { + const msg = new DataMessage({ + data: new Uint8Array([0]), + deliveryMode: new SilentDelivery(1) + }); + await msg.sign(streams[1].stream.sign); + await expect( + streams[1].stream.publishMessage( session.peers[0].services.directstream.publicKey, msg, - undefined, - true - ); - await delay(1000); - const msgId = await getMsgId(msg.bytes()); - expect(streams[0].processed.get(msgId)).toBeUndefined(); - expect(streams[1].processed.get(msgId)).toBeUndefined(); - expect(streams[2].processed.get(msgId)).toEqual(1); - }); + [streams[1].stream.peers.get(streams[0].stream.publicKeyHash)!] + ) + ).rejects.toThrowError("Message did not have any valid receivers"); + }); - it("rejects when to peers is from", async () => { - const msg = new DataMessage({ - data: new Uint8Array([0]), - deliveryMode: new SilentDelivery(1) - }); - await msg.sign(streams[1].stream.sign); - await expect( - streams[1].stream.publishMessage( - session.peers[0].services.directstream.publicKey, - msg, - [streams[1].stream.peers.get(streams[0].stream.publicKeyHash)!] - ) - ).rejects.toThrowError("Message did not have any valid receivers"); + it("rejects when only to is from", async () => { + const msg = new DataMessage({ + data: new Uint8Array([0]), + header: new MessageHeader({ + to: [streams[0].stream.publicKeyHash] + }), + deliveryMode: new SilentDelivery(1) }); + await msg.sign(streams[1].stream.sign); + await streams[1].stream.publishMessage( + session.peers[0].services.directstream.publicKey, + msg + ); + const msgId = await getMsgId(msg.bytes()); + await delay(1000); + expect(streams[0].processed.get(msgId)).toBeUndefined(); + expect(streams[1].processed.get(msgId)).toBeUndefined(); + expect(streams[2].processed.get(msgId)).toBeUndefined(); + }); - it("rejects when only to is from", async () => { - const msg = new DataMessage({ - data: new Uint8Array([0]), - header: new MessageHeader({ - to: [streams[0].stream.publicKeyHash] - }), - deliveryMode: new SilentDelivery(1) - }); - await msg.sign(streams[1].stream.sign); - await streams[1].stream.publishMessage( - session.peers[0].services.directstream.publicKey, - msg - ); - const msgId = await getMsgId(msg.bytes()); - await delay(1000); - expect(streams[0].processed.get(msgId)).toBeUndefined(); - expect(streams[1].processed.get(msgId)).toBeUndefined(); - expect(streams[2].processed.get(msgId)).toBeUndefined(); + it("will send through peer", async () => { + await session.peers[0].hangUp(session.peers[1].peerId); + + // send a message with to=[2] + // make sure message is received + const msg = new DataMessage({ + data: new Uint8Array([0]), + header: new MessageHeader({ + to: [streams[2].stream.publicKeyHash] + }), + deliveryMode: new SeekDelivery(1) }); + await msg.sign(streams[1].stream.sign); + await streams[0].stream.publishMessage( + session.peers[0].services.directstream.publicKey, + msg + ); + await waitForResolved(() => + expect(streams[2].received).toHaveLength(1) + ); + }); + }); - it("will send through peer", async () => { - await session.peers[0].hangUp(session.peers[1].peerId); - - // send a message with to=[2] - // make sure message is received - const msg = new DataMessage({ - data: new Uint8Array([0]), - header: new MessageHeader({ - to: [streams[2].stream.publicKeyHash] - }), - deliveryMode: new SeekDelivery(1) - }); - await msg.sign(streams[1].stream.sign); - await streams[0].stream.publishMessage( - session.peers[0].services.directstream.publicKey, - msg - ); + describe("1->2", () => { + let session: TestSessionStream; + let streams: ReturnType[]; + const data = new Uint8Array([1, 2, 3]); + + beforeAll(async () => {}); + + beforeEach(async () => { + session = await connected(3, { + services: { + directstream: (c) => + new TestDirectStream(c, { connectionManager: false }) + } + }); + streams = []; + for (const peer of session.peers) { await waitForResolved(() => - expect(streams[2].received).toHaveLength(1) + expect(peer.services.directstream.peers.size).toEqual( + session.peers.length - 1 + ) ); - }); + streams.push(createMetrics(peer.services.directstream)); + } }); - describe("1->2", () => { - let session: TestSessionStream; - let streams: ReturnType[]; - const data = new Uint8Array([1, 2, 3]); + afterEach(async () => { + await session.stop(); + }); - beforeAll(async () => {}); + it("messages are only sent once to each peer", async () => { + let totalWrites = 1; + expect(streams[0].ack).toHaveLength(0); - beforeEach(async () => { - session = await connected(3, { - services: { - directstream: (c) => - new TestDirectStream(c, { - connectionManager: { autoDial: false } - }) - } - }); - streams = []; - for (const peer of session.peers) { - await waitForResolved(() => - expect(peer.services.directstream.peers.size).toEqual( - session.peers.length - 1 - ) - ); - streams.push(createMetrics(peer.services.directstream)); - } + // push one message to ensure paths are found + await streams[0].stream.publish(data, { + to: [ + streams[1].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ], + mode: new SeekDelivery(2) }); - afterEach(async () => { - await session.stop(); - }); + // message delivered to 1 from 0 and relayed through 2. (2 ACKS) + // message delivered to 2 from 0 and relayed through 1. (2 ACKS) + // 2 + 2 = 4 + expect( + streams[0].stream.routes.isReachable( + streams[0].stream.publicKeyHash, + streams[1].stream.publicKeyHash + ) + ).toBeTrue(); + expect( + streams[0].stream.routes.isReachable( + streams[0].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ) + ).toBeTrue(); + + await waitForResolved(() => expect(streams[0].ack).toHaveLength(4)); - it("messages are only sent once to each peer", async () => { - let totalWrites = 1; - expect(streams[0].ack).toHaveLength(0); + const allWrites = streams.map((x) => collectDataWrites(x.stream)); + streams[1].received = []; + streams[2].received = []; - // push one message to ensure paths are found + // expect the data to be sent smartly + for (let i = 0; i < totalWrites; i++) { await streams[0].stream.publish(data, { to: [ streams[1].stream.publicKeyHash, streams[2].stream.publicKeyHash - ], - mode: new SeekDelivery(2) + ] }); + } - // message delivered to 1 from 0 and relayed through 2. (2 ACKS) - // message delivered to 2 from 0 and relayed through 1. (2 ACKS) - // 2 + 2 = 4 - expect( - streams[0].stream.routes.isReachable( - streams[0].stream.publicKeyHash, - streams[1].stream.publicKeyHash - ) - ).toBeTrue(); - expect( - streams[0].stream.routes.isReachable( - streams[0].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ) - ).toBeTrue(); + await waitForResolved(() => + expect(streams[1].received).toHaveLength(totalWrites) + ); + await waitForResolved(() => + expect(streams[2].received).toHaveLength(totalWrites) + ); - await waitForResolved(() => expect(streams[0].ack).toHaveLength(4)); + await delay(2000); - const allWrites = streams.map((x) => collectDataWrites(x.stream)); - streams[1].received = []; - streams[2].received = []; + // Check number of writes for each node + expect(getWritesCount(allWrites[0])).toEqual(totalWrites * 2); // write to "1" or "2" + expect(getWritesCount(allWrites[1])).toEqual(0); // "1" should never has to push any data + expect(getWritesCount(allWrites[2])).toEqual(0); // "2" should never has to push any data + }); + }); - // expect the data to be sent smartly - for (let i = 0; i < totalWrites; i++) { - await streams[0].stream.publish(data, { - to: [ - streams[1].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ] - }); - } + describe("1->2->2", () => { + /** + ┌─────┐ + │0 │ + └┬───┬┘ + ┌▽─┐┌▽─┐ + │2 ││1 │ + └┬┬┘└┬┬┘ + ││ ││ + ││ ││ + ││ └│┐ + └│──┐││ + ┌│──│┘│ + ││ │┌┘ + ┌▽▽┐┌▽▽┐ + │3 ││4 │ // 3 and 4 are connected also + └──┘└──┘ + */ - await waitForResolved(() => - expect(streams[1].received).toHaveLength(totalWrites) - ); - await waitForResolved(() => - expect(streams[2].received).toHaveLength(totalWrites) - ); + let session: TestSessionStream; + let streams: ReturnType[]; + const data = new Uint8Array([1, 2, 3]); - await delay(2000); + beforeAll(async () => {}); - // Check number of writes for each node - expect(getWritesCount(allWrites[0])).toEqual(totalWrites * 2); // write to "1" or "2" - expect(getWritesCount(allWrites[1])).toEqual(0); // "1" should never has to push any data - expect(getWritesCount(allWrites[2])).toEqual(0); // "2" should never has to push any data + beforeEach(async () => { + session = await disconnected(5, { + services: { + directstream: (c) => + new TestDirectStream(c, { connectionManager: false }) + } }); + streams = []; + for (const peer of session.peers) { + streams.push(createMetrics(peer.services.directstream)); + } + await session.connect([ + // behaviour seems to be more predictable if we connect after start (TODO improve startup to use existing connections in a better way) + [session.peers[0], session.peers[1]], + [session.peers[0], session.peers[2]], + + [session.peers[1], session.peers[3]], + [session.peers[1], session.peers[4]], + + [session.peers[2], session.peers[3]], + [session.peers[2], session.peers[4]], + + [session.peers[3], session.peers[4]] + ]); + + await waitForPeerStreams(streams[0].stream, streams[1].stream); + await waitForPeerStreams(streams[0].stream, streams[2].stream); + await waitForPeerStreams(streams[1].stream, streams[3].stream); + await waitForPeerStreams(streams[1].stream, streams[4].stream); + await waitForPeerStreams(streams[2].stream, streams[3].stream); + await waitForPeerStreams(streams[2].stream, streams[4].stream); + await waitForPeerStreams(streams[3].stream, streams[4].stream); }); - describe("1->2->2", () => { - /** - ┌─────┐ - │0 │ - └┬───┬┘ - ┌▽─┐┌▽─┐ - │2 ││1 │ - └┬┬┘└┬┬┘ - ││ ││ - ││ ││ - ││ └│┐ - └│──┐││ - ┌│──│┘│ - ││ │┌┘ - ┌▽▽┐┌▽▽┐ - │3 ││4 │ // 3 and 4 are connected also - └──┘└──┘ - */ - - let session: TestSessionStream; - let streams: ReturnType[]; - const data = new Uint8Array([1, 2, 3]); - - beforeAll(async () => {}); - - beforeEach(async () => { - session = await disconnected(5, { - services: { - directstream: (c) => - new TestDirectStream(c, { - connectionManager: { autoDial: false } - }) - } - }); - streams = []; - for (const peer of session.peers) { - streams.push(createMetrics(peer.services.directstream)); - } - await session.connect([ - // behaviour seems to be more predictable if we connect after start (TODO improve startup to use existing connections in a better way) - [session.peers[0], session.peers[1]], - [session.peers[0], session.peers[2]], - - [session.peers[1], session.peers[3]], - [session.peers[1], session.peers[4]], - - [session.peers[2], session.peers[3]], - [session.peers[2], session.peers[4]], - - [session.peers[3], session.peers[4]] - ]); - - await waitForPeerStreams(streams[0].stream, streams[1].stream); - await waitForPeerStreams(streams[0].stream, streams[2].stream); - await waitForPeerStreams(streams[1].stream, streams[3].stream); - await waitForPeerStreams(streams[1].stream, streams[4].stream); - await waitForPeerStreams(streams[2].stream, streams[3].stream); - await waitForPeerStreams(streams[2].stream, streams[4].stream); - await waitForPeerStreams(streams[3].stream, streams[4].stream); - }); + afterEach(async () => { + await session.stop(); + }); - afterEach(async () => { - await session.stop(); + it("messages are only sent once to each peer", async () => { + await streams[0].stream.publish(data, { + to: [ + streams[3].stream.publicKeyHash, + streams[4].stream.publicKeyHash + ], + mode: new SeekDelivery(2) }); - it("messages are only sent once to each peer", async () => { - await streams[0].stream.publish(data, { + expect( + streams[0].stream.routes.isReachable( + streams[0].stream.publicKeyHash, + streams[3].stream.publicKeyHash + ) + ).toBeTrue(); + expect( + streams[0].stream.routes.isReachable( + streams[0].stream.publicKeyHash, + streams[4].stream.publicKeyHash + ) + ).toBeTrue(); + + expect( + streams[0].stream.routes.findNeighbor( + streams[0].stream.publicKeyHash, + streams[3].stream.publicKeyHash + )?.list + ).toHaveLength(2); + + expect( + streams[0].stream.routes.findNeighbor( + streams[0].stream.publicKeyHash, + streams[4].stream.publicKeyHash + )?.list + ).toHaveLength(2); + + const allWrites = streams.map((x) => collectDataWrites(x.stream)); + + let totalWrites = 100; + streams[3].received = []; + streams[4].received = []; + streams[3].processed.clear(); + streams[4].processed.clear(); + + for (let i = 0; i < totalWrites; i++) { + streams[0].stream.publish(data, { to: [ streams[3].stream.publicKeyHash, streams[4].stream.publicKeyHash - ], - mode: new SeekDelivery(2) + ] }); + } - expect( - streams[0].stream.routes.isReachable( - streams[0].stream.publicKeyHash, - streams[3].stream.publicKeyHash - ) - ).toBeTrue(); - expect( - streams[0].stream.routes.isReachable( - streams[0].stream.publicKeyHash, - streams[4].stream.publicKeyHash - ) - ).toBeTrue(); - - expect( - streams[0].stream.routes.findNeighbor( - streams[0].stream.publicKeyHash, - streams[3].stream.publicKeyHash - )?.list - ).toHaveLength(2); - - expect( - streams[0].stream.routes.findNeighbor( - streams[0].stream.publicKeyHash, - streams[4].stream.publicKeyHash - )?.list - ).toHaveLength(2); - - const allWrites = streams.map((x) => collectDataWrites(x.stream)); - - let totalWrites = 100; - streams[3].received = []; - streams[4].received = []; - streams[3].processed.clear(); - streams[4].processed.clear(); - - for (let i = 0; i < totalWrites; i++) { - streams[0].stream.publish(data, { - to: [ - streams[3].stream.publicKeyHash, - streams[4].stream.publicKeyHash - ] - }); - } + await waitForResolved(() => + expect(streams[3].received).toHaveLength(totalWrites) + ); + await waitForResolved(() => + expect(streams[4].received).toHaveLength(totalWrites) + ); - await waitForResolved(() => - expect(streams[3].received).toHaveLength(totalWrites) - ); - await waitForResolved(() => - expect(streams[4].received).toHaveLength(totalWrites) - ); + const id1 = await getMsgId(serialize(streams[3].received[0])); - const id1 = await getMsgId(serialize(streams[3].received[0])); + await delay(3000); // Wait some exstra time if additional messages are propagating through - await delay(3000); // Wait some exstra time if additional messages are propagating through + expect(streams[3].processed.get(id1)).toEqual(1); // 1 delivery even though there are multiple path leading to this node + expect(streams[4].processed.get(id1)).toEqual(1); // 1 delivery even though there are multiple path leading to this node - expect(streams[3].processed.get(id1)).toEqual(1); // 1 delivery even though there are multiple path leading to this node - expect(streams[4].processed.get(id1)).toEqual(1); // 1 delivery even though there are multiple path leading to this node + // Check number of writes for each node + expect(getWritesCount(allWrites[0])).toEqual(totalWrites); // write to "1" or "2" + expect( + getWritesCount(allWrites[1]) + getWritesCount(allWrites[2]) + ).toEqual(totalWrites * 2); // write to "3" and "4" + expect(getWritesCount(allWrites[3])).toEqual(0); // "3" should never has to push any data + expect(getWritesCount(allWrites[4])).toEqual(0); // "4" should never has to push any data + }); - // Check number of writes for each node - expect(getWritesCount(allWrites[0])).toEqual(totalWrites); // write to "1" or "2" - expect( - getWritesCount(allWrites[1]) + getWritesCount(allWrites[2]) - ).toEqual(totalWrites * 2); // write to "3" and "4" - expect(getWritesCount(allWrites[3])).toEqual(0); // "3" should never has to push any data - expect(getWritesCount(allWrites[4])).toEqual(0); // "4" should never has to push any data + it("can send with higher redundancy", async () => { + await streams[0].stream.publish(data, { + to: [ + streams[3].stream.publicKeyHash, + streams[4].stream.publicKeyHash + ], + mode: new SeekDelivery(2) }); - it("can send with higher redundancy", async () => { - await streams[0].stream.publish(data, { - to: [ - streams[3].stream.publicKeyHash, - streams[4].stream.publicKeyHash - ], - mode: new SeekDelivery(2) - }); + const neighbourTo3 = streams[0].stream.routes.findNeighbor( + streams[0].stream.publicKeyHash, + streams[3].stream.publicKeyHash + )!.list[0]; - const neighbourTo3 = streams[0].stream.routes.findNeighbor( + expect( + streams[0].stream.routes.isReachable( streams[0].stream.publicKeyHash, streams[3].stream.publicKeyHash - )!.list[0]; - - expect( - streams[0].stream.routes.isReachable( - streams[0].stream.publicKeyHash, - streams[3].stream.publicKeyHash - ) - ).toBeTrue(); - expect( - streams[0].stream.routes.isReachable( - streams[0].stream.publicKeyHash, - streams[4].stream.publicKeyHash - ) - ).toBeTrue(); + ) + ).toBeTrue(); + expect( + streams[0].stream.routes.isReachable( + streams[0].stream.publicKeyHash, + streams[4].stream.publicKeyHash + ) + ).toBeTrue(); - streams.find( - (x) => x.stream.publicKeyHash === neighbourTo3.hash - )!.stream.processMessage = async (a, b, c) => { - // dont do anything - }; + streams.find( + (x) => x.stream.publicKeyHash === neighbourTo3.hash + )!.stream.processMessage = async (a, b, c) => { + // dont do anything + }; - await streams[0].stream.publish(data, { - to: [ - streams[3].stream.publicKeyHash, - streams[4].stream.publicKeyHash - ], - mode: new AcknowledgeDelivery(2) // send at least 2 routes - }); + await streams[0].stream.publish(data, { + to: [ + streams[3].stream.publicKeyHash, + streams[4].stream.publicKeyHash + ], + mode: new AcknowledgeDelivery(2) // send at least 2 routes }); }); }); - describe("concurrency", () => { + describe("bandwidth", () => { let session: TestSessionStream; let streams: ReturnType[]; - beforeAll(async () => {}); - beforeEach(async () => { session = await connected(3, { services: { directstream: (c) => new TestDirectStream(c, { - connectionManager: { autoDial: false } + connectionManager: { + dialer: false, + minConnections: 1, + pruner: { interval: 1000 } + } }) } }); @@ -903,12 +898,8 @@ describe("streams", function () { streams.push(createMetrics(peer.services.directstream)); } - await session.connect([ - [session.peers[0], session.peers[1]], - [session.peers[1], session.peers[2]] - ]); - await waitForPeerStreams(streams[0].stream, streams[1].stream); + await waitForPeerStreams(streams[0].stream, streams[2].stream); await waitForPeerStreams(streams[1].stream, streams[2].stream); }); @@ -916,626 +907,783 @@ describe("streams", function () { await session.stop(); }); - it("can concurrently seek and wait for ack", async () => { - await streams[0].stream.publish(crypto.randomBytes(1e2), { - to: [streams[2].stream.components.peerId] - }); - const p = streams[0].stream.publish(crypto.randomBytes(1e2), { - to: [streams[2].stream.components.peerId], - mode: new AcknowledgeDelivery(1) + it("bandwidth limits pruning", async () => { + expect(streams[0].stream.peers.size).toEqual(2); + streams[0].stream.connectionManagerOptions.pruner!.bandwidth = 1; + await streams[0].stream.publish(new Uint8Array(100), { + to: [streams[1].stream.publicKey, streams[2].stream.publicKey] }); - streams[0].stream.publish(crypto.randomBytes(1e2), { - to: [streams[2].stream.components.peerId], - mode: new SeekDelivery(1) + await waitForResolved(() => + expect(streams[0].stream.peers.size).toEqual(1) + ); + await delay(2000); + expect(streams[0].stream.peers.size).toEqual(1); + await streams[0].stream.publish(new Uint8Array(101), { + to: [streams[1].stream.publicKey, streams[2].stream.publicKey] }); - streams[0].stream.publish(crypto.randomBytes(1e2), { - to: [streams[2].stream.components.peerId], - mode: new SeekDelivery(1) + // messages can still deliver + expect(streams[1].received).toHaveLength(2); + expect(streams[2].received).toHaveLength(2); + }); + + it("max queued buffer pruning", async () => { + streams[0].stream.connectionManagerOptions.pruner!.maxBuffer = 1; + await streams[0].stream.maybePruneConnections(); + expect(streams[0].stream.peers.size).toEqual(2); + + [...streams[0].stream.peers.values()].forEach((x) => { + x.outboundStream! = { ...x.outboundStream, readableLength: 2 } as any; }); - streams[2].stream.publish(crypto.randomBytes(1e2), { - to: [streams[0].stream.components.peerId], + await streams[0].stream.maybePruneConnections(); + await waitForResolved(() => + expect(streams[0].stream.peers.size).toEqual(1) + ); + }); + + // maybe connect directly if pruned + + it("rejects incomming connections that are pruned", async () => { + streams[0].stream.connectionManagerOptions.pruner!.bandwidth = 1; + await streams[0].stream.publish(new Uint8Array(100), { + to: [streams[2].stream.publicKey] + }); + await waitForResolved(() => + expect(streams[0].stream.peers.size).toEqual(1) + ); + await waitForResolved(() => + expect(streams[1].stream.peers.size).toEqual(1) + ); + + expect(streams[0].stream["prunedConnectionsCache"]!.size).toEqual(1); + expect( + streams[0].stream.peers.get(streams[1].stream.publicKey.hashcode()) + ).toBeUndefined(); // beacuse stream[1] has received less data from stream[0] (least important) + + await session.peers[1].dial(session.peers[0].getMultiaddrs()); + + await delay(3000); + + // expect a connection to not be established + expect( + streams[0].stream.peers.get(streams[1].stream.publicKey.hashcode()) + ).toBeUndefined(); // beacuse stream[1] has received less data from stream[0] (least important) + streams[0].stream["prunedConnectionsCache"]?.clear(); + await session.peers[1].dial(session.peers[0].getMultiaddrs()); + await waitForResolved(() => + expect( + streams[0].stream.peers.get(streams[1].stream.publicKey.hashcode()) + ).toBeDefined() + ); + }); + + // if incomed incommection and pruned, timeout (?) + it("will not dial that are pruned", async () => { + // enable the autodialer (TODO do this on the setup step instead) + streams[0].stream.connectionManagerOptions.dialer = { retryDelay: 1e4 }; + streams[0].stream["recentDials"] = new Cache({ + ttl: 1e4, + max: 1e3 + }); + + streams[0].stream.connectionManagerOptions.pruner!.bandwidth = 1; + await streams[0].stream.publish(new Uint8Array(100), { + to: [streams[2].stream.publicKey] + }); + await waitForResolved(() => + expect(streams[0].stream.peers.size).toEqual(1) + ); + await waitForResolved(() => + expect(streams[1].stream.peers.size).toEqual(1) + ); + + expect(streams[0].stream["prunedConnectionsCache"]!.size).toEqual(1); + expect( + streams[0].stream.peers.get(streams[1].stream.publicKey.hashcode()) + ).toBeUndefined(); + + streams[0].stream.connectionManagerOptions.pruner!.bandwidth = + Number.MAX_SAFE_INTEGER; + + await streams[0].stream.publish(new Uint8Array(100), { + to: [streams[1].stream.publicKey], mode: new SeekDelivery(1) }); + await delay(2000); + + // will not dial + expect( + streams[0].stream.peers.get(streams[1].stream.publicKey.hashcode()) + ).toBeUndefined(); + + // clear the map that filter the dial + streams[0].stream["prunedConnectionsCache"]?.clear(); + await streams[0].stream.publish(new Uint8Array(100), { + to: [streams[1].stream.publicKey], + mode: new SeekDelivery(2) + }); - await p; + await waitForResolved(() => + expect( + streams[0].stream.peers.get(streams[1].stream.publicKey.hashcode()) + ).toBeDefined() + ); }); }); }); - // TODO test that messages are not sent backward, triangles etc - - describe("join/leave", () => { + describe("concurrency", () => { let session: TestSessionStream; let streams: ReturnType[]; - const data = new Uint8Array([1, 2, 3]); - let autoDialRetryDelay = 5 * 1000; - describe("direct connections", () => { - beforeEach(async () => { - session = await disconnected( - 4, - new Array(4).fill(0).map((_x, i) => { - return { - services: { - directstream: (c) => - new TestDirectStream(c, { - connectionManager: { - autoDial: i === 0, // allow client 0 to auto dial - retryDelay: autoDialRetryDelay - } - }) - } - }; - }) - ); // Second arg is due to https://github.com/transport/js-libp2p/issues/1690 - streams = []; + beforeAll(async () => {}); - for (const [i, peer] of session.peers.entries()) { - if (i === 0) { - expect( - peer.services.directstream["connectionManagerOptions"].autoDial - ).toBeTrue(); - } else { - expect( - peer.services.directstream["connectionManagerOptions"].autoDial - ).toBeFalse(); - } - - streams.push(createMetrics(peer.services.directstream)); + beforeEach(async () => { + session = await connected(3, { + services: { + directstream: (c) => + new TestDirectStream(c, { connectionManager: false }) } + }); + streams = []; + for (const peer of session.peers) { + streams.push(createMetrics(peer.services.directstream)); + } - // slowly connect to that the route maps are deterministic - await session.connect([[session.peers[0], session.peers[1]]]); - await session.connect([[session.peers[1], session.peers[2]]]); - await session.connect([[session.peers[2], session.peers[3]]]); - await waitForPeerStreams(streams[0].stream, streams[1].stream); - await waitForPeerStreams(streams[1].stream, streams[2].stream); - await waitForPeerStreams(streams[2].stream, streams[3].stream); + await session.connect([ + [session.peers[0], session.peers[1]], + [session.peers[1], session.peers[2]] + ]); + + await waitForPeerStreams(streams[0].stream, streams[1].stream); + await waitForPeerStreams(streams[1].stream, streams[2].stream); + }); + + afterEach(async () => { + await session.stop(); + }); + + it("can concurrently seek and wait for ack", async () => { + await streams[0].stream.publish(crypto.randomBytes(1e2), { + to: [streams[2].stream.components.peerId] + }); + const p = streams[0].stream.publish(crypto.randomBytes(1e2), { + to: [streams[2].stream.components.peerId], + mode: new AcknowledgeDelivery(1) + }); + streams[0].stream.publish(crypto.randomBytes(1e2), { + to: [streams[2].stream.components.peerId], + mode: new SeekDelivery(1) }); - afterEach(async () => { - await session.stop(); + streams[0].stream.publish(crypto.randomBytes(1e2), { + to: [streams[2].stream.components.peerId], + mode: new SeekDelivery(1) + }); + streams[2].stream.publish(crypto.randomBytes(1e2), { + to: [streams[0].stream.components.peerId], + mode: new SeekDelivery(1) }); - it("directly if possible", async () => { - let dials = 0; - const dialFn = - streams[0].stream.components.connectionManager.openConnection.bind( - streams[0].stream.components.connectionManager - ); - streams[0].stream.components.connectionManager.openConnection = ( - a, - b - ) => { - dials += 1; - return dialFn(a, b); - }; + await p; + }); + }); +}); - streams[3].received = []; - expect(streams[0].stream.peers.size).toEqual(1); +// TODO test that messages are not sent backward, triangles etc - await streams[0].stream.publish(data, { - to: [streams[3].stream.components.peerId], - mode: new SeekDelivery(1) - }); +describe("join/leave", () => { + let session: TestSessionStream; + let streams: ReturnType[]; + const data = new Uint8Array([1, 2, 3]); + let autoDialRetryDelay = 5 * 1000; - await waitFor(() => streams[0].ack.length === 1); + describe("direct connections", () => { + beforeEach(async () => { + session = await disconnected( + 4, + new Array(4).fill(0).map((_x, i) => { + return { + services: { + directstream: (c) => + new TestDirectStream(c, { + connectionManager: { + pruner: undefined, + dialer: i === 0 ? { retryDelay: autoDialRetryDelay } : false // allow client 0 to auto dial + } + }) + } + }; + }) + ); // Second arg is due to https://github.com/transport/js-libp2p/issues/1690 + streams = []; - // Dialing will yield a new connection - await waitForResolved(() => - expect(streams[0].stream.peers.size).toEqual(2) - ); + for (const [i, peer] of session.peers.entries()) { + if (i === 0) { + expect( + !!peer.services.directstream.connectionManagerOptions.dialer + ).toBeTrue(); + } else { + expect( + !!peer.services.directstream.connectionManagerOptions.dialer + ).toBeFalse(); + } - expect(dials).toEqual(1); + streams.push(createMetrics(peer.services.directstream)); + } - // Republishing will not result in an additional dial - await streams[0].stream.publish(data, { - to: [streams[3].stream.components.peerId] - }); - await waitFor(() => streams[3].received.length === 2); - expect(dials).toEqual(1); - expect(streams[0].stream.peers.size).toEqual(2); - expect( - streams[0].stream.peers.has(streams[3].stream.publicKeyHash) - ).toBeTrue(); - expect( - streams[0].stream.peers.has(streams[1].stream.publicKeyHash) - ).toBeTrue(); - }); + // slowly connect to that the route maps are deterministic + await session.connect([[session.peers[0], session.peers[1]]]); + await session.connect([[session.peers[1], session.peers[2]]]); + await session.connect([[session.peers[2], session.peers[3]]]); + await waitForPeerStreams(streams[0].stream, streams[1].stream); + await waitForPeerStreams(streams[1].stream, streams[2].stream); + await waitForPeerStreams(streams[2].stream, streams[3].stream); + }); - it("can leave and join quickly", async () => { - await streams[0].stream.publish(data, { - to: [streams[3].stream.publicKeyHash], - mode: new SeekDelivery(2) - }); - await waitForResolved( - () => expect(streams[0].stream.routes.count()).toEqual(2) // neighbour + streams[3] - ); + afterEach(async () => { + await session.stop(); + }); - // miss on messages + it("directly if possible", async () => { + let dials = 0; + const dialFn = + streams[0].stream.components.connectionManager.openConnection.bind( + streams[0].stream.components.connectionManager + ); + streams[0].stream.components.connectionManager.openConnection = ( + a, + b + ) => { + dials += 1; + return dialFn(a, b); + }; + + streams[3].received = []; + expect(streams[0].stream.peers.size).toEqual(1); + + await streams[0].stream.publish(data, { + to: [streams[3].stream.components.peerId], + mode: new SeekDelivery(1) + }); - let missedOne = false; + await waitFor(() => streams[0].ack.length === 1); - let msg: any[] = []; - let unreachable: PublicSignKey[] = []; + // Dialing will yield a new connection + await waitForResolved(() => + expect(streams[0].stream.peers.size).toEqual(2) + ); - streams[0].stream.addEventListener("peer:unreachable", (e) => { - unreachable.push(e.detail); - }); + expect(dials).toEqual(1); - // simulate beeing offline for 1 messages - const onDataMessage = streams[3].stream.onDataMessage.bind( - streams[3].stream - ); - streams[3].stream.onDataMessage = async ( - publicKey, - peerStream, - message, - seenBefore - ) => { - msg.push(message); - if (!missedOne) { - missedOne = true; - return true; - } - return onDataMessage(publicKey, peerStream, message, seenBefore); - }; + // Republishing will not result in an additional dial + await streams[0].stream.publish(data, { + to: [streams[3].stream.components.peerId] + }); + await waitFor(() => streams[3].received.length === 2); + expect(dials).toEqual(1); + expect(streams[0].stream.peers.size).toEqual(2); + expect( + streams[0].stream.peers.has(streams[3].stream.publicKeyHash) + ).toBeTrue(); + expect( + streams[0].stream.peers.has(streams[1].stream.publicKeyHash) + ).toBeTrue(); + }); - const publishToMissing = streams[0].stream.publish(data, { - to: [streams[3].stream.publicKeyHash], - mode: new SeekDelivery(2) - }); - await delay(1000); - await streams[0].stream.publish(data, { - // Since this the next message - to: [streams[3].stream.publicKeyHash], - mode: new SeekDelivery(2) - }); - await expect(publishToMissing).rejects.toThrow(TimeoutError); - expect(missedOne).toBeTrue(); - expect(unreachable).toHaveLength(0); // because the next message reached the node before the first message timed out + it("can leave and join quickly", async () => { + await streams[0].stream.publish(data, { + to: [streams[3].stream.publicKeyHash], + mode: new SeekDelivery(2) }); + await waitForResolved( + () => expect(streams[0].stream.routes.count()).toEqual(2) // neighbour + streams[3] + ); - it("retry dial after a while", async () => { - let dials: (PeerId | Multiaddr | Multiaddr[])[] = []; - streams[0].stream.components.connectionManager.openConnection = ( - a, - b - ) => { - dials.push(a); - throw new Error("Mock Error"); - }; + // miss on messages - streams[3].received = []; - expect(streams[0].stream.peers.size).toEqual(1); + let missedOne = false; - await streams[0].stream.publish(data, { - to: [streams[3].stream.components.peerId], - mode: new SeekDelivery(1) - }); + let msg: any[] = []; + let unreachable: PublicSignKey[] = []; - await waitForResolved(() => expect(streams[0].ack).toHaveLength(1)); + streams[0].stream.addEventListener("peer:unreachable", (e) => { + unreachable.push(e.detail); + }); - // Dialing will yield a new connection - await waitFor(() => streams[0].stream.peers.size === 1); - let expectedDialsCount = 1; // 1 dial directly - expect(dials).toHaveLength(expectedDialsCount); + // simulate beeing offline for 1 messages + const onDataMessage = streams[3].stream.onDataMessage.bind( + streams[3].stream + ); + streams[3].stream.onDataMessage = async ( + publicKey, + peerStream, + message, + seenBefore + ) => { + msg.push(message); + if (!missedOne) { + missedOne = true; + return true; + } + return onDataMessage(publicKey, peerStream, message, seenBefore); + }; - // Republishing will not result in an additional dial - await streams[0].stream.publish(data, { - to: [streams[3].stream.components.peerId], - mode: new SeekDelivery(1) - }); + const publishToMissing = streams[0].stream.publish(data, { + to: [streams[3].stream.publicKeyHash], + mode: new SeekDelivery(2) + }); + await delay(1000); + await streams[0].stream.publish(data, { + // Since this the next message + to: [streams[3].stream.publicKeyHash], + mode: new SeekDelivery(2) + }); + await expect(publishToMissing).rejects.toThrow(TimeoutError); + expect(missedOne).toBeTrue(); + expect(unreachable).toHaveLength(0); // because the next message reached the node before the first message timed out + }); - await waitForResolved(() => expect(streams[0].ack).toHaveLength(2)); + it("retry dial after a while", async () => { + let dials: (PeerId | Multiaddr | Multiaddr[])[] = []; + streams[0].stream.components.connectionManager.openConnection = ( + a, + b + ) => { + dials.push(a); + throw new Error("Mock Error"); + }; + + streams[3].received = []; + expect(streams[0].stream.peers.size).toEqual(1); + + await streams[0].stream.publish(data, { + to: [streams[3].stream.components.peerId], + mode: new SeekDelivery(1) + }); - let t1 = +new Date(); - expect(dials).toHaveLength(expectedDialsCount); // No change, because TTL > autoDialRetryTimeout - await waitFor(() => +new Date() - t1 > autoDialRetryDelay); + await waitForResolved(() => expect(streams[0].ack).toHaveLength(1)); - // Try again, now expect another dial call, since the retry interval has been reached - await streams[0].stream.publish(data, { - to: [streams[3].stream.components.peerId], - mode: new SeekDelivery(1) - }); - await waitForResolved(() => expect(streams[0].ack).toHaveLength(3)); + // Dialing will yield a new connection + await waitFor(() => streams[0].stream.peers.size === 1); + let expectedDialsCount = 1; // 1 dial directly + expect(dials).toHaveLength(expectedDialsCount); - expect(dials).toHaveLength(2); + // Republishing will not result in an additional dial + await streams[0].stream.publish(data, { + to: [streams[3].stream.components.peerId], + mode: new SeekDelivery(1) }); - /* TODO test that autodialler tries multiple addresses - - it("through relay if fails", async () => { - const dialFn = - streams[0].stream.components.connectionManager.openConnection.bind( - streams[0].stream.components.connectionManager - ); + await waitForResolved(() => expect(streams[0].ack).toHaveLength(2)); + + let t1 = +new Date(); + expect(dials).toHaveLength(expectedDialsCount); // No change, because TTL > autoDialRetryTimeout + await waitFor(() => +new Date() - t1 > autoDialRetryDelay); + + // Try again, now expect another dial call, since the retry interval has been reached + await streams[0].stream.publish(data, { + to: [streams[3].stream.components.peerId], + mode: new SeekDelivery(1) + }); + await waitForResolved(() => expect(streams[0].ack).toHaveLength(3)); + + expect(dials).toHaveLength(2); + }); - let directlyDialded = false; - const filteredDial = (address: PeerId | Multiaddr | Multiaddr[]) => { + /* TODO test that autodialler tries multiple addresses + + it("through relay if fails", async () => { + const dialFn = + streams[0].stream.components.connectionManager.openConnection.bind( + streams[0].stream.components.connectionManager + ); + + let directlyDialded = false; + const filteredDial = (address: PeerId | Multiaddr | Multiaddr[]) => { + if ( + isPeerId(address) && + address.toString() === streams[3].stream.peerIdStr + ) { + throw new Error("Mock fail"); // don't allow connect directly + } + + let addresses: Multiaddr[] = Array.isArray(address) + ? address + : [address as Multiaddr]; + for (const a of addresses) { if ( - isPeerId(address) && - address.toString() === streams[3].stream.peerIdStr + !a.protoNames().includes("p2p-circuit") && + a.toString().includes(streams[3].stream.peerIdStr) ) { throw new Error("Mock fail"); // don't allow connect directly } + } + addresses = addresses.map((x) => + x.protoNames().includes("p2p-circuit") + ? multiaddr(x.toString().replace("/webrtc/", "/")) + : x + ); // TODO use webrtc in node + + directlyDialded = true; + return dialFn(addresses); + }; + + streams[0].stream.components.connectionManager.openConnection = + filteredDial; + expect(streams[0].stream.peers.size).toEqual(1); + await streams[0].stream.publish(data, { + to: [streams[3].stream.components.peerId], + mode: new SeekDelivery(1) + }); + await waitFor(() => streams[3].received.length === 1); + await waitForResolved(() => expect(directlyDialded).toBeTrue()); + }); */ + }); - let addresses: Multiaddr[] = Array.isArray(address) - ? address - : [address as Multiaddr]; - for (const a of addresses) { - if ( - !a.protoNames().includes("p2p-circuit") && - a.toString().includes(streams[3].stream.peerIdStr) - ) { - throw new Error("Mock fail"); // don't allow connect directly - } - } - addresses = addresses.map((x) => - x.protoNames().includes("p2p-circuit") - ? multiaddr(x.toString().replace("/webrtc/", "/")) - : x - ); // TODO use webrtc in node - - directlyDialded = true; - return dialFn(addresses); - }; - - streams[0].stream.components.connectionManager.openConnection = - filteredDial; - expect(streams[0].stream.peers.size).toEqual(1); - await streams[0].stream.publish(data, { - to: [streams[3].stream.components.peerId], - mode: new SeekDelivery(1) - }); - await waitFor(() => streams[3].received.length === 1); - await waitForResolved(() => expect(directlyDialded).toBeTrue()); - }); */ - }); + describe("4", () => { + beforeEach(async () => { + session = await disconnected(4, { + services: { + directstream: (c) => + new TestDirectStream(c, { connectionManager: false }) + } + }); - describe("4", () => { - beforeEach(async () => { - session = await disconnected(4, { - services: { - directstream: (c) => - new TestDirectStream(c, { - connectionManager: { autoDial: false } - }) - } - }); + /* + ┌─┐ + │3│ + └┬┘ + ┌▽┐ + │0│ + └┬┘ + ┌▽┐ + │1│ + └┬┘ + ┌▽┐ + │2│ + └─┘ + + */ - /* - ┌─┐ - │3│ - └┬┘ - ┌▽┐ - │0│ - └┬┘ - ┌▽┐ - │1│ - └┬┘ - ┌▽┐ - │2│ - └─┘ - - */ + streams = []; + for (const peer of session.peers) { + streams.push(createMetrics(peer.services.directstream)); + } - streams = []; - for (const peer of session.peers) { - streams.push(createMetrics(peer.services.directstream)); - } + // slowly connect to that the route maps are deterministic + await session.connect([[session.peers[0], session.peers[1]]]); + await session.connect([[session.peers[1], session.peers[2]]]); + await session.connect([[session.peers[0], session.peers[3]]]); + await waitForPeerStreams(streams[0].stream, streams[1].stream); + await waitForPeerStreams(streams[1].stream, streams[2].stream); + await waitForPeerStreams(streams[0].stream, streams[3].stream); + + expect([...streams[0].stream.peers.keys()]).toEqual([ + streams[1].stream.publicKeyHash, + streams[3].stream.publicKeyHash + ]); + expect([...streams[1].stream.peers.keys()]).toEqual([ + streams[0].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ]); + expect([...streams[2].stream.peers.keys()]).toEqual([ + streams[1].stream.publicKeyHash + ]); + expect([...streams[3].stream.peers.keys()]).toEqual([ + streams[0].stream.publicKeyHash + ]); // peer has recevied reachable event from everone + }); - // slowly connect to that the route maps are deterministic - await session.connect([[session.peers[0], session.peers[1]]]); - await session.connect([[session.peers[1], session.peers[2]]]); - await session.connect([[session.peers[0], session.peers[3]]]); - await waitForPeerStreams(streams[0].stream, streams[1].stream); - await waitForPeerStreams(streams[1].stream, streams[2].stream); - await waitForPeerStreams(streams[0].stream, streams[3].stream); + afterEach(async () => { + await session.stop(); + }); - expect([...streams[0].stream.peers.keys()]).toEqual([ - streams[1].stream.publicKeyHash, - streams[3].stream.publicKeyHash - ]); - expect([...streams[1].stream.peers.keys()]).toEqual([ - streams[0].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ]); - expect([...streams[2].stream.peers.keys()]).toEqual([ - streams[1].stream.publicKeyHash - ]); - expect([...streams[3].stream.peers.keys()]).toEqual([ - streams[0].stream.publicKeyHash - ]); // peer has recevied reachable event from everone + it("re-route new connection", async () => { + /* + ┌───┐ + │3 │ + └┬─┬┘ + │┌▽┐ + ││0│ + │└┬┘ + │┌▽─┐ + ││1 │ + │└┬─┘ + ┌▽─▽┐ + │2 │ + └───┘ + */ + + await streams[3].stream.publish(new Uint8Array(0), { + to: [streams[2].stream.publicKeyHash], + mode: new SeekDelivery(2) }); - - afterEach(async () => { - await session.stop(); + expect( + streams[3].stream.routes + .findNeighbor( + streams[3].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ) + ?.list?.map((x) => x.hash) + ).toEqual([streams[0].stream.publicKeyHash]); + await session.connect([[session.peers[2], session.peers[3]]]); + await waitForPeerStreams(streams[2].stream, streams[3].stream); + await streams[3].stream.publish(new Uint8Array(0), { + to: [streams[2].stream.publicKeyHash], + mode: new SeekDelivery(2) }); - - it("re-route new connection", async () => { - /* - ┌───┐ - │3 │ - └┬─┬┘ - │┌▽┐ - ││0│ - │└┬┘ - │┌▽─┐ - ││1 │ - │└┬─┘ - ┌▽─▽┐ - │2 │ - └───┘ - */ - - await streams[3].stream.publish(new Uint8Array(0), { - to: [streams[2].stream.publicKeyHash], - mode: new SeekDelivery(2) - }); + await waitForResolved(() => { expect( streams[3].stream.routes .findNeighbor( streams[3].stream.publicKeyHash, streams[2].stream.publicKeyHash ) - ?.list?.map((x) => x.hash) - ).toEqual([streams[0].stream.publicKeyHash]); - await session.connect([[session.peers[2], session.peers[3]]]); - await waitForPeerStreams(streams[2].stream, streams[3].stream); - await streams[3].stream.publish(new Uint8Array(0), { - to: [streams[2].stream.publicKeyHash], - mode: new SeekDelivery(2) - }); - await waitForResolved(() => { - expect( - streams[3].stream.routes - .findNeighbor( - streams[3].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ) - ?.list.map((x) => x.hash) - ).toEqual([ - streams[2].stream.publicKeyHash, - streams[0].stream.publicKeyHash - ]); - }); + ?.list.map((x) => x.hash) + ).toEqual([ + streams[2].stream.publicKeyHash, + streams[0].stream.publicKeyHash + ]); }); + }); - it("neighbour drop", async () => { - await streams[3].stream.publish(new Uint8Array(0), { - to: [streams[2].stream.publicKeyHash], - mode: new SeekDelivery(2) - }); + it("neighbour drop", async () => { + await streams[3].stream.publish(new Uint8Array(0), { + to: [streams[2].stream.publicKeyHash], + mode: new SeekDelivery(2) + }); + expect( + streams[3].stream.routes + .findNeighbor( + streams[3].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ) + ?.list?.map((x) => x.hash) + ).toEqual([streams[0].stream.publicKeyHash]); + await session.peers[0].stop(); + await waitForResolved(() => { expect( - streams[3].stream.routes - .findNeighbor( - streams[3].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ) - ?.list?.map((x) => x.hash) - ).toEqual([streams[0].stream.publicKeyHash]); - await session.peers[0].stop(); - await waitForResolved(() => { - expect( - streams[3].stream.routes.findNeighbor( - streams[3].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ) - ).toBeUndefined(); - }); + streams[3].stream.routes.findNeighbor( + streams[3].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ) + ).toBeUndefined(); }); + }); - it("distant drop", async () => { - await streams[3].stream.publish(new Uint8Array(0), { - to: [streams[2].stream.publicKeyHash], - mode: new SeekDelivery(2) - }); + it("distant drop", async () => { + await streams[3].stream.publish(new Uint8Array(0), { + to: [streams[2].stream.publicKeyHash], + mode: new SeekDelivery(2) + }); + expect( + streams[3].stream.routes + .findNeighbor( + streams[3].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ) + ?.list?.map((x) => x.hash) + ).toEqual([streams[0].stream.publicKeyHash]); + await session.peers[2].stop(); + await waitForResolved(() => { expect( - streams[3].stream.routes - .findNeighbor( - streams[3].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ) - ?.list?.map((x) => x.hash) - ).toEqual([streams[0].stream.publicKeyHash]); - await session.peers[2].stop(); - await waitForResolved(() => { - expect( - streams[3].stream.routes.isReachable( - streams[3].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ) - ).toEqual(false); - }); + streams[3].stream.routes.isReachable( + streams[3].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ) + ).toEqual(false); + }); - await waitForResolved(() => { - expect( - streams[3].stream.routes.findNeighbor( - streams[3].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ) - ).toBeUndefined(); - }); + await waitForResolved(() => { + expect( + streams[3].stream.routes.findNeighbor( + streams[3].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ) + ).toBeUndefined(); }); }); + }); - describe("invalidation", () => { - let extraSession: TestSessionStream; - beforeEach(async () => { - session = await connected(3); + describe("invalidation", () => { + let extraSession: TestSessionStream; + beforeEach(async () => { + session = await connected(3); - for (let i = 0; i < session.peers.length; i++) { - await waitForResolved(() => - expect( - session.peers[i].services.directstream.routes.count() - ).toEqual(2) - ); - } - }); - afterEach(async () => { - await session?.stop(); - await extraSession?.stop(); - }); + for (let i = 0; i < session.peers.length; i++) { + await waitForResolved(() => + expect(session.peers[i].services.directstream.routes.count()).toEqual( + 2 + ) + ); + } + }); + afterEach(async () => { + await session?.stop(); + await extraSession?.stop(); + }); - it("will not get blocked for slow writes", async () => { - let slowPeer = [1, 2]; - let fastPeer = [2, 1]; - let directDelivery = [true, false]; - for (let i = 0; i < slowPeer.length; i++) { - const slow = session.peers[0].services.directstream.peers.get( - session.peers[slowPeer[i]].services.directstream.publicKeyHash - )!; - const fast = session.peers[0].services.directstream.peers.get( - session.peers[fastPeer[i]].services.directstream.publicKeyHash - )!; - - expect(slow).toBeDefined(); - const waitForWriteDefaultFn = slow.waitForWrite.bind(slow); - slow.waitForWrite = async (bytes) => { - await delay(3000); - return waitForWriteDefaultFn(bytes); - }; + it("will not get blocked for slow writes", async () => { + let slowPeer = [1, 2]; + let fastPeer = [2, 1]; + let directDelivery = [true, false]; + for (let i = 0; i < slowPeer.length; i++) { + const slow = session.peers[0].services.directstream.peers.get( + session.peers[slowPeer[i]].services.directstream.publicKeyHash + )!; + const fast = session.peers[0].services.directstream.peers.get( + session.peers[fastPeer[i]].services.directstream.publicKeyHash + )!; + + expect(slow).toBeDefined(); + const waitForWriteDefaultFn = slow.waitForWrite.bind(slow); + + let abortController = new AbortController(); + slow.waitForWrite = async (bytes) => { + try { + await delay(3000, { signal: abortController.signal }); + } catch (error) { + return; + } + return waitForWriteDefaultFn(bytes); + }; - const t0 = +new Date(); - let t1: number | undefined = undefined; - await session.peers[0].services.directstream.publish( - new Uint8Array([1, 2, 3]), - { - to: directDelivery[i] ? [slow.publicKey, fast.publicKey] : [] // undefined ? - } - ); + const t0 = +new Date(); + let t1: number | undefined = undefined; + await session.peers[0].services.directstream.publish( + new Uint8Array([1, 2, 3]), + { + to: directDelivery[i] ? [slow.publicKey, fast.publicKey] : [] // undefined ? + } + ); - let listener = () => { - t1 = +new Date(); - }; - session.peers[fastPeer[i]].services.directstream.addEventListener( - "data", - listener - ); - await waitForResolved(() => expect(t1).toBeDefined()); + let listener = () => { + t1 = +new Date(); + }; + session.peers[fastPeer[i]].services.directstream.addEventListener( + "data", + listener + ); + await waitForResolved(() => expect(t1).toBeDefined()); - expect(t1! - t0).toBeLessThan(3000); + expect(t1! - t0).toBeLessThan(3000); - // reset - slow.waitForWrite = waitForWriteDefaultFn; - session.peers[fastPeer[i]].services.directstream.removeEventListener( - "data", - listener - ); - } - }); + // reset + slow.waitForWrite = waitForWriteDefaultFn; + session.peers[fastPeer[i]].services.directstream.removeEventListener( + "data", + listener + ); + abortController.abort(); + } }); }); +}); - describe("start/stop", () => { - let session: TestSessionStream; +describe("start/stop", () => { + let session: TestSessionStream; - afterEach(async () => { - await session.stop(); - }); + afterEach(async () => { + await session.stop(); + }); - it("can restart", async () => { - session = await connected(2, { - transports: [tcp(), webSockets({ filter: filters.all })], - services: { - directstream: (c) => - new TestDirectStream(c, { - connectionManager: { autoDial: false } - }) - } - }); // use 2 transports as this might cause issues if code is not handling multiple connections correctly - await waitForPeerStreams(stream(session, 0), stream(session, 1)); - - /* await waitFor(() => stream(session, 1).helloMap.size == 1); */ - await stream(session, 0).stop(); - /* await waitFor(() => stream(session, 1).helloMap.size === 0); */ - - await stream(session, 1).stop(); - expect(stream(session, 0).peers.size).toEqual(0); - await delay(3000); - await stream(session, 0).start(); - /* expect(stream(session, 0).helloMap.size).toEqual(0); */ - await stream(session, 1).start(); - - await waitFor(() => stream(session, 0).peers.size === 1); - /* await waitFor(() => stream(session, 0).helloMap.size === 1); - await waitFor(() => stream(session, 1).helloMap.size === 1); */ - await waitForPeerStreams(stream(session, 0), stream(session, 1)); + it("can restart", async () => { + session = await connected(2, { + transports: [tcp(), webSockets({ filter: filters.all })], + services: { + directstream: (c) => new TestDirectStream(c) + } + }); // use 2 transports as this might cause issues if code is not handling multiple connections correctly + await waitForPeerStreams(stream(session, 0), stream(session, 1)); + + /* await waitFor(() => stream(session, 1).helloMap.size == 1); */ + await stream(session, 0).stop(); + /* await waitFor(() => stream(session, 1).helloMap.size === 0); */ + + await stream(session, 1).stop(); + expect(stream(session, 0).peers.size).toEqual(0); + await delay(3000); + await stream(session, 0).start(); + /* expect(stream(session, 0).helloMap.size).toEqual(0); */ + await stream(session, 1).start(); + + await waitFor(() => stream(session, 0).peers.size === 1); + /* await waitFor(() => stream(session, 0).helloMap.size === 1); + await waitFor(() => stream(session, 1).helloMap.size === 1); */ + await waitForPeerStreams(stream(session, 0), stream(session, 1)); + }); + it("can connect after start", async () => { + session = await disconnected(2, { + transports: [tcp(), webSockets({ filter: filters.all })], + services: { + directstream: (c) => new TestDirectStream(c) + } }); - it("can connect after start", async () => { - session = await disconnected(2, { - transports: [tcp(), webSockets({ filter: filters.all })], - services: { - directstream: (c) => new TestDirectStream(c) - } - }); - await session.connect(); - await waitForPeerStreams(stream(session, 0), stream(session, 1)); - }); + await session.connect(); + await waitForPeerStreams(stream(session, 0), stream(session, 1)); + }); - it("can connect before start", async () => { - session = await connected(2, { - transports: [tcp(), webSockets({ filter: filters.all })], - services: { - directstream: (c) => new TestDirectStream(c) - } - }); - await delay(3000); + it("can connect before start", async () => { + session = await connected(2, { + transports: [tcp(), webSockets({ filter: filters.all })], + services: { + directstream: (c) => new TestDirectStream(c) + } + }); + await delay(3000); - await stream(session, 0).start(); - await stream(session, 1).start(); + await stream(session, 0).start(); + await stream(session, 1).start(); - await waitForPeerStreams(stream(session, 0), stream(session, 1)); - }); + await waitForPeerStreams(stream(session, 0), stream(session, 1)); + }); - it("can connect with delay", async () => { - session = await connected(2, { - transports: [tcp(), webSockets({ filter: filters.all })], - services: { - directstream: (c) => new TestDirectStream(c) - } - }); - await waitForPeerStreams(stream(session, 0), stream(session, 1)); - await session.peers[0].services.directstream.stop(); - await session.peers[1].services.directstream.stop(); - await waitFor( - () => session.peers[0].services.directstream.peers.size === 0 - ); - await waitFor( - () => session.peers[1].services.directstream.peers.size === 0 - ); - await session.peers[1].services.directstream.start(); - await delay(3000); - await session.peers[0].services.directstream.start(); - await waitForPeerStreams(stream(session, 0), stream(session, 1)); + it("can connect with delay", async () => { + session = await connected(2, { + transports: [tcp(), webSockets({ filter: filters.all })], + services: { + directstream: (c) => new TestDirectStream(c) + } }); + await waitForPeerStreams(stream(session, 0), stream(session, 1)); + await session.peers[0].services.directstream.stop(); + await session.peers[1].services.directstream.stop(); + await waitFor( + () => session.peers[0].services.directstream.peers.size === 0 + ); + await waitFor( + () => session.peers[1].services.directstream.peers.size === 0 + ); + await session.peers[1].services.directstream.start(); + await delay(3000); + await session.peers[0].services.directstream.start(); + await waitForPeerStreams(stream(session, 0), stream(session, 1)); }); +}); - describe("multistream", () => { - let session: TestSessionStream; - beforeEach(async () => { - session = await TestSession.connected(2, { - transports: [tcp(), webSockets({ filter: filters.all })], - services: { - directstream: (c) => new TestDirectStream(c), - directstream2: (c) => - new TestDirectStream(c, { id: "another-protocol" }) - } - }); // use 2 transports as this might cause issues if code is not handling multiple connections correctly - }); +describe("multistream", () => { + let session: TestSessionStream; + beforeEach(async () => { + session = await TestSession.connected(2, { + transports: [tcp(), webSockets({ filter: filters.all })], + services: { + directstream: (c) => new TestDirectStream(c), + directstream2: (c) => + new TestDirectStream(c, { id: "another-protocol" }) + } + }); // use 2 transports as this might cause issues if code is not handling multiple connections correctly + }); - afterEach(async () => { - await session.stop(); - }); + afterEach(async () => { + await session.stop(); + }); - it("can setup multiple streams at once", async () => { - await waitFor(() => !!stream(session, 0).peers.size); - await waitFor(() => !!stream(session, 1).peers.size); - await waitFor(() => !!service(session, 0, "directstream2").peers.size); - await waitFor(() => !!service(session, 1, "directstream2").peers.size); - }); + it("can setup multiple streams at once", async () => { + await waitFor(() => !!stream(session, 0).peers.size); + await waitFor(() => !!stream(session, 1).peers.size); + await waitFor(() => !!service(session, 0, "directstream2").peers.size); + await waitFor(() => !!service(session, 1, "directstream2").peers.size); }); }); diff --git a/packages/transport/stream/src/index.ts b/packages/transport/stream/src/index.ts index c0e94e0c4..e82bb15d9 100644 --- a/packages/transport/stream/src/index.ts +++ b/packages/transport/stream/src/index.ts @@ -10,7 +10,6 @@ import { Uint8ArrayList } from "uint8arraylist"; import { abortableSource } from "abortable-iterator"; import * as lp from "it-length-prefixed"; import { Routes } from "./routes.js"; -import { PeerMap } from "./peer-map.js"; import type { IncomingStreamData, Registrar @@ -61,6 +60,7 @@ import { import { DeliveryMode } from "@peerbit/stream-interface"; import { MultiAddrinfo } from "@peerbit/stream-interface"; +import { MovingAverageTracker } from "./metrics.js"; const logError = (e?: { message: string }) => logger.error(e?.message); export interface PeerStreamsInit { @@ -81,6 +81,14 @@ export interface PeerStreamEvents { } const SEEK_DELIVERY_TIMEOUT = 9e3; +const MAX_DATA_LENGTH = 1e7 + 1000; // 10 mb and some metadata +const MAX_QUEUED_BYTES = MAX_DATA_LENGTH * 50; + +const DEFAULT_PRUNE_CONNECTIONS_INTERVAL = 1e4; +const DEFAULT_MIN_CONNECTIONS = 2; +const DEFAULT_MAX_CONNECTIONS = 300; + +const DEFAULT_PRUNED_CONNNECTIONS_TIMEOUT = 30 * 1000; /** * Thin wrapper around a peer's inbound / outbound pubsub streams @@ -114,6 +122,8 @@ export class PeerStreams extends EventEmitter { private closed: boolean; public connId: string; + + private usedBandWidthTracker: MovingAverageTracker; constructor(init: PeerStreamsInit) { super(); @@ -124,6 +134,7 @@ export class PeerStreams extends EventEmitter { this.closed = false; this.connId = init.connId; this.counter = 1; + this.usedBandWidthTracker = new MovingAverageTracker(); } /** @@ -140,6 +151,10 @@ export class PeerStreams extends EventEmitter { return Boolean(this.outboundStream); } + get usedBanwidth() { + return this.usedBandWidthTracker.value; + } + /** * Send a message to this peer. * Throws if there is no `stream` to write to available. @@ -149,6 +164,9 @@ export class PeerStreams extends EventEmitter { logger.error("No writable connection to " + this.peerId.toString()); throw new Error("No writable connection to " + this.peerId.toString()); } + + this.usedBandWidthTracker.add(data.byteLength); + this.outboundStream.push( data instanceof Uint8Array ? new Uint8ArrayList(data) : data ); @@ -214,7 +232,7 @@ export class PeerStreams extends EventEmitter { this._rawInboundStream = stream; this.inboundStream = abortableSource( pipe(this._rawInboundStream, (source) => - lp.decode(source, { maxDataLength: 100001000 }) + lp.decode(source, { maxDataLength: MAX_DATA_LENGTH }) ), this.inboundAbortController.signal, { @@ -304,10 +322,22 @@ export interface StreamEvents extends PeerEvents, MessageEvents { data: CustomEvent; } -export type ConnectionManagerOptions = { - autoDial?: boolean; - retryDelay?: number; +type DialerOptions = { + retryDelay: number; +}; +type PrunerOptions = { + interval: number; // how often to check for pruning + bandwidth?: number; // Mbps, unlimited if unset + maxBuffer?: number; // max queued bytes until pruning + connectionTimeout: number; // how long a pruned connection should be treated as non wanted }; +type ConnectionManagerOptions = { + minConnections: number; + maxConnections: number; + dialer?: DialerOptions; + pruner?: PrunerOptions; +}; + export type DirectStreamOptions = { canRelayMessage?: boolean; emitSelf?: boolean; @@ -315,8 +345,7 @@ export type DirectStreamOptions = { maxInboundStreams?: number; maxOutboundStreams?: number; signaturePolicy?: SignaturePolicy; - pingInterval?: number | null; - connectionManager?: ConnectionManagerOptions; + connectionManager?: ConnectionManagerArguments; }; export interface DirectStreamComponents extends Components { @@ -328,6 +357,13 @@ export interface DirectStreamComponents extends Components { events: TypedEventTarget; } +export type ConnectionManagerArguments = + | (Partial> & + Partial> & { + pruner?: Partial | false; + } & { dialer?: Partial | false }) + | false; + export abstract class DirectStream< Events extends { [s: string]: any } = StreamEvents > @@ -335,7 +371,6 @@ export abstract class DirectStream< implements WaitForPeer { public peerId: PeerId; - public peerIdStr: string; public publicKey: PublicSignKey; public publicKeyHash: string; public sign: (bytes: Uint8Array) => Promise; @@ -345,7 +380,7 @@ export abstract class DirectStream< /** * Map of peer streams */ - public peers: PeerMap; + public peers: Map; public peerKeyHashToPublicKey: Map; public routes: Routes; /** @@ -364,11 +399,13 @@ export abstract class DirectStream< private _registrarTopologyIds: string[] | undefined; private readonly maxInboundStreams?: number; private readonly maxOutboundStreams?: number; - private connectionManagerOptions: ConnectionManagerOptions; - private recentDials: Cache; - private traces: Map; + connectionManagerOptions: ConnectionManagerOptions; + private recentDials?: Cache; + private traces: Cache; closeController: AbortController; private healthChecks: Map>; + private pruneConnectionsTimeout: ReturnType; + private prunedConnectionsCache?: Cache; private _ackCallbacks: Map< string, @@ -392,13 +429,12 @@ export abstract class DirectStream< maxInboundStreams, maxOutboundStreams, signaturePolicy = "StictSign", - connectionManager = { autoDial: true } + connectionManager } = options || {}; const signKey = getKeypairFromPeerId(components.peerId); this.sign = signKey.sign.bind(signKey); this.peerId = components.peerId; - this.peerIdStr = components.peerId.toString(); this.publicKey = signKey.publicKey; this.publicKeyHash = signKey.publicKey.hashcode(); this.multicodecs = multicodecs; @@ -417,16 +453,55 @@ export abstract class DirectStream< this.onPeerConnected = this.onPeerConnected.bind(this); this.onPeerDisconnected = this.onPeerDisconnected.bind(this); this.signaturePolicy = signaturePolicy; - this.connectionManagerOptions = connectionManager; - this.recentDials = new Cache({ - ttl: connectionManager.retryDelay || 60 * 1000, - max: 1e3 - }); - this.traces = new Map(); /* new Cache({ + if (connectionManager === false || connectionManager === null) { + this.connectionManagerOptions = { + maxConnections: Number.MAX_SAFE_INTEGER, + minConnections: 0, + dialer: undefined, + pruner: undefined + }; + } else { + this.connectionManagerOptions = { + maxConnections: DEFAULT_MAX_CONNECTIONS, + minConnections: DEFAULT_MIN_CONNECTIONS, + ...connectionManager, + dialer: + connectionManager?.dialer !== false && + connectionManager?.dialer !== null + ? { retryDelay: 60 * 1000, ...connectionManager?.dialer } + : undefined, + pruner: + connectionManager?.pruner !== false && + connectionManager?.pruner !== null + ? { + connectionTimeout: DEFAULT_PRUNED_CONNNECTIONS_TIMEOUT, + interval: DEFAULT_PRUNE_CONNECTIONS_INTERVAL, + maxBuffer: MAX_QUEUED_BYTES, + ...connectionManager?.pruner + } + : undefined + }; + } + + this.recentDials = this.connectionManagerOptions.dialer + ? new Cache({ + ttl: this.connectionManagerOptions.dialer.retryDelay, + max: 1e3 + }) + : undefined; + + this.traces = new Cache({ ttl: 10 * 1000, max: 1e6 - }); */ + }); + + this.prunedConnectionsCache = this.connectionManagerOptions.pruner + ? new Cache({ + max: 1e6, + ttl: this.connectionManagerOptions.pruner.connectionTimeout + }) + : undefined; } async start() { @@ -459,11 +534,9 @@ export abstract class DirectStream< }) ) ); - // TODO remove/modify when https://github.com/libp2p/js-libp2p/issues/2036 is resolved this.components.events.addEventListener("connection:open", (e) => { if (e.detail.multiplexer === "/webrtc") { - console.log("ADD WEBRTC CONNECTION"); this.onPeerConnected(e.detail.remotePeer, e.detail); } }); @@ -493,35 +566,19 @@ export abstract class DirectStream< await this.onPeerConnected(conn.remotePeer, conn, { fromExisting: true }); } - - /* const pingJob = async () => { - // TODO don't use setInterval but waitFor previous done to be done - await this.pingJobPromise; - const promises: Promise[] = []; - this.peers.forEach((peer) => { - promises.push( - this.ping(peer).catch((e) => { - if (e instanceof TimeoutError) { - // Ignore - } else { - logger.error(e); + if (this.connectionManagerOptions.pruner) { + const pruneConnectionsLoop = () => { + this.pruneConnectionsTimeout = setTimeout(() => { + this.maybePruneConnections().finally(() => { + if (!this.started) { + return; } - }) - ); - }); - promises.push(this.hello()); // Repetedly say hello to everyone to create traces in the network to measure latencies - this.pingJobPromise = Promise.all(promises) - .catch((e) => { - logger.error(e?.message); - }) - .finally(() => { - if (!this.started || !this.pingInterval) { - return; - } - this.pingJob = setTimeout(pingJob, this.pingInterval); - }); - }; - pingJob(); */ + pruneConnectionsLoop(); + }); + }, this.connectionManagerOptions.pruner!.interval); + }; + pruneConnectionsLoop(); + } } /** @@ -534,6 +591,8 @@ export abstract class DirectStream< this.started = false; this.closeController.abort(); + clearTimeout(this.pruneConnectionsTimeout); + await Promise.all( this.multicodecs.map((x) => this.components.registrar.unhandle(x)) ); @@ -546,6 +605,7 @@ export abstract class DirectStream< clearTimeout(v); } this.healthChecks.clear(); + this.prunedConnectionsCache?.clear(); // unregister protocol and handlers if (this._registrarTopologyIds != null) { @@ -588,6 +648,12 @@ export abstract class DirectStream< } const publicKey = getPublicKeyFromPeerId(peerId); + + if (this.prunedConnectionsCache?.has(publicKey.hashcode())) { + await connection.close(); + return; + } + const peer = this.addPeer( peerId, publicKey, @@ -613,6 +679,11 @@ export abstract class DirectStream< if (!this.isStarted() || conn.status !== "open") { return; } + const peerKey = getPublicKeyFromPeerId(peerId); + + if (this.prunedConnectionsCache?.has(peerKey.hashcode())) { + return; // we recently pruned this connect, dont allow it to connect for a while + } try { // TODO remove/modify when https://github.com/libp2p/js-libp2p/issues/2036 is resolved @@ -649,8 +720,6 @@ export abstract class DirectStream< return; } try { - const peerKey = getPublicKeyFromPeerId(peerId); - for (const existingStreams of conn.streams) { if ( existingStreams.protocol && @@ -845,6 +914,7 @@ export abstract class DirectStream< connId: string ): PeerStreams { const publicKeyHash = publicKey.hashcode(); + const existing = this.peers.get(publicKeyHash); // If peer streams already exists, do nothing @@ -1069,8 +1139,12 @@ export abstract class DirectStream< // TODO only give origin info to peers we want to connect to us header: new MessageHeader({ to: message.header.signatures!.publicKeys.map((x) => x.hashcode()), + // include our origin if message is SeekDelivery and we have not recently pruned a connection to this peer origin: - message.deliveryMode instanceof SeekDelivery + message.deliveryMode instanceof SeekDelivery && + !message.header.signatures!.publicKeys.find( + (x) => this.prunedConnectionsCache?.has(x.hashcode()) + ) ? new MultiAddrinfo( this.components.addressManager .getAddresses() @@ -1121,7 +1195,8 @@ export abstract class DirectStream< await this.publishMessage(this.publicKey, message, [nextStream], true); } } else { - if (message.header.origin && this.connectionManagerOptions.autoDial) { + // if origin exist (we can connect to remote peer) && we have autodialer turned on + if (message.header.origin && this.connectionManagerOptions.dialer) { this.maybeConnectDirectly( message.header.signatures!.publicKeys[0].hashcode(), message.header.origin @@ -1254,7 +1329,7 @@ export abstract class DirectStream< public async relayMessage( from: PublicSignKey, message: Message, - to?: PeerStreams[] | PeerMap + to?: PeerStreams[] | Map ) { if (this.canRelayMessage) { if (message instanceof DataMessage) { @@ -1275,7 +1350,7 @@ export abstract class DirectStream< message.deliveryMode instanceof SeekDelivery ) { const messageId = await sha256Base64(message.id); - this.traces.set(messageId, from.hashcode()); + this.traces.add(messageId, from.hashcode()); } } @@ -1432,7 +1507,7 @@ export abstract class DirectStream< public async publishMessage( from: PublicSignKey, message: Message, - to?: PeerStreams[] | PeerMap, + to?: PeerStreams[] | Map, relayed?: boolean ): Promise { let delivereyPromise: DeferredPromise | undefined = undefined as any; @@ -1514,7 +1589,7 @@ export abstract class DirectStream< } // We fils to send the message directly, instead fallback to floodsub - const peers: PeerStreams[] | PeerMap = to || this.peers; + const peers: PeerStreams[] | Map = to || this.peers; if ( peers == null || (Array.isArray(peers) && peers.length === 0) || @@ -1547,14 +1622,14 @@ export abstract class DirectStream< } async maybeConnectDirectly(toHash: string, origin: MultiAddrinfo) { - if (this.peers.has(toHash)) { + if (this.peers.has(toHash) || this.prunedConnectionsCache?.has(toHash)) { return; // TODO, is this expected, or are we to dial more addresses? } const addresses = origin.multiaddrs .filter((x) => { - const ret = !this.recentDials.has(x); - this.recentDials.add(x); + const ret = !this.recentDials!.has(x); + this.recentDials!.add(x); return ret; }) .map((x) => multiaddr(x)); @@ -1615,6 +1690,58 @@ export abstract class DirectStream< get pending(): boolean { return this._ackCallbacks.size > 0; } + + lastQueuedBytes = 0; + + // make this into a job? run every few ms + maybePruneConnections(): Promise { + if (this.connectionManagerOptions.pruner!.bandwidth != null) { + let usedBandwidth = 0; + for (const [_k, v] of this.peers) { + usedBandwidth += v.usedBanwidth; + } + usedBandwidth /= this.peers.size; + + if (usedBandwidth > this.connectionManagerOptions.pruner!.bandwidth) { + // prune + return this.pruneConnections(); + } + } else if (this.connectionManagerOptions.pruner!.maxBuffer != null) { + const queuedBytes = this.getQueuedBytes(); + if (queuedBytes > this.connectionManagerOptions.pruner!.maxBuffer) { + // prune + return this.pruneConnections(); + } + } + return Promise.resolve(); + } + + async pruneConnections(): Promise { + // TODO sort by bandwidth + if (this.peers.size <= this.connectionManagerOptions.minConnections) { + return; + } + const sorted = [...this.peers.values()] + .sort((x, y) => x.usedBanwidth - y.usedBanwidth) + .map((x) => x.publicKey.hashcode()); + const prunables = this.routes.getPrunable(sorted); + if (prunables.length === 0) { + return; + } + const stream = this.peers.get(prunables[0])!; + this.prunedConnectionsCache!.add(stream.publicKey.hashcode()); + + await this.onPeerDisconnected(stream.peerId); + return this.components.connectionManager.closeConnections(stream.peerId); + } + + getQueuedBytes(): number { + let sum = 0; + for (const peer of this.peers) { + sum += peer[1].outboundStream?.readableLength || 0; + } + return sum; + } } export const waitForPeers = async ( diff --git a/packages/transport/stream/src/metrics.ts b/packages/transport/stream/src/metrics.ts index 75773a8df..cc9663f3a 100644 --- a/packages/transport/stream/src/metrics.ts +++ b/packages/transport/stream/src/metrics.ts @@ -1,6 +1,7 @@ import { PublicSignKey } from "@peerbit/crypto"; - +/* export class Frequency { + private interval: ReturnType; private lastFrequency: number; private events = 0; @@ -68,3 +69,21 @@ export class RouteFrequency { return undefined; } } + */ + +export class MovingAverageTracker { + private lastTS = 0; + + value = 0; + + constructor(readonly tau = 10) { + this.lastTS = +new Date(); + } + add(number: number) { + const now = +new Date(); + const dt = (now - this.lastTS) / 1000; + const alpha_t = 1 - Math.exp(-dt / this.tau); + this.value = (1 - alpha_t) * this.value + (alpha_t * number) / dt; + this.lastTS = now; + } +} diff --git a/packages/transport/stream/src/peer-map.ts b/packages/transport/stream/src/peer-map.ts deleted file mode 100644 index e940543db..000000000 --- a/packages/transport/stream/src/peer-map.ts +++ /dev/null @@ -1 +0,0 @@ -export type PeerMap = Map; diff --git a/packages/transport/stream/src/routes.ts b/packages/transport/stream/src/routes.ts index 5b7c38431..174d30afa 100644 --- a/packages/transport/stream/src/routes.ts +++ b/packages/transport/stream/src/routes.ts @@ -3,16 +3,12 @@ import { PublicSignKey } from "@peerbit/crypto"; export class Routes { // END receiver -> Neighbour - constructor( - readonly me: string, - readonly routes: Map< - string, - Map< - string, - { session: number; list: { hash: string; distance: number }[] } - > - > = new Map() - ) {} + routes: Map< + string, + Map + > = new Map(); + + constructor(readonly me: string) {} clear() { this.routes.clear(); @@ -248,4 +244,27 @@ export class Routes { } return fanoutMap || (fanoutMap = new Map()); } + + /** + * Returns a list of a prunable nodes that are not needed to reach all remote nodes + */ + getPrunable(neighbours: string[]): string[] { + const map = this.routes.get(this.me); + if (map) { + // check if all targets can be reached without it + return neighbours.filter((candidate) => { + for (const [target, neighbours] of map) { + if ( + target !== candidate && + neighbours.list.length === 1 && + neighbours.list[0].hash === candidate + ) { + return false; + } + } + return true; + }); + } + return []; + } } diff --git a/packages/transport/stream/test/browser/browser-node/src/App.tsx b/packages/transport/stream/test/browser/browser-node/src/App.tsx index afdd8daab..2169fd78c 100644 --- a/packages/transport/stream/test/browser/browser-node/src/App.tsx +++ b/packages/transport/stream/test/browser/browser-node/src/App.tsx @@ -1,8 +1,3 @@ -import { - ConnectionManagerOptions, - DirectStream, - DirectStreamComponents -} from "@peerbit/stream"; import { useEffect, useReducer, useState, useRef } from "react"; import { createLibp2p } from "libp2p"; import { circuitRelayTransport } from "libp2p/circuit-relay"; @@ -20,8 +15,7 @@ await ready; const client = await createLibp2p<{ stream: TestDirectStream; identify: any }>({ services: { - stream: (c) => - new TestDirectStream(c, { connectionManager: { autoDial: true } }), + stream: (c) => new TestDirectStream(c), identify: identifyService() }, connectionGater: { diff --git a/packages/transport/stream/test/browser/shared/utils.ts b/packages/transport/stream/test/browser/shared/utils.ts index 1d4a0f665..e2f438e0a 100644 --- a/packages/transport/stream/test/browser/shared/utils.ts +++ b/packages/transport/stream/test/browser/shared/utils.ts @@ -1,5 +1,5 @@ import { - ConnectionManagerOptions, + ConnectionManagerArguments, DirectStream, DirectStreamComponents } from "@peerbit/stream"; @@ -9,16 +9,13 @@ export class TestDirectStream extends DirectStream { components: DirectStreamComponents, options: { id?: string; - pingInterval?: number | null; - connectionManager?: ConnectionManagerOptions; + connectionManager?: ConnectionManagerArguments; } = {} ) { super(components, [options.id || "/browser-test/0.0.0"], { canRelayMessage: true, emitSelf: false, - connectionManager: options.connectionManager || { - autoDial: false - }, + connectionManager: options.connectionManager || false, ...options }); }