Skip to content

Commit

Permalink
feat!: lazy stream routing protocol
Browse files Browse the repository at this point in the history
- PubSub and the block swap protocol has been improved with a lazy stream protocol that allows you only to have knowledge about peers you talk with and no-one else.
- Routing is made more efficient by tracking delivery times
- Redundancy parameters can be used to make delivery more resiliant to unreliable networks
  • Loading branch information
marcus-pousette committed Oct 24, 2023
1 parent 422822a commit d12eb28
Show file tree
Hide file tree
Showing 54 changed files with 3,476 additions and 3,415 deletions.
2 changes: 1 addition & 1 deletion docs/examples/document-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ await peer2.dial(peer.getMultiaddrs());
const store2 = await peer2.open<PostsDB>(store.address!);

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / test_push (18.x, yarn test:node --roots ./packages/transport ./packages/utils --w 2)

Forbidden non-null assertion

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / Test (Node v18.x, OS ubuntu-22.04)

Forbidden non-null assertion

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / test_push (18.x, yarn test:node --roots ./packages/programs ./docs ./packages/log --w 2)

Forbidden non-null assertion

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / Test (Node v18.x, OS ubuntu-22.04)

Forbidden non-null assertion

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / Test (Node v18.x, OS ubuntu-22.04)

Forbidden non-null assertion

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / test_push (18.x, yarn playwright install --with-deps && yarn test:node --roots ./packages/clients...

Forbidden non-null assertion

// Wait for peer1 to be reachable for query
await store.waitFor(peer2.peerId);
await store.posts.log.waitForReplicator(peer2.identity.publicKey);

const responses: Post[] = await store2.posts.index.search(
new SearchRequest({
Expand Down
3 changes: 2 additions & 1 deletion docs/modules/client/bootstrap.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
it("bootstrap", async () => {
await import("./bootstrap.js");
// TMP disable until bootstrap nodes have migrated
// await import("./bootstrap.js");
});
1 change: 1 addition & 0 deletions docs/modules/client/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ import { Peerbit } from "peerbit";

const peer = await Peerbit.create();
await peer.bootstrap();
await peer.stop();
3 changes: 2 additions & 1 deletion docs/modules/client/connectivity-relay.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
it("connectivity", async () => {
await import("./connectivity-relay.js");
// TMP disable until bootstrap nodes have migrated
// await import("./connectivity-relay.js");
});
7 changes: 2 additions & 5 deletions docs/modules/program/rpc/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,15 @@ class RPCTest extends Program<Args> {
? (hello, from) => {
return new World();
}
: undefined, // only create a response handler if we are to respond to requests
subscriptionData: args?.role ? serialize(args.role) : undefined
: undefined // only create a response handler if we are to respond to requests
});
}

async getAllResponders(): Promise<PublicSignKey[]> {
const allSubscribers = await this.node.services.pubsub.getSubscribers(
this.rpc.rpcTopic
);
return [...(allSubscribers ? allSubscribers.values() : [])]
.filter((x) => x.data && equals(x.data, serialize(new Responder())))
.map((x) => x.publicKey);
return allSubscribers || [];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ describe("index", () => {
await client1.services.pubsub.requestSubscribers("topic");
await waitForResolved(async () =>
expect(
(await client1.services.pubsub.getSubscribers("topic"))!.size
(await client1.services.pubsub.getSubscribers("topic"))!.length
).toEqual(1)
);
await client1.services.pubsub.publish(data, { topics: ["topic"] });
Expand Down Expand Up @@ -315,8 +315,8 @@ describe("index", () => {
await client1.services.pubsub.requestSubscribers("topic");
await waitForResolved(async () =>
expect(
(await client1.services.pubsub.getSubscribers("topic"))?.get(
client2.identity.publicKey.hashcode()
(await client1.services.pubsub.getSubscribers("topic"))?.find((x) =>
x.equals(client2.identity.publicKey)
)
).toBeDefined()
);
Expand All @@ -325,7 +325,7 @@ describe("index", () => {
it("requestSubsribers", async () => {
let receivedMessages: (GetSubscribers | undefined)[] = [];
await client2.services.pubsub.addEventListener("message", (message) => {
if (message.detail instanceof DataMessage) {
if (message.detail instanceof DataMessage && message.detail.data) {
receivedMessages.push(
deserialize(message.detail.data, GetSubscribers)
);
Expand Down
6 changes: 3 additions & 3 deletions packages/clients/peerbit-proxy/interface/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ export class PeerbitProxyClient implements ProgramClient {
const resp = await this.request<pubsub.RESP_GetSubscribers>(
new pubsub.REQ_GetSubscribers(topic)
);
return resp.map || undefined;
return resp.subscribers;
},
publish: async (data, options) => {
const resp = await this.request<pubsub.RESP_Publish>(
Expand All @@ -208,9 +208,9 @@ export class PeerbitProxyClient implements ProgramClient {
new pubsub.REQ_RequestSubscribers(topic)
);
},
subscribe: async (topic, options) => {
subscribe: async (topic) => {
await this.request<pubsub.RESP_Subscribe>(
new pubsub.REQ_Subscribe(topic, options)
new pubsub.REQ_Subscribe(topic)
);
},
unsubscribe: async (topic, options) => {
Expand Down
4 changes: 1 addition & 3 deletions packages/clients/peerbit-proxy/interface/src/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,7 @@ export class PeerbitProxyHost implements ProgramClient {
await this.services.pubsub.requestSubscribers(message.topic);
await this.respond(message, new pubsub.RESP_RequestSubscribers(), from);
} else if (message instanceof pubsub.REQ_Subscribe) {
await this.services.pubsub.subscribe(message.topic, {
data: message.data
});
await this.services.pubsub.subscribe(message.topic);

let set = this._pubsubTopicSubscriptions.get(from.id);
if (!set) {
Expand Down
38 changes: 6 additions & 32 deletions packages/clients/peerbit-proxy/interface/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,12 @@ export class REQ_GetSubscribers extends PubSubMessage {

@variant(1)
export class RESP_GetSubscribers extends PubSubMessage {
@field({ type: option(vec(SubscriptionData)) })
data?: SubscriptionData[];
@field({ type: option(vec(PublicSignKey)) })
subscribers?: PublicSignKey[];

constructor(map?: Map<string, SubscriptionData>) {
constructor(subscribers?: PublicSignKey[]) {
super();
if (map) {
this.data = [];
for (const [k, v] of map.entries()) {
this.data.push(v);
}
}
}

_map: Map<string, SubscriptionData> | null | undefined;
get map() {
if (this._map !== undefined) {
return this._map;
}
if (this.data) {
const map = new Map();
for (const [i, data] of this.data.entries()) {
map.set(data.publicKey.hashcode(), data);
}
return (this._map = map);
} else {
return (this._map = null);
}
this.subscribers = subscribers;
}
}

Expand Down Expand Up @@ -121,13 +100,9 @@ export class REQ_Subscribe extends PubSubMessage {
@field({ type: "string" })
topic: string;

@field({ type: option(Uint8Array) })
data?: Uint8Array;

constructor(topic: string, options?: { data?: Uint8Array }) {
constructor(topic: string) {
super();
this.topic = topic;
this.data = options?.data;
}
}

Expand All @@ -136,11 +111,10 @@ export class RESP_Subscribe extends PubSubMessage {}

@variant(8)
export class REQ_Unsubscribe extends PubSubMessage {
constructor(topic: string, options?: { force?: boolean; data?: Uint8Array }) {
constructor(topic: string, options?: { force?: boolean }) {
super();
this.topic = topic;
this.force = options?.force;
this.data = options?.data;
}
@field({ type: "string" })
topic: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,9 @@ describe("libp2p only", () => {
});

beforeEach(async () => {
session.peers[0].services.pubsub.subscribe("1", {
data: new Uint8Array([1])
});
session.peers[0].services.pubsub.subscribe("2", {
data: new Uint8Array([2])
});
session.peers[0].services.pubsub.subscribe("3", {
data: new Uint8Array([3])
});
session.peers[0].services.pubsub.subscribe("1");
session.peers[0].services.pubsub.subscribe("2");
session.peers[0].services.pubsub.subscribe("3");
configDirectory = path.join(
__dirname,
"tmp",
Expand Down Expand Up @@ -86,13 +80,14 @@ describe("server", () => {
server?.close();
});
it("bootstrap on start", async () => {
let result = await startServerWithNode({
bootstrap: true,
directory: path.join(__dirname, "tmp", "api-test", "server", uuid())
});
node = result.node;
server = result.server;
expect(node.libp2p.services.pubsub.peers.size).toBeGreaterThan(0);
// TMP disable until bootstrap nodes have migrated
/* let result = await startServerWithNode({
bootstrap: true,
directory: path.join(__dirname, "tmp", "api-test", "server", uuid())
});
node = result.node;
server = result.server;
expect(node.libp2p.services.pubsub.peers.size).toBeGreaterThan(0); */
});
});
describe("api", () => {
Expand Down Expand Up @@ -227,14 +222,15 @@ describe("server", () => {
});

it("bootstrap", async () => {
expect((session.peers[0] as Peerbit).services.pubsub.peers.size).toEqual(
// TMP disable until bootstrap nodes have migrated
/* expect((session.peers[0] as Peerbit).services.pubsub.peers.size).toEqual(
0
);
const c = await client(session.peers[0].identity);
await c.network.bootstrap();
expect(
(session.peers[0] as Peerbit).services.pubsub.peers.size
).toBeGreaterThan(0);
).toBeGreaterThan(0); */
});

/* TODO how to test this properly? Seems to hang once we added 'sudo --prefix __dirname' to the npm install in the child_process
Expand Down
4 changes: 2 additions & 2 deletions packages/clients/peerbit-server/node/src/docker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { delay, waitForAsync } from "@peerbit/time";
import { delay, waitFor } from "@peerbit/time";

export const installDocker = async () => {
const { exec } = await import("child_process");
Expand Down Expand Up @@ -31,7 +31,7 @@ export const installDocker = async () => {
});

try {
await waitForAsync(() => dockerExist(), {
await waitFor(() => dockerExist(), {
timeout: 30 * 1000,
delayInterval: 1000
});
Expand Down
4 changes: 2 additions & 2 deletions packages/clients/peerbit-server/node/src/domain.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { waitFor, waitForAsync } from "@peerbit/time";
import { waitFor } from "@peerbit/time";

const isNode = typeof window === undefined || typeof window === "undefined";

Expand Down Expand Up @@ -179,7 +179,7 @@ export const startCertbot = async (
const { default: axios } = await import("axios");

console.log("Waiting for domain to be ready ...");
await waitForAsync(
await waitFor(
async () => {
try {
const status = (await axios.get("https://" + domain)).status;
Expand Down
5 changes: 3 additions & 2 deletions packages/clients/peerbit/src/__tests__/bootstrap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ describe("bootstrap", () => {
});

it("remote", async () => {
await peer.bootstrap();
expect(peer.libp2p.services.pubsub.peers.size).toBeGreaterThan(0);
// TMP disable until bootstrap nodes have migrated
/* await peer.bootstrap();
expect(peer.libp2p.services.pubsub.peers.size).toBeGreaterThan(0); */
});
});
12 changes: 7 additions & 5 deletions packages/clients/test-utils/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TestSession } from "../session.js";
import { waitFor, waitForAsync } from "@peerbit/time";
import { waitFor } from "@peerbit/time";

describe("session", () => {
let session: TestSession;
Expand All @@ -24,13 +24,15 @@ describe("session", () => {
session.peers[2].services.pubsub.addEventListener("data", (evt) => {
result = evt.detail;
});
await waitForAsync(
await waitFor(
async () =>
(await session.peers[0].services.pubsub.getSubscribers("x"))?.size === 2
(await session.peers[0].services.pubsub.getSubscribers("x"))?.length ===
3
);
await waitForAsync(
await waitFor(
async () =>
(await session.peers[1].services.pubsub.getSubscribers("x"))?.size === 2
(await session.peers[1].services.pubsub.getSubscribers("x"))?.length ===
3
);

session.peers[0].services.pubsub.publish(new Uint8Array([1, 2, 3]), {
Expand Down
4 changes: 2 additions & 2 deletions packages/log/src/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
variant,
vec
} from "@dao-xyz/borsh";
import { waitForAsync } from "@peerbit/time";
import { waitFor } from "@peerbit/time";
import { AnyStore } from "@peerbit/any-store";
import { logger } from "./logger.js";

Expand Down Expand Up @@ -68,7 +68,7 @@ export const save = async <T>(
writer.string(snapshot);
await cache.put(snapshotPath, writer.finalize());

await waitForAsync(async () => (await cache.get(snapshotPath)) != null, {
await waitFor(async () => (await cache.get(snapshotPath)) != null, {
delayInterval: 200,
timeout: 10 * 1000
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ describe("index", () => {
args: { role: new Observer() }
});

await l0b.waitFor(session.peers[0].peerId);
await l0b.store.log.waitForReplicator(
session.peers[0].identity.publicKey
);

const q = async (): Promise<Document[]> => {
return l0b.store.index.search(
Expand Down Expand Up @@ -362,17 +364,6 @@ describe("index", () => {
);
await waitFor(() => l0b.accessController.access.index.size === 1);

const abc = await l0a.store.index.search(
new SearchRequest({
query: [
new StringMatch({
key: "id",
value: "1"
})
]
})
);

const result = await q();
expect(result.length).toBeGreaterThan(0); // Because read access
});
Expand Down Expand Up @@ -411,7 +402,9 @@ describe("index", () => {
});

// Allow all for easy query
await l0b.waitFor(session.peers[0].peerId);
await l0b.accessController.access.log.waitForReplicator(
session.peers[0].identity.publicKey
);
await l0b.accessController.access.log.log.join(
await l0a.accessController.access.log.log.getHeads()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
TrustedNetwork,
IdentityGraph
} from "..";
import { waitFor, waitForResolved } from "@peerbit/time";
import { delay, waitFor, waitForResolved } from "@peerbit/time";
import { AccessError, Ed25519Keypair, Identity } from "@peerbit/crypto";
import { Secp256k1PublicKey } from "@peerbit/crypto";
import { Entry } from "@peerbit/log";
Expand Down Expand Up @@ -200,7 +200,7 @@ describe("index", () => {
session.peers[3]
);

await l0c.waitFor(session.peers[0].peerId, session.peers[1].peerId);
await l0c.waitFor([session.peers[0].peerId, session.peers[1].peerId]);

await l0a.add(session.peers[1].peerId);

Expand All @@ -218,8 +218,11 @@ describe("index", () => {

await waitFor(() => l0b.trustGraph.index.size == 2);
await waitFor(() => l0a.trustGraph.index.size == 2);
await l0c.waitFor(session.peers[0].peerId);
await l0c.waitFor(session.peers[1].peerId);

await l0c.trustGraph.log.waitForReplicator(
session.peers[0].identity.publicKey,
session.peers[1].identity.publicKey
);

// Try query with trusted
let responses: IdentityRelation[] = await l0c.trustGraph.index.search(
Expand Down
Loading

0 comments on commit d12eb28

Please sign in to comment.