Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Nov 4, 2024
1 parent c679a0d commit 40bfb0f
Show file tree
Hide file tree
Showing 55 changed files with 4,678 additions and 2,101 deletions.
3 changes: 2 additions & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
**/public/peerbit/**
**/public/peerbit/**
**/target/**
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
"packages/utils/cache",
"packages/utils/logger",
"packages/utils/keychain",
"packages/utils/indexer/*"
"packages/utils/indexer/*",
"packages/utils/riblt/riblt-rust"
],
"engines": {
"node": ">=18"
Expand Down
5 changes: 4 additions & 1 deletion packages/clients/peerbit/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ export const createLibp2pExtended = (
): Promise<Libp2pExtended> => {
let extraServices: any = {};

if (!opts.services?.["relay"]) {
if (opts.services?.["relay"] == null) {
delete opts.services?.["relay"];
} else if (!opts.services?.["relay"]) {
const relayComponent = relay();
if (relayComponent) {
// will be null in browser
extraServices["relay"] = relayComponent;
}
}

if (!opts.services?.["identify"]) {
extraServices["identify"] = identify();
}
Expand Down
42 changes: 31 additions & 11 deletions packages/clients/peerbit/src/peer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { privateKeyFromRaw } from "@libp2p/crypto/keys";
import type { PeerId } from "@libp2p/interface";
import "@libp2p/peer-id";
import {
type Multiaddr,
Expand All @@ -10,6 +11,7 @@ import { DirectBlock } from "@peerbit/blocks";
import {
Ed25519Keypair,
Ed25519PublicKey,
PublicSignKey,
Secp256k1Keypair,
getKeypairFromPrivateKey,
} from "@peerbit/crypto";
Expand Down Expand Up @@ -119,7 +121,7 @@ export class Peerbit implements ProgramClient {
let libp2pExtended: Libp2pExtended | undefined = (options as Libp2pOptions)
.libp2p as Libp2pExtended;

const asRelay = (options as SimpleLibp2pOptions).relay;
const asRelay = (options as SimpleLibp2pOptions).relay ?? true;

const directory = options.directory;
const hasDir = directory != null;
Expand Down Expand Up @@ -176,19 +178,25 @@ export class Peerbit implements ProgramClient {
: undefined;
}

const services: any = {
keychain: (c: any) => keychain,
blocks: (c: any) =>
new DirectBlock(c, {
canRelayMessage: asRelay,
directory: blocksDirectory,
}),
pubsub: (c: any) => new DirectSub(c, { canRelayMessage: asRelay }),
...extendedOptions?.services,
};

if (!asRelay) {
services.relay = null;
}

libp2pExtended = await createLibp2pExtended({
...extendedOptions,
privateKey,
services: {
keychain: (c: any) => keychain,
blocks: (c: any) =>
new DirectBlock(c, {
canRelayMessage: asRelay,
directory: blocksDirectory,
}),
pubsub: (c: any) => new DirectSub(c, { canRelayMessage: asRelay }),
...extendedOptions?.services,
} as any, // TODO types are funky
services,
datastore,
});
}
Expand Down Expand Up @@ -280,6 +288,7 @@ export class Peerbit implements ProgramClient {
? address
: address.getMultiaddrs();
const connection = await this.libp2p.dial(maddress);

const publicKey = Ed25519PublicKey.fromPeerId(connection.remotePeer);

// TODO, do this as a promise instead using the onPeerConnected vents in pubsub and blocks
Expand All @@ -292,6 +301,17 @@ export class Peerbit implements ProgramClient {
);
}

async hangUp(address: PeerId | PublicSignKey | string | Multiaddr) {
await this.libp2p.hangUp(
address instanceof PublicSignKey
? address.toPeerId()
: typeof address == "string"
? multiaddr(address)
: address,
);
// TODO wait for pubsub and blocks to disconnect?
}

async start() {
await this._storage.open();
await this.indexer.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SeekDelivery } from "@peerbit/stream-interface";
import { waitFor } from "@peerbit/time";
import { waitFor, waitForResolved } from "@peerbit/time";
import { expect } from "chai";
import { Peerbit } from "../src/index.js";

Expand Down Expand Up @@ -52,3 +52,36 @@ describe(`dial`, function () {
);
});
});

describe(`hangup`, function () {
let clients: [Peerbit, Peerbit];

beforeEach(async () => {
clients = [
await Peerbit.create({
relay: false, // https://github.com/libp2p/js-libp2p/issues/2794
}),
await Peerbit.create({
relay: false, // https://github.com/libp2p/js-libp2p/issues/2794
}),
];
});

afterEach(async () => {
await Promise.all(clients.map((c) => c.stop()));
});

it("pubsub subscribers clears up", async () => {
let topic = "topic";
await clients[0].services.pubsub.subscribe(topic);
await clients[1].services.pubsub.subscribe(topic);
await clients[0].dial(clients[1].getMultiaddrs()[0]);
await waitForResolved(() =>
expect(clients[0].services.pubsub.peers.size).to.eq(1),
);
await clients[0].hangUp(clients[1].peerId);
await waitForResolved(() =>
expect(clients[0].services.pubsub.peers.size).to.eq(0),
);
});
});
142 changes: 72 additions & 70 deletions packages/programs/data/shared-log/benchmark/get-samples.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,82 @@ import { Ed25519Keypair } from "@peerbit/crypto";
import type { Index } from "@peerbit/indexer-interface";
import { create as createIndex } from "@peerbit/indexer-sqlite3";
import B from "benchmark";
import {
ReplicationRangeIndexable,
getEvenlySpacedU32,
getSamples,
} from "../src/ranges.js";
import { createNumbers } from "../src/integers.js";
import { ReplicationRangeIndexableU32, getSamples } from "../src/ranges.js";

// Run with "node --loader ts-node/esm ./benchmark/get-samples.ts"

let create = async (
...rects: ReplicationRangeIndexable[]
): Promise<[Index<ReplicationRangeIndexable, unknown>, any]> => {
const indices = await createIndex();
const index = await indices.init({ schema: ReplicationRangeIndexable });
await indices.start();
for (const rect of rects) {
await index.put(rect);
const suite = new B.Suite();
const resolutions: ["u32", "u64"] = ["u32", "u64"];
for (const resolution of resolutions) {
let create = async (
...rects: ReplicationRangeIndexableU32[]
): Promise<[Index<ReplicationRangeIndexableU32, unknown>, any]> => {
const indices = await createIndex();
const index = await indices.init({ schema: ReplicationRangeIndexableU32 });
await indices.start();
for (const rect of rects) {
await index.put(rect);
}
return [index, indices];
};

let a = (await Ed25519Keypair.create()).publicKey;
let b = (await Ed25519Keypair.create()).publicKey;
let c = (await Ed25519Keypair.create()).publicKey;

let ranges: ReplicationRangeIndexableU32[] = [];
let rangeCount = 1000;
for (let i = 0; i < rangeCount; i++) {
ranges.push(
...[
new ReplicationRangeIndexableU32({
publicKey: a,
length: 0.2 / rangeCount,
offset: (0 + rangeCount / i) % 1,
timestamp: 0n,
}),
new ReplicationRangeIndexableU32({
publicKey: b,
length: 0.4 / rangeCount,
offset: (0.333 + rangeCount / i) % 1,
timestamp: 0n,
}),
new ReplicationRangeIndexableU32({
publicKey: c,
length: 0.6 / rangeCount,
offset: (0.666 + rangeCount / i) % 1,
timestamp: 0n,
}),
new ReplicationRangeIndexableU32({
publicKey: c,
length: 0.6 / rangeCount,
offset: (0.666 + rangeCount / i) % 1,
timestamp: 0n,
}),
],
);
}
return [index, indices];
};

let a = (await Ed25519Keypair.create()).publicKey;
let b = (await Ed25519Keypair.create()).publicKey;
let c = (await Ed25519Keypair.create()).publicKey;
const [index, indices] = await create(...ranges);

let ranges: ReplicationRangeIndexable[] = [];
let rangeCount = 1000;
for (let i = 0; i < rangeCount; i++) {
ranges.push(
...[
new ReplicationRangeIndexable({
publicKey: a,
length: 0.2 / rangeCount,
offset: (0 + rangeCount / i) % 1,
timestamp: 0n,
}),
new ReplicationRangeIndexable({
publicKey: b,
length: 0.4 / rangeCount,
offset: (0.333 + rangeCount / i) % 1,
timestamp: 0n,
}),
new ReplicationRangeIndexable({
publicKey: c,
length: 0.6 / rangeCount,
offset: (0.666 + rangeCount / i) % 1,
timestamp: 0n,
}),
new ReplicationRangeIndexable({
publicKey: c,
length: 0.6 / rangeCount,
offset: (0.666 + rangeCount / i) % 1,
timestamp: 0n,
}),
],
);
const numbers = createNumbers(resolution);
suite
.add("getSamples", {
fn: async (deferred: any) => {
await getSamples(numbers.getGrid(Math.random(), 2), index, 0, numbers);
deferred.resolve();
},
defer: true,
})
.on("cycle", (event: any) => {
console.log(String(event.target));
})
.on("error", (err: any) => {
throw err;
})
.on("complete", async function (this: any) {
await indices.drop();
})
.run();
}

const [index, indices] = await create(...ranges);
const suite = new B.Suite();
suite
.add("getSamples", {
fn: async (deferred: any) => {
await getSamples(getEvenlySpacedU32(Math.random(), 2), index, 0);
deferred.resolve();
},
defer: true,
})
.on("cycle", (event: any) => {
console.log(String(event.target));
})
.on("error", (err: any) => {
throw err;
})
.on("complete", async function (this: any) {
await indices.drop();
})
.run();
10 changes: 5 additions & 5 deletions packages/programs/data/shared-log/benchmark/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ class Document {
}

@variant("test_shared_log")
class TestStore extends Program<Args<Document>> {
class TestStore extends Program<Args<Document, any>> {
@field({ type: SharedLog })
logs: SharedLog<Document>;
logs: SharedLog<Document, any>;

constructor(properties?: { logs: SharedLog<Document> }) {
constructor(properties?: { logs: SharedLog<Document, any> }) {
super();
this.logs = properties?.logs || new SharedLog();
}

async open(options?: Args<Document>): Promise<void> {
async open(options?: Args<Document, any>): Promise<void> {
await this.logs.open({
...options,
encoding: {
Expand All @@ -57,7 +57,7 @@ const peersCount = 1;
const session = await TestSession.connected(peersCount);

const store = new TestStore({
logs: new SharedLog<Document>({
logs: new SharedLog<Document, any>({
id: new Uint8Array(32),
}),
});
Expand Down
10 changes: 6 additions & 4 deletions packages/programs/data/shared-log/benchmark/replication-prune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ let session: TestSession = await TestSession.connected(3, [
},
},
]);
let db1: EventStore<string>, db2: EventStore<string>, db3: EventStore<string>;
let db1: EventStore<string, any>,
db2: EventStore<string, any>,
db3: EventStore<string, any>;

const init = async (min: number, max?: number) => {
db1 = await session.peers[0].open(new EventStore<string>(), {
db1 = await session.peers[0].open(new EventStore<string, any>(), {
args: {
replicas: {
min,
Expand All @@ -62,7 +64,7 @@ const init = async (min: number, max?: number) => {
replicate: false,
},
});
db2 = (await EventStore.open<EventStore<string>>(
db2 = (await EventStore.open<EventStore<string, any>>(
db1.address!,
session.peers[1],
{
Expand All @@ -75,7 +77,7 @@ const init = async (min: number, max?: number) => {
},
))!;

db3 = (await EventStore.open<EventStore<string>>(
db3 = (await EventStore.open<EventStore<string, any>>(
db1.address!,
session.peers[2],
{
Expand Down
Loading

0 comments on commit 40bfb0f

Please sign in to comment.