Skip to content

Commit

Permalink
Merge pull request #220 from dao-xyz/lazysub
Browse files Browse the repository at this point in the history
Lazsub - The lazy way of getting to know a network
  • Loading branch information
marcus-pousette authored Oct 28, 2023
2 parents 63802e4 + 7768e13 commit 8c30632
Show file tree
Hide file tree
Showing 77 changed files with 5,366 additions and 5,155 deletions.
2 changes: 1 addition & 1 deletion docs/examples/document-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ await peer2.dial(peer.getMultiaddrs());
const store2 = await peer2.open<PostsDB>(store.address!);

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / test_push (18.x, yarn playwright install --with-deps && yarn test:node --roots ./packages/clients...

Forbidden non-null assertion

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / test_push (18.x, yarn test:node --roots ./packages/programs ./docs ./packages/log --w 2)

Forbidden non-null assertion

Check warning on line 80 in docs/examples/document-store.ts

View workflow job for this annotation

GitHub Actions / test_push (18.x, yarn test:node --roots ./packages/transport ./packages/utils --w 2)

Forbidden non-null assertion

// Wait for peer1 to be reachable for query
await store.waitFor(peer2.peerId);
await store.posts.log.waitForReplicator(peer2.identity.publicKey);

const responses: Post[] = await store2.posts.index.search(
new SearchRequest({
Expand Down
3 changes: 2 additions & 1 deletion docs/modules/client/bootstrap.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
it("bootstrap", async () => {
await import("./bootstrap.js");
// TMP disable until bootstrap nodes have migrated
// await import("./bootstrap.js");
});
1 change: 1 addition & 0 deletions docs/modules/client/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ import { Peerbit } from "peerbit";

const peer = await Peerbit.create();
await peer.bootstrap();
await peer.stop();
3 changes: 2 additions & 1 deletion docs/modules/client/connectivity-relay.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
it("connectivity", async () => {
await import("./connectivity-relay.js");
// TMP disable until bootstrap nodes have migrated
// await import("./connectivity-relay.js");
});
4 changes: 2 additions & 2 deletions docs/modules/program/composition/composition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ const forum2 = await client2.open<Forum>(forum.address, {
});

// Wait for client 1 to be available (only needed for testing locally)
await forum2.waitFor(client.peerId);
await forum2.channels.log.waitForReplicator(client.identity.publicKey);

// find channels from the forum from client2 perspective
const channels = await forum2.channels.index.search(new SearchRequest());
Expand All @@ -133,7 +133,7 @@ const channel2 = await client2.open<Channel>(channels[0], {
});

// Wait for client 1 to be available (only needed for testing locally)
await channel2.waitFor(client.peerId);
await channel2.db.posts.log.waitForReplicator(client.identity.publicKey);

// find messages
const messages = await channel2.db.posts.index.search(new SearchRequest());
Expand Down
7 changes: 2 additions & 5 deletions docs/modules/program/rpc/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,15 @@ class RPCTest extends Program<Args> {
? (hello, from) => {
return new World();
}
: undefined, // only create a response handler if we are to respond to requests
subscriptionData: args?.role ? serialize(args.role) : undefined
: undefined // only create a response handler if we are to respond to requests
});
}

async getAllResponders(): Promise<PublicSignKey[]> {
const allSubscribers = await this.node.services.pubsub.getSubscribers(
this.rpc.rpcTopic
);
return [...(allSubscribers ? allSubscribers.values() : [])]
.filter((x) => x.data && equals(x.data, serialize(new Responder())))
.map((x) => x.publicKey);
return allSubscribers || [];
}
}

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@
"eslint-config-prettier": "^9.0.0",
"eslint-plugin-prettier": "^5.0.0",
"gh-pages": "^5.0.0",
"jest": "^29.6.4",
"jest-extended": "^4.0.1",
"jest": "^29.7.0",
"jest-extended": "^4.0.2",
"json": "^11.0.0",
"lerna": "^7.2.0",
"prettier": "^3.0.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ describe("index", () => {
await client1.services.pubsub.requestSubscribers("topic");
await waitForResolved(async () =>
expect(
(await client1.services.pubsub.getSubscribers("topic"))!.size
(await client1.services.pubsub.getSubscribers("topic"))!.length
).toEqual(1)
);
await client1.services.pubsub.publish(data, { topics: ["topic"] });
Expand Down Expand Up @@ -315,8 +315,8 @@ describe("index", () => {
await client1.services.pubsub.requestSubscribers("topic");
await waitForResolved(async () =>
expect(
(await client1.services.pubsub.getSubscribers("topic"))?.get(
client2.identity.publicKey.hashcode()
(await client1.services.pubsub.getSubscribers("topic"))?.find((x) =>
x.equals(client2.identity.publicKey)
)
).toBeDefined()
);
Expand All @@ -325,7 +325,7 @@ describe("index", () => {
it("requestSubsribers", async () => {
let receivedMessages: (GetSubscribers | undefined)[] = [];
await client2.services.pubsub.addEventListener("message", (message) => {
if (message.detail instanceof DataMessage) {
if (message.detail instanceof DataMessage && message.detail.data) {
receivedMessages.push(
deserialize(message.detail.data, GetSubscribers)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import * as connection from "../connection.js";
import { EventEmitter, CustomEvent } from "@libp2p/interface/events";
import { CustomEvent } from "@libp2p/interface/events";
import type { TypedEventTarget } from "@libp2p/interface/events";

export class EventEmitterNode extends connection.MessageNode {
constructor(
readonly eventEmitter: EventEmitter<{
readonly eventEmitter: TypedEventTarget<{
hello: CustomEvent<connection.Hello>;
data: CustomEvent<connection.DataMessage>;
}>
Expand Down
6 changes: 3 additions & 3 deletions packages/clients/peerbit-proxy/interface/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ export class PeerbitProxyClient implements ProgramClient {
const resp = await this.request<pubsub.RESP_GetSubscribers>(
new pubsub.REQ_GetSubscribers(topic)
);
return resp.map || undefined;
return resp.subscribers;
},
publish: async (data, options) => {
const resp = await this.request<pubsub.RESP_Publish>(
Expand All @@ -208,9 +208,9 @@ export class PeerbitProxyClient implements ProgramClient {
new pubsub.REQ_RequestSubscribers(topic)
);
},
subscribe: async (topic, options) => {
subscribe: async (topic) => {
await this.request<pubsub.RESP_Subscribe>(
new pubsub.REQ_Subscribe(topic, options)
new pubsub.REQ_Subscribe(topic)
);
},
unsubscribe: async (topic, options) => {
Expand Down
4 changes: 1 addition & 3 deletions packages/clients/peerbit-proxy/interface/src/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,7 @@ export class PeerbitProxyHost implements ProgramClient {
await this.services.pubsub.requestSubscribers(message.topic);
await this.respond(message, new pubsub.RESP_RequestSubscribers(), from);
} else if (message instanceof pubsub.REQ_Subscribe) {
await this.services.pubsub.subscribe(message.topic, {
data: message.data
});
await this.services.pubsub.subscribe(message.topic);

let set = this._pubsubTopicSubscriptions.get(from.id);
if (!set) {
Expand Down
38 changes: 6 additions & 32 deletions packages/clients/peerbit-proxy/interface/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,12 @@ export class REQ_GetSubscribers extends PubSubMessage {

@variant(1)
export class RESP_GetSubscribers extends PubSubMessage {
@field({ type: option(vec(SubscriptionData)) })
data?: SubscriptionData[];
@field({ type: option(vec(PublicSignKey)) })
subscribers?: PublicSignKey[];

constructor(map?: Map<string, SubscriptionData>) {
constructor(subscribers?: PublicSignKey[]) {
super();
if (map) {
this.data = [];
for (const [k, v] of map.entries()) {
this.data.push(v);
}
}
}

_map: Map<string, SubscriptionData> | null | undefined;
get map() {
if (this._map !== undefined) {
return this._map;
}
if (this.data) {
const map = new Map();
for (const [i, data] of this.data.entries()) {
map.set(data.publicKey.hashcode(), data);
}
return (this._map = map);
} else {
return (this._map = null);
}
this.subscribers = subscribers;
}
}

Expand Down Expand Up @@ -121,13 +100,9 @@ export class REQ_Subscribe extends PubSubMessage {
@field({ type: "string" })
topic: string;

@field({ type: option(Uint8Array) })
data?: Uint8Array;

constructor(topic: string, options?: { data?: Uint8Array }) {
constructor(topic: string) {
super();
this.topic = topic;
this.data = options?.data;
}
}

Expand All @@ -136,11 +111,10 @@ export class RESP_Subscribe extends PubSubMessage {}

@variant(8)
export class REQ_Unsubscribe extends PubSubMessage {
constructor(topic: string, options?: { force?: boolean; data?: Uint8Array }) {
constructor(topic: string, options?: { force?: boolean }) {
super();
this.topic = topic;
this.force = options?.force;
this.data = options?.data;
}
@field({ type: "string" })
topic: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"devDependencies": {
"@types/react": "^18.2.12",
"@types/react-dom": "^18.2.5",
"@vitejs/plugin-react": "^4.0.4",
"@vitejs/plugin-react": "^4.1.0",
"vite": "^4.4.9"
},
"browserslist": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"devDependencies": {
"@types/react": "^18.2.12",
"@types/react-dom": "^18.2.5",
"@vitejs/plugin-react": "^4.0.4",
"@vitejs/plugin-react": "^4.1.0",
"vite": "^4.4.9"
},
"browserslist": {
Expand Down
2 changes: 1 addition & 1 deletion packages/clients/peerbit-server/frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"devDependencies": {
"@types/react": "^18.2.12",
"@types/react-dom": "^18.2.5",
"@vitejs/plugin-react": "^4.0.4",
"@vitejs/plugin-react": "^4.1.0",
"vite": "^4.4.9"
},
"browserslist": {
Expand Down
2 changes: 1 addition & 1 deletion packages/clients/peerbit-server/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"devDependencies": {
"@peerbit/test-lib": "^0.0.1",
"@peerbit/test-utils": "1.0.33",
"@types/yargs": "^17.0.24",
"@types/yargs": "17.0.24",
"aws-sdk": "^2.1259.0",
"dotenv": "^16.1.4",
"@types/tmp": "^0.2.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,9 @@ describe("libp2p only", () => {
});

beforeEach(async () => {
session.peers[0].services.pubsub.subscribe("1", {
data: new Uint8Array([1])
});
session.peers[0].services.pubsub.subscribe("2", {
data: new Uint8Array([2])
});
session.peers[0].services.pubsub.subscribe("3", {
data: new Uint8Array([3])
});
session.peers[0].services.pubsub.subscribe("1");
session.peers[0].services.pubsub.subscribe("2");
session.peers[0].services.pubsub.subscribe("3");
configDirectory = path.join(
__dirname,
"tmp",
Expand Down Expand Up @@ -86,13 +80,14 @@ describe("server", () => {
server?.close();
});
it("bootstrap on start", async () => {
let result = await startServerWithNode({
bootstrap: true,
directory: path.join(__dirname, "tmp", "api-test", "server", uuid())
});
node = result.node;
server = result.server;
expect(node.libp2p.services.pubsub.peers.size).toBeGreaterThan(0);
// TMP disable until bootstrap nodes have migrated
/* let result = await startServerWithNode({
bootstrap: true,
directory: path.join(__dirname, "tmp", "api-test", "server", uuid())
});
node = result.node;
server = result.server;
expect(node.libp2p.services.pubsub.peers.size).toBeGreaterThan(0); */
});
});
describe("api", () => {
Expand Down Expand Up @@ -227,14 +222,15 @@ describe("server", () => {
});

it("bootstrap", async () => {
expect((session.peers[0] as Peerbit).services.pubsub.peers.size).toEqual(
// TMP disable until bootstrap nodes have migrated
/* expect((session.peers[0] as Peerbit).services.pubsub.peers.size).toEqual(
0
);
const c = await client(session.peers[0].identity);
await c.network.bootstrap();
expect(
(session.peers[0] as Peerbit).services.pubsub.peers.size
).toBeGreaterThan(0);
).toBeGreaterThan(0); */
});

/* TODO how to test this properly? Seems to hang once we added 'sudo --prefix __dirname' to the npm install in the child_process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ const __dirname = dirname(fileURLToPath(import.meta.url));

const modulesPath = path.join(__dirname, "./tmp/cli-test/modules");

const runCommandAsync = (args): ProcessWithOut => {
const cmd = `node --loader ts-node/esm ${path.join(
const runCommandProcess = (args): ProcessWithOut => {
const cmd = `node --experimental-vm-modules ${path.join(
__dirname,
"../",
"bin.ts"
"../../",
"lib",
"esm",
"bin.js"
)} ${args}`;
const p = exec(
cmd /* , { env: { ...process.env, "PEERBIT_MODULES_PATH": modulesPath } } */
Expand All @@ -30,10 +32,12 @@ const runCommandAsync = (args): ProcessWithOut => {
};

const runCommand = (args): string => {
const cmd = `node --loader ts-node/esm ${path.join(
const cmd = `node --experimental-vm-modules ${path.join(
__dirname,
"../",
"bin.ts"
"../../",
"lib",
"esm",
"bin.js"
)} ${args}`;
return execSync(cmd).toString();
};
Expand Down Expand Up @@ -102,7 +106,7 @@ describe("cli", () => {
let PORT = 9993;

const start = async (extraArgs: string = "") => {
const cmd = runCommandAsync(
const cmd = runCommandProcess(
`start --reset --port-api ${PORT} --port-node 0 --directory ${configDirectory} ${extraArgs}`
);
processes.push(cmd);
Expand All @@ -127,24 +131,14 @@ describe("cli", () => {
remote: string = LOCAL_REMOTE_NAME,
address: string = "http://localhost:" + PORT
): void => {
execSync(
`node --loader ts-node/esm ${path.join(
__dirname,
"../",
"bin.ts"
)} remote add ${remote} ${address} --directory ${configDirectory}`
runCommand(
`remote add ${remote} ${address} --directory ${configDirectory}`
);
};

const connect = (remote: string = LOCAL_REMOTE_NAME): ProcessWithOut => {
const p = getProcessWithOut(
exec(
`node --loader ts-node/esm ${path.join(
__dirname,
"../",
"bin.ts"
)} remote connect ${remote} --directory ${configDirectory}`
)
const p = runCommandProcess(
`remote connect ${remote} --directory ${configDirectory}`
);
processes.push(p);
return p;
Expand Down Expand Up @@ -320,6 +314,7 @@ describe("cli", () => {
);
await checkPeerId(terminal);
});

/*
it("re-opens on restart", async () => {
const server = await start();
Expand Down
2 changes: 1 addition & 1 deletion packages/clients/peerbit-server/node/src/aws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ export const launchNodes = async (properties: {
new RunInstancesCommand({
ImageId: AWS_LINUX_ARM_AMIs[regionString],
SecurityGroupIds: [securityGroupOut.GroupId!],
InstanceType: "t4g." + (properties.size || "micro"),
InstanceType: ("t4g." + (properties.size || "micro")) as any, // TODO types
UserData: Buffer.from(
setupUserData(properties.email, properties.grantAccess)
).toString("base64"),
Expand Down
Loading

0 comments on commit 8c30632

Please sign in to comment.