Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Oct 9, 2023
1 parent 6667bfb commit 2ea95e5
Show file tree
Hide file tree
Showing 16 changed files with 1,121 additions and 797 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
"eslint-config-prettier": "^9.0.0",
"eslint-plugin-prettier": "^5.0.0",
"gh-pages": "^5.0.0",
"jest": "^29.6.4",
"jest": "^29.7.0",
"jest-extended": "^4.0.1",
"json": "^11.0.0",
"lerna": "^7.2.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/programs/clock-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@
"devDependencies": {
"@peerbit/test-utils": "^1.0.33"
}
}
}
2 changes: 1 addition & 1 deletion packages/transport/blocks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@
"@ipld/dag-cbor": "^9.0.2",
"libp2p": "0.46.9"
}
}
}
2 changes: 1 addition & 1 deletion packages/transport/blocks/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class DirectBlock extends DirectStream implements IBlocks {
messageProcessingConcurrency?: number;
}
) {
super(components, ["direct-block/1.0.0"], {
super(components, ["/lazyblock/1.0.0"], {
emitSelf: false,
signaturePolicy: "StrictNoSign",
messageProcessingConcurrency: options?.messageProcessingConcurrency || 10,
Expand Down
2 changes: 1 addition & 1 deletion packages/transport/libp2p-test-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@
"localMaintainers": [
"dao.xyz"
]
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TestSession } from "../session";
import { TestSession } from "../session.js";

it("connect", async () => {
const session = await TestSession.connected(3);
Expand Down
2 changes: 1 addition & 1 deletion packages/transport/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,4 @@
"@libp2p/interfaces": "^3.3.2",
"libp2p": "0.46.9"
}
}
}
103 changes: 54 additions & 49 deletions packages/transport/pubsub/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,22 @@ const createSubscriptionMetrics = (pubsub: DirectSub) => {
const collectDataWrites = (client: DirectSub) => {
const writes: Map<string, PubSubData[]> = new Map();
for (const [name, peer] of client.peers) {
writes.set(name, [])
const writeFn = peer.write.bind(peer)
writes.set(name, []);
const writeFn = peer.write.bind(peer);
peer.write = (data) => {
const bytes = data instanceof Uint8Array ? data : data.subarray();
const message = deserialize(bytes, Message)
const message = deserialize(bytes, Message);
if (message instanceof DataMessage) {
const pubsubData = deserialize(message.data, PubSubMessage);
if (pubsubData instanceof PubSubData) {
writes.get(name)?.push(pubsubData)
writes.get(name)?.push(pubsubData);
}

}
return writeFn(data)
}
return writeFn(data);
};
}
return writes;

}
};
const createMetrics = (pubsub: DirectSub) => {
const m: {
stream: DirectSub;
Expand Down Expand Up @@ -114,8 +112,6 @@ const createMetrics = (pubsub: DirectSub) => {
return relayMessageFn(from, message, to);
};



return m;
};

Expand Down Expand Up @@ -342,7 +338,6 @@ describe("pubsub", function () {
await delay(3000); // wait some more time to make sure we dont get more messages
expect(streams[2].received).toHaveLength(1);
expect(streams[1].received).toHaveLength(0);

});
it("can send as non subscribeer", async () => {
streams[0].stream.unsubscribe(TOPIC);
Expand Down Expand Up @@ -503,7 +498,7 @@ describe("pubsub", function () {

const data = new Uint8Array([1, 2, 3]);
const TOPIC = "world";
beforeAll(async () => { });
beforeAll(async () => {});
beforeEach(async () => {
session = await TestSession.disconnected(5, {
services: {
Expand Down Expand Up @@ -544,7 +539,7 @@ describe("pubsub", function () {
await Promise.all(streams.map((peer) => peer.stream.stop()));
await session.stop();
});
afterAll(async () => { });
afterAll(async () => {});

it("will publish on routes", async () => {
streams[3].received = [];
Expand All @@ -566,7 +561,6 @@ describe("pubsub", function () {
});

describe("line", () => {

/*
┌─┐
│0│ // Sender of message
Expand All @@ -584,7 +578,7 @@ describe("pubsub", function () {

const data = new Uint8Array([1, 2, 3]);
const TOPIC = "world";
beforeAll(async () => { });
beforeAll(async () => {});
beforeEach(async () => {
session = await TestSession.disconnected(3, {
services: {
Expand Down Expand Up @@ -623,7 +617,7 @@ describe("pubsub", function () {
await Promise.all(streams.map((peer) => peer.stream.stop()));
await session.stop();
});
afterAll(async () => { });
afterAll(async () => {});

it("will not forward unless necessary", async () => {
streams[1].received = [];
Expand All @@ -645,7 +639,6 @@ describe("pubsub", function () {
});

describe("4 connected", () => {

/*
┌───┐
│ 0 │
Expand All @@ -661,7 +654,6 @@ describe("pubsub", function () {
└────┘
*/


let session: TestSession<{ pubsub: DirectSub }>;
let streams: ReturnType<typeof createMetrics>[];

Expand All @@ -680,12 +672,11 @@ describe("pubsub", function () {
}
});


await session.connect([
[session.peers[0], session.peers[1]],
[session.peers[0], session.peers[2]],
[session.peers[1], session.peers[3]],
[session.peers[2], session.peers[3]],
[session.peers[2], session.peers[3]]
]);

streams = [];
Expand All @@ -697,86 +688,100 @@ describe("pubsub", function () {
}
}


for (const [i, peer] of streams.entries()) {
if ([0, 3].includes(i)) {
await peer.stream.requestSubscribers(TOPIC);
await waitFor(
() => peer.stream.getSubscribers(TOPIC)?.size === 1)
await waitFor(() => peer.stream.getSubscribers(TOPIC)?.size === 1);
}
}


})
});

afterEach(async () => {
await Promise.all(streams.map((peer) => peer.stream.stop()));
await session.stop();
});

it("will not send messages back another route", async () => {

// if '0' pushes data on TOPIC to '3'
// there is no point for '3' to send messages back to '0'

const allWrites = streams.map(stream => collectDataWrites(stream.stream))
console.log(streams[0].stream.publicKeyHash)
// there is no point for '3' to send messages back to '0'

const allWrites = streams.map((stream) =>
collectDataWrites(stream.stream)
);
console.log(streams[0].stream.publicKeyHash);

await delay(PING_INTERVAL * 6)
await delay(PING_INTERVAL * 6);

for (const stream of streams) {
for (let i = 0; i < streams.length; i++) {
for (let j = 0; j < streams.length; j++) {
if (j !== i) {
const data = stream.stream.routes.getLinkData(streams[i].stream.publicKeyHash, streams[j].stream.publicKeyHash);
const data = stream.stream.routes.getLinkData(
streams[i].stream.publicKeyHash,
streams[j].stream.publicKeyHash
);
if (data) {
expect(data?.weight).toBeLessThan(1e4);
}
}
}

}
}



let count = 1;
for (let i = 0; i < count; i++) {
await streams[0].stream.publish(data, { topics: [TOPIC] });
}

await waitForResolved(() => expect(streams[3].received).toHaveLength(count));
await waitForResolved(() =>
expect(streams[3].received).toHaveLength(count)
);

const p330 = streams[3].stream.routes.getPath(streams[3].stream.publicKeyHash, streams[0].stream.publicKeyHash)
const p320 = streams[3].stream.routes.getPath(streams[2].stream.publicKeyHash, streams[0].stream.publicKeyHash)
const p310 = streams[3].stream.routes.getPath(streams[1].stream.publicKeyHash, streams[0].stream.publicKeyHash)
const p330 = streams[3].stream.routes.getPath(
streams[3].stream.publicKeyHash,
streams[0].stream.publicKeyHash
);
const p320 = streams[3].stream.routes.getPath(
streams[2].stream.publicKeyHash,
streams[0].stream.publicKeyHash
);
const p310 = streams[3].stream.routes.getPath(
streams[1].stream.publicKeyHash,
streams[0].stream.publicKeyHash
);

const p230 = streams[2].stream.routes.getPath(streams[3].stream.publicKeyHash, streams[0].stream.publicKeyHash)
const p220 = streams[2].stream.routes.getPath(streams[2].stream.publicKeyHash, streams[0].stream.publicKeyHash)
const p230 = streams[2].stream.routes.getPath(
streams[3].stream.publicKeyHash,
streams[0].stream.publicKeyHash
);
const p220 = streams[2].stream.routes.getPath(
streams[2].stream.publicKeyHash,
streams[0].stream.publicKeyHash
);

for (const stream of streams) {
for (let i = 0; i < streams.length; i++) {
for (let j = 0; j < streams.length; j++) {
if (j !== i) {
const data = stream.stream.routes.getLinkData(streams[i].stream.publicKeyHash, streams[j].stream.publicKeyHash);
const data = stream.stream.routes.getLinkData(
streams[i].stream.publicKeyHash,
streams[j].stream.publicKeyHash
);
if (data) {
expect(data?.weight).toBeLessThan(1e4);
}
}
}

}
}

for (const [peer, writes] of allWrites[3]) {
expect(writes).toHaveLength(0)
expect(writes).toHaveLength(0);
}
})
})
});
});
});


// test sending "0" to "3" only 1 message should appear even though not in strict mode

describe("join/leave", () => {
Expand Down
Loading

0 comments on commit 2ea95e5

Please sign in to comment.