Skip to content

Commit

Permalink
Merge pull request #347 from dao-xyz/rateless
Browse files Browse the repository at this point in the history
Rateless syncing
  • Loading branch information
marcus-pousette authored Dec 28, 2024
2 parents 299e7a6 + a97d78a commit 434e140
Show file tree
Hide file tree
Showing 138 changed files with 23,207 additions and 8,871 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ jobs:
cache: yarn
- name: Install deps
run: |
yarn && npx playwright install-deps
curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
yarn
npx playwright install-deps
- name: Build
run: |
yarn build
Expand Down Expand Up @@ -59,7 +61,9 @@ jobs:
cache: yarn
- name: Install deps
run: |
yarn && npx playwright install-deps
curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
yarn
npx playwright install-deps
- name: Build
run: |
yarn build
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ jobs:
with:
node-version: lts/*
registry-url: 'https://registry.npmjs.org'

- name: Install deps
run: |
curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
yarn
- name: Build Packages
if: ${{ steps.release.outputs.releases_created }}
run: |
Expand Down
4 changes: 3 additions & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
**/public/peerbit/**
**/public/peerbit/**
**/target/**
**/pkg/**
3 changes: 2 additions & 1 deletion .release-please.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"packages/utils/indexer/simple": {},
"packages/utils/indexer/sqlite3": {},
"packages/utils/indexer/tests": {},
"packages/utils/indexer/interface": {}
"packages/utils/indexer/interface": {},
"packages/utils/indexer/rateless-iblt": {}
}
}
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
"packages/utils/cache",
"packages/utils/logger",
"packages/utils/keychain",
"packages/utils/indexer/*"
"packages/utils/indexer/*",
"packages/utils/rateless-iblt"
],
"engines": {
"node": ">=18"
Expand Down Expand Up @@ -87,6 +88,7 @@
"aegir": "github:marcus-pousette/aegir#multiple-assets",
"eslint-config-peerbit": "https://github.com/dao-xyz/eslint-config-peerbit",
"benchmark": "^2.1.4",
"tinybench": "^3",
"chai-as-promised": "^7.1.1",
"dotenv": "^16.4.5",
"eslint-plugin-n": "^17.10.2",
Expand Down
6 changes: 6 additions & 0 deletions packages/clients/peerbit-proxy/proxy/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,12 @@ export class PeerbitProxyClient implements ProgramClient {
return response.value;
}

async hangUp(
_address: PeerId | PublicSignKey | string | Multiaddr,
): Promise<void> {
throw new Error("Not implemented");
}

get services(): { pubsub: PubSub; blocks: Blocks; keychain: Keychain } {
return this._services;
}
Expand Down
6 changes: 5 additions & 1 deletion packages/clients/peerbit-proxy/proxy/src/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { type PeerId } from "@libp2p/interface";
import { type Multiaddr } from "@multiformats/multiaddr";
import { type AnyStore } from "@peerbit/any-store-interface";
import { type Blocks } from "@peerbit/blocks-interface";
import { Ed25519Keypair } from "@peerbit/crypto";
import { Ed25519Keypair, type PublicSignKey } from "@peerbit/crypto";
import type { Indices } from "@peerbit/indexer-interface";
import { type Keychain } from "@peerbit/keychain";
import { type ProgramClient } from "@peerbit/program";
Expand Down Expand Up @@ -82,6 +82,10 @@ export class PeerbitProxyHost implements ProgramClient {
return this.hostClient.dial(address);
}

hangUp(address: PeerId | PublicSignKey | string | Multiaddr): Promise<void> {
return this.hostClient.hangUp(address);
}

get services(): { pubsub: PubSub; blocks: Blocks; keychain: Keychain } {
return this.hostClient.services;
}
Expand Down
27 changes: 15 additions & 12 deletions packages/clients/peerbit-proxy/window/e2e/browser/child/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const client = await createClient("*");

export const App = () => {
const mounted = useRef<boolean>(false);
const dbRef = useRef<SharedLog>();
const dbRef = useRef<SharedLog<any, any>>();
const [_, forceUpdate] = useReducer((x) => x + 1, 0);
useEffect(() => {
const queryParameters = new URLSearchParams(window.location.search);
Expand All @@ -18,19 +18,22 @@ export const App = () => {
}
mounted.current = true;
client
.open<SharedLog<Uint8Array>>(new SharedLog({ id: new Uint8Array(32) }), {
args: {
onChange: (change: Change<Uint8Array>) => {
forceUpdate();
setTimeout(() => {
dbRef.current?.log.load().then(() => {
forceUpdate();
console.log(client.messages.id, dbRef.current?.log.length);
});
}, 1000);
.open<SharedLog<Uint8Array, any>>(
new SharedLog({ id: new Uint8Array(32) }),
{
args: {
onChange: (change: Change<Uint8Array>) => {
forceUpdate();
setTimeout(() => {
dbRef.current?.log.load().then(() => {
forceUpdate();
console.log(client.messages.id, dbRef.current?.log.length);
});
}, 1000);
},
},
},
})
)
.then((x: any) => {
dbRef.current = x;
if (queryParameters.get("read") !== "true") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { SharedLog } from "@peerbit/shared-log";
@variant("test-log")
export class TestLog extends Program {
@field({ type: SharedLog })
log: SharedLog<Uint8Array>;
log: SharedLog<Uint8Array, any>;

constructor() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ export default defineConfig({
{
command: "yarn --cwd ./child start",
url: "http://localhost:5201",
reuseExistingServer: true,
},
{
command: "yarn --cwd ./parent start",
url: "http://localhost:5202",
reuseExistingServer: true,
},
],
});
2 changes: 1 addition & 1 deletion packages/clients/peerbit-server/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"build-lib": "tsc -p tsconfig.json",
"build-ui": "cd ../frontend && yarn build && cd ../node",
"postbuild": "cp src/nginx-template.conf dist/src/ && cp -r ../frontend/dist/. dist/ui",
"test": "aegir test",
"test": "aegir test --t node",
"lint": "aegir lint"
},
"devDependencies": {
Expand Down
3 changes: 2 additions & 1 deletion packages/clients/peerbit-server/node/test/api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ describe("server", () => {
let node: Peerbit;

afterEach(async () => {
// @ts-ignore
await node?.stop();
// @ts-ignore
server?.close();
});
it("bootstrap on start", async () => {
Expand All @@ -92,7 +94,6 @@ describe("server", () => {
describe("api", () => {
let session: TestSession, peer: ProgramClient, server: http.Server;
let db: PermissionedString;

before(async () => {});

beforeEach(async () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/clients/peerbit-server/test-lib/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"devDependencies": {
"tty-table": "^4.2.1",
"@peerbit/test-utils": "*",
"libp2p": "^2.2.1"
"libp2p": "^2.3.1"
},
"dependencies": {
"@peerbit/string": "*",
Expand Down
10 changes: 5 additions & 5 deletions packages/clients/peerbit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@
"@peerbit/indexer-sqlite3": "^1.1.4",
"datastore-level": "^11.0.1",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@libp2p/webrtc": "^5.0.16",
"@libp2p/websockets": "^9.0.11",
"@libp2p/identify": "^3.0.10",
"@libp2p/circuit-relay-v2": "^3.1.0",
"@libp2p/tcp": "^10.0.11",
"@libp2p/webrtc": "^5.0.19",
"@libp2p/websockets": "^9.0.13",
"@libp2p/identify": "^3.0.12",
"@libp2p/circuit-relay-v2": "^3.1.3",
"@libp2p/tcp": "^10.0.13",
"level": "^8.0.1",
"memory-level": "^1.0.0",
"path-browserify": "^1.0.1"
Expand Down
5 changes: 4 additions & 1 deletion packages/clients/peerbit/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ export const createLibp2pExtended = (
): Promise<Libp2pExtended> => {
let extraServices: any = {};

if (!opts.services?.["relay"]) {
if (opts.services?.["relay"] === null) {
delete opts.services?.["relay"];
} else if (!opts.services?.["relay"]) {
const relayComponent = relay();
if (relayComponent) {
// will be null in browser
extraServices["relay"] = relayComponent;
}
}

if (!opts.services?.["identify"]) {
extraServices["identify"] = identify();
}
Expand Down
64 changes: 46 additions & 18 deletions packages/clients/peerbit/src/peer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { privateKeyFromRaw } from "@libp2p/crypto/keys";
import type { PeerId } from "@libp2p/interface";
import "@libp2p/peer-id";
import {
type Multiaddr,
Expand All @@ -10,6 +11,7 @@ import { DirectBlock } from "@peerbit/blocks";
import {
Ed25519Keypair,
Ed25519PublicKey,
PublicSignKey,
Secp256k1Keypair,
getKeypairFromPrivateKey,
} from "@peerbit/crypto";
Expand All @@ -26,7 +28,6 @@ import {
ProgramHandler,
} from "@peerbit/program";
import { DirectSub } from "@peerbit/pubsub";
import { waitFor } from "@peerbit/time";
import { LevelDatastore } from "datastore-level";
import type { Libp2p } from "libp2p";
import sodium from "libsodium-wrappers";
Expand Down Expand Up @@ -119,7 +120,7 @@ export class Peerbit implements ProgramClient {
let libp2pExtended: Libp2pExtended | undefined = (options as Libp2pOptions)
.libp2p as Libp2pExtended;

const asRelay = (options as SimpleLibp2pOptions).relay;
const asRelay = (options as SimpleLibp2pOptions).relay ?? true;

const directory = options.directory;
const hasDir = directory != null;
Expand Down Expand Up @@ -176,19 +177,25 @@ export class Peerbit implements ProgramClient {
: undefined;
}

const services: any = {
keychain: (c: any) => keychain,
blocks: (c: any) =>
new DirectBlock(c, {
canRelayMessage: asRelay,
directory: blocksDirectory,
}),
pubsub: (c: any) => new DirectSub(c, { canRelayMessage: asRelay }),
...extendedOptions?.services,
};

if (!asRelay) {
services.relay = null;
}

libp2pExtended = await createLibp2pExtended({
...extendedOptions,
privateKey,
services: {
keychain: (c: any) => keychain,
blocks: (c: any) =>
new DirectBlock(c, {
canRelayMessage: asRelay,
directory: blocksDirectory,
}),
pubsub: (c: any) => new DirectSub(c, { canRelayMessage: asRelay }),
...extendedOptions?.services,
} as any, // TODO types are funky
services,
datastore,
});
}
Expand Down Expand Up @@ -280,16 +287,37 @@ export class Peerbit implements ProgramClient {
? address
: address.getMultiaddrs();
const connection = await this.libp2p.dial(maddress);

const publicKey = Ed25519PublicKey.fromPeerId(connection.remotePeer);

// TODO, do this as a promise instead using the onPeerConnected vents in pubsub and blocks
return (
(await waitFor(
() =>
this.libp2p.services.pubsub.peers.has(publicKey.hashcode()) &&
this.libp2p.services.blocks.peers.has(publicKey.hashcode()),
)) || false
try {
await this.libp2p.services.pubsub.waitFor(publicKey.hashcode(), {
neighbour: true,
});
} catch (error) {
throw new Error(`Failed to dial peer. Not available on Pubsub`);
}

try {
await this.libp2p.services.blocks.waitFor(publicKey.hashcode(), {
neighbour: true,
});
} catch (error) {
throw new Error(`Failed to dial peer. Not available on Blocks`);
}
return true;
}

async hangUp(address: PeerId | PublicSignKey | string | Multiaddr) {
await this.libp2p.hangUp(
address instanceof PublicSignKey
? address.toPeerId()
: typeof address === "string"
? multiaddr(address)
: address,
);
// TODO wait for pubsub and blocks to disconnect?
}

async start() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SeekDelivery } from "@peerbit/stream-interface";
import { waitFor } from "@peerbit/time";
import { waitFor, waitForResolved } from "@peerbit/time";
import { expect } from "chai";
import { Peerbit } from "../src/index.js";

Expand Down Expand Up @@ -52,3 +52,36 @@ describe(`dial`, function () {
);
});
});

describe(`hangup`, function () {
let clients: [Peerbit, Peerbit];

beforeEach(async () => {
clients = [
await Peerbit.create({
relay: false, // https://github.com/libp2p/js-libp2p/issues/2794
}),
await Peerbit.create({
relay: false, // https://github.com/libp2p/js-libp2p/issues/2794
}),
];
});

afterEach(async () => {
await Promise.all(clients.map((c) => c.stop()));
});

it("pubsub subscribers clears up", async () => {
let topic = "topic";
await clients[0].services.pubsub.subscribe(topic);
await clients[1].services.pubsub.subscribe(topic);
await clients[0].dial(clients[1].getMultiaddrs()[0]);
await waitForResolved(() =>
expect(clients[0].services.pubsub.peers.size).to.eq(1),
);
await clients[0].hangUp(clients[1].peerId);
await waitForResolved(() =>
expect(clients[0].services.pubsub.peers.size).to.eq(0),
);
});
});
1 change: 1 addition & 0 deletions packages/clients/peerbit/test/create.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ describe("Create", function () {
const client = await Peerbit.create();
expect(client.services.blocks.canRelayMessage).equal(true);
expect(client.services.pubsub.canRelayMessage).equal(true);
expect(client.services.relay).to.exist;
await client.stop();
});
});
Loading

0 comments on commit 434e140

Please sign in to comment.