Skip to content

Commit

Permalink
wiP
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Oct 6, 2023
1 parent 63802e4 commit 6667bfb
Show file tree
Hide file tree
Showing 15 changed files with 1,072 additions and 1,410 deletions.
5 changes: 2 additions & 3 deletions packages/programs/clock-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"@peerbit/trusted-network": "2.0.21"
},
"devDependencies": {
"@peerbit/test-utils": "^1.0.33",
"memory-level": "^1.0.0"
"@peerbit/test-utils": "^1.0.33"
}
}
}
6 changes: 2 additions & 4 deletions packages/transport/blocks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
"@peerbit/blocks-interface": "1.1.3",
"@peerbit/crypto": "1.0.10",
"@ipld/dag-cbor": "^9.0.2",
"abstract-level": "^1.0.3",
"libp2p": "0.46.9",
"memory-level": "^1.0.0"
"libp2p": "0.46.9"
}
}
}
6 changes: 2 additions & 4 deletions packages/transport/libp2p-test-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@
"@libp2p/tcp": "^8.0.5",
"@libp2p/webrtc": "^3.1.9",
"@libp2p/websockets": "^7.0.5",
"datastore-level": "^10.1.4",
"libp2p": "0.46.9",
"memory-level": "^1.0.0"
"libp2p": "0.46.9"
},
"devDependencies": {
"@peerbit/time": "1.0.4"
},
"localMaintainers": [
"dao.xyz"
]
}
}
6 changes: 2 additions & 4 deletions packages/transport/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
"@peerbit/logger": "1.0.1",
"@peerbit/uint8arrays": "3.0.1",
"@libp2p/interfaces": "^3.3.2",
"abstract-level": "^1.0.3",
"libp2p": "0.46.9",
"memory-level": "^1.0.0"
"libp2p": "0.46.9"
}
}
}
193 changes: 175 additions & 18 deletions packages/transport/pubsub/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,31 @@ const createSubscriptionMetrics = (pubsub: DirectSub) => {
return m;
};

const collectDataWrites = (client: DirectSub) => {
const writes: Map<string, PubSubData[]> = new Map();
for (const [name, peer] of client.peers) {
writes.set(name, [])
const writeFn = peer.write.bind(peer)
peer.write = (data) => {
const bytes = data instanceof Uint8Array ? data : data.subarray();
const message = deserialize(bytes, Message)
if (message instanceof DataMessage) {
const pubsubData = deserialize(message.data, PubSubMessage);
if (pubsubData instanceof PubSubData) {
writes.get(name)?.push(pubsubData)
}

}
return writeFn(data)
}
}
return writes;

}
const createMetrics = (pubsub: DirectSub) => {
const m: {
stream: DirectSub;
relayedData: PubSubData[];
messages: Message[];
received: PubSubData[];
allReceived: PubSubData[];
Expand All @@ -54,6 +76,7 @@ const createMetrics = (pubsub: DirectSub) => {
} = {
messages: [],
received: [],
relayedData: [],
allReceived: [],
stream: pubsub,
subscriptionEvents: [],
Expand All @@ -79,6 +102,20 @@ const createMetrics = (pubsub: DirectSub) => {
}
return onDataMessageFn(from, stream, message);
};

const relayMessageFn = pubsub.relayMessage.bind(pubsub);
pubsub.relayMessage = (from, message, to) => {
if (message instanceof DataMessage) {
const pubsubMessage = PubSubMessage.from(message.data);
if (pubsubMessage instanceof PubSubData) {
m.relayedData.push(pubsubMessage);
}
}
return relayMessageFn(from, message, to);
};



return m;
};

Expand Down Expand Up @@ -305,6 +342,7 @@ describe("pubsub", function () {
await delay(3000); // wait some more time to make sure we dont get more messages
expect(streams[2].received).toHaveLength(1);
expect(streams[1].received).toHaveLength(0);

});
it("can send as non subscribeer", async () => {
streams[0].stream.unsubscribe(TOPIC);
Expand Down Expand Up @@ -465,7 +503,7 @@ describe("pubsub", function () {

const data = new Uint8Array([1, 2, 3]);
const TOPIC = "world";
beforeAll(async () => {});
beforeAll(async () => { });
beforeEach(async () => {
session = await TestSession.disconnected(5, {
services: {
Expand Down Expand Up @@ -506,7 +544,7 @@ describe("pubsub", function () {
await Promise.all(streams.map((peer) => peer.stream.stop()));
await session.stop();
});
afterAll(async () => {});
afterAll(async () => { });

it("will publish on routes", async () => {
streams[3].received = [];
Expand All @@ -528,6 +566,7 @@ describe("pubsub", function () {
});

describe("line", () => {

/*
┌─┐
│0│ // Sender of message
Expand All @@ -545,7 +584,7 @@ describe("pubsub", function () {

const data = new Uint8Array([1, 2, 3]);
const TOPIC = "world";
beforeAll(async () => {});
beforeAll(async () => { });
beforeEach(async () => {
session = await TestSession.disconnected(3, {
services: {
Expand Down Expand Up @@ -584,12 +623,11 @@ describe("pubsub", function () {
await Promise.all(streams.map((peer) => peer.stream.stop()));
await session.stop();
});
afterAll(async () => {});
afterAll(async () => { });

it("will not forward unless necessary", async () => {
streams[1].received = [];
streams[2].received = [];
await delay(5000);
await streams[0].stream.publish(data, { topics: [TOPIC] });
await waitFor(() => streams[1].received.length === 1);
expect(new Uint8Array(streams[1].received[0].data)).toEqual(data);
Expand All @@ -605,21 +643,140 @@ describe("pubsub", function () {
}
});
});

describe("4 connected", () => {

/*
┌───┐
│ 0 │
└┬─┬┘
│┌▽┐
││1│
│└┬┘
┌▽┐│
│2││
└┬┘│
┌▽─▽─┐
│ 3 │
└────┘
*/


let session: TestSession<{ pubsub: DirectSub }>;
let streams: ReturnType<typeof createMetrics>[];

const data = new Uint8Array([1, 2, 3]);
const TOPIC = "topic";
const PING_INTERVAL = 1000;
beforeEach(async () => {
session = await TestSession.disconnected(4, {
services: {
pubsub: (c) =>
new DirectSub(c, {
pingInterval: PING_INTERVAL,
canRelayMessage: true,
connectionManager: { autoDial: false }
})
}
});


await session.connect([
[session.peers[0], session.peers[1]],
[session.peers[0], session.peers[2]],
[session.peers[1], session.peers[3]],
[session.peers[2], session.peers[3]],
]);

streams = [];

for (const [i, peer] of session.peers.entries()) {
streams.push(createMetrics(peer.services.pubsub));
if ([0, 3].includes(i)) {
await peer.services.pubsub.subscribe(TOPIC);
}
}


for (const [i, peer] of streams.entries()) {
if ([0, 3].includes(i)) {
await peer.stream.requestSubscribers(TOPIC);
await waitFor(
() => peer.stream.getSubscribers(TOPIC)?.size === 1)
}
}


})

afterEach(async () => {
await Promise.all(streams.map((peer) => peer.stream.stop()));
await session.stop();
});

it("will not send messages back another route", async () => {

// if '0' pushes data on TOPIC to '3'
// there is no point for '3' to send messages back to '0'

const allWrites = streams.map(stream => collectDataWrites(stream.stream))
console.log(streams[0].stream.publicKeyHash)


await delay(PING_INTERVAL * 6)

for (const stream of streams) {
for (let i = 0; i < streams.length; i++) {
for (let j = 0; j < streams.length; j++) {
if (j !== i) {
const data = stream.stream.routes.getLinkData(streams[i].stream.publicKeyHash, streams[j].stream.publicKeyHash);
if (data) {
expect(data?.weight).toBeLessThan(1e4);
}
}
}

}
}



let count = 1;
for (let i = 0; i < count; i++) {
await streams[0].stream.publish(data, { topics: [TOPIC] });
}

await waitForResolved(() => expect(streams[3].received).toHaveLength(count));

const p330 = streams[3].stream.routes.getPath(streams[3].stream.publicKeyHash, streams[0].stream.publicKeyHash)
const p320 = streams[3].stream.routes.getPath(streams[2].stream.publicKeyHash, streams[0].stream.publicKeyHash)
const p310 = streams[3].stream.routes.getPath(streams[1].stream.publicKeyHash, streams[0].stream.publicKeyHash)

const p230 = streams[2].stream.routes.getPath(streams[3].stream.publicKeyHash, streams[0].stream.publicKeyHash)
const p220 = streams[2].stream.routes.getPath(streams[2].stream.publicKeyHash, streams[0].stream.publicKeyHash)

for (const stream of streams) {
for (let i = 0; i < streams.length; i++) {
for (let j = 0; j < streams.length; j++) {
if (j !== i) {
const data = stream.stream.routes.getLinkData(streams[i].stream.publicKeyHash, streams[j].stream.publicKeyHash);
if (data) {
expect(data?.weight).toBeLessThan(1e4);
}
}
}

}
}

for (const [peer, writes] of allWrites[3]) {
expect(writes).toHaveLength(0)
}
})
})
});

/*
TODO
┌────┐
│0 │
└┬──┬┘
┌▽┐┌▽┐
│2││1│
└┬┘└┬┘
┌▽──▽┐
│3 │
└────┘
*/

// test sending "0" to "3" only 1 message should appear even though not in strict mode

describe("join/leave", () => {
Expand Down
Loading

0 comments on commit 6667bfb

Please sign in to comment.