Skip to content

Commit

Permalink
feat: add connection pruner in pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Oct 31, 2023
1 parent 8c30632 commit 8b4c095
Show file tree
Hide file tree
Showing 15 changed files with 1,315 additions and 1,008 deletions.
17 changes: 13 additions & 4 deletions packages/clients/peerbit/src/__tests__/dial.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,19 @@ describe(`dial`, function () {

it("autodials by default", async () => {
expect(
clients[0].services.pubsub["connectionManagerOptions"].autoDial
).toBeTrue();
clients[0].services.pubsub.connectionManagerOptions.dialer
).toBeDefined();
expect(
clients[1].services.pubsub["connectionManagerOptions"].autoDial
).toBeTrue();
clients[1].services.blocks.connectionManagerOptions.dialer
).toBeDefined();
});

it("autoprunes by default", async () => {
expect(
clients[0].services.pubsub.connectionManagerOptions.pruner
).toBeDefined();
expect(
clients[1].services.blocks.connectionManagerOptions.pruner
).toBeDefined();
});
});
7 changes: 3 additions & 4 deletions packages/clients/peerbit/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ export const createLibp2pExtended = (
((c) =>
new DirectSub(c, {
canRelayMessage: true,
signaturePolicy: "StrictNoSign",
connectionManager: {
autoDial: true
}
signaturePolicy: "StrictNoSign"
// auto dial true
// auto prune true
})),
blocks: opts.services?.blocks || ((c) => new DirectBlock(c)),
...opts.services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Documents, SetupOptions } from "../document-store.js";
import { Replicator } from "@peerbit/shared-log";
import { DirectSub } from "@peerbit/pubsub";
import { mplex } from "@libp2p/mplex";

// Run with "node --loader ts-node/esm ./src/__benchmark__/replication.ts"
// put x 1,009 ops/sec ±2.57% (80 runs sampled)

Expand Down Expand Up @@ -55,18 +56,19 @@ const peers = await Promise.all(
pubsub: (sub) =>
new DirectSub(sub, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
}),
await createLibp2pExtended({
connectionManager: {},
transports: [tcp()],
streamMuxers: [mplex()],
services: {
pubsub: (sub) =>
new DirectSub(sub, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
}),
Expand All @@ -77,7 +79,7 @@ const peers = await Promise.all(
pubsub: (sub) =>
new DirectSub(sub, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe(`leaders`, function () {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/transport/libp2p-test-utils/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export class TestSession<T> {
connectionManager: (options?.[i] || options)?.connectionManager ?? {
minConnections: 0
},

datastore: (options?.[i] || options)?.datastore,
transports:
(options?.[i] || options)?.transports ??
Expand Down
4 changes: 1 addition & 3 deletions packages/transport/pubsub/src/__benchmark__/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ const session = await TestSession.disconnected(4, {
new DirectSub(c, {
canRelayMessage: true,
emitSelf: true,
connectionManager: {
autoDial: false
}
connectionManager: false
})
}
});
Expand Down
21 changes: 9 additions & 12 deletions packages/transport/pubsub/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ describe("pubsub", function () {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down Expand Up @@ -217,7 +217,7 @@ describe("pubsub", function () {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down Expand Up @@ -369,7 +369,7 @@ describe("pubsub", function () {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down Expand Up @@ -523,7 +523,7 @@ describe("pubsub", function () {
new DirectSub(c, {
emitSelf: true,
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down Expand Up @@ -579,7 +579,7 @@ describe("pubsub", function () {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down Expand Up @@ -680,7 +680,7 @@ describe("pubsub", function () {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down Expand Up @@ -752,17 +752,14 @@ describe("pubsub", function () {
let session: TestSession<{ pubsub: DirectSub }>;
let streams: ReturnType<typeof createMetrics>[];

const data = new Uint8Array([1, 2, 3]);
const TOPIC = "topic";
const PING_INTERVAL = 1000;
beforeEach(async () => {
session = await TestSession.disconnected(4, {
services: {
pubsub: (c) =>
new DirectSub(c, {
pingInterval: PING_INTERVAL,
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down Expand Up @@ -894,7 +891,7 @@ describe("pubsub", function () {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down Expand Up @@ -983,7 +980,7 @@ describe("pubsub", function () {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: { autoDial: false }
connectionManager: false
})
}
});
Expand Down
4 changes: 1 addition & 3 deletions packages/transport/stream/src/__benchmark__/transfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ class TestStreamImpl extends DirectStream {
super(c, ["bench/0.0.0"], {
canRelayMessage: true,
emitSelf: true,
connectionManager: {
autoDial: false
}
connectionManager: false
});
}
}
Expand Down
Loading

0 comments on commit 8b4c095

Please sign in to comment.