diff --git a/packages/programs/clock-service/package.json b/packages/programs/clock-service/package.json index 886b98c66..401a67a02 100644 --- a/packages/programs/clock-service/package.json +++ b/packages/programs/clock-service/package.json @@ -35,7 +35,6 @@ "@peerbit/trusted-network": "2.0.21" }, "devDependencies": { - "@peerbit/test-utils": "^1.0.33", - "memory-level": "^1.0.0" + "@peerbit/test-utils": "^1.0.33" } -} +} \ No newline at end of file diff --git a/packages/transport/blocks/package.json b/packages/transport/blocks/package.json index 6b9745bef..9556be886 100644 --- a/packages/transport/blocks/package.json +++ b/packages/transport/blocks/package.json @@ -58,8 +58,6 @@ "@peerbit/blocks-interface": "1.1.3", "@peerbit/crypto": "1.0.10", "@ipld/dag-cbor": "^9.0.2", - "abstract-level": "^1.0.3", - "libp2p": "0.46.9", - "memory-level": "^1.0.0" + "libp2p": "0.46.9" } -} +} \ No newline at end of file diff --git a/packages/transport/libp2p-test-utils/package.json b/packages/transport/libp2p-test-utils/package.json index b082acbfe..d53ab7e92 100644 --- a/packages/transport/libp2p-test-utils/package.json +++ b/packages/transport/libp2p-test-utils/package.json @@ -42,9 +42,7 @@ "@libp2p/tcp": "^8.0.5", "@libp2p/webrtc": "^3.1.9", "@libp2p/websockets": "^7.0.5", - "datastore-level": "^10.1.4", - "libp2p": "0.46.9", - "memory-level": "^1.0.0" + "libp2p": "0.46.9" }, "devDependencies": { "@peerbit/time": "1.0.4" @@ -52,4 +50,4 @@ "localMaintainers": [ "dao.xyz" ] -} +} \ No newline at end of file diff --git a/packages/transport/pubsub/package.json b/packages/transport/pubsub/package.json index 62ea34f40..f89bef9ce 100644 --- a/packages/transport/pubsub/package.json +++ b/packages/transport/pubsub/package.json @@ -56,8 +56,6 @@ "@peerbit/logger": "1.0.1", "@peerbit/uint8arrays": "3.0.1", "@libp2p/interfaces": "^3.3.2", - "abstract-level": "^1.0.3", - "libp2p": "0.46.9", - "memory-level": "^1.0.0" + "libp2p": "0.46.9" } -} +} \ No newline at end of file diff --git a/packages/transport/pubsub/src/__tests__/index.test.ts b/packages/transport/pubsub/src/__tests__/index.test.ts index 3b3e16819..2a709370c 100644 --- a/packages/transport/pubsub/src/__tests__/index.test.ts +++ b/packages/transport/pubsub/src/__tests__/index.test.ts @@ -43,9 +43,31 @@ const createSubscriptionMetrics = (pubsub: DirectSub) => { return m; }; +const collectDataWrites = (client: DirectSub) => { + const writes: Map = new Map(); + for (const [name, peer] of client.peers) { + writes.set(name, []) + const writeFn = peer.write.bind(peer) + peer.write = (data) => { + const bytes = data instanceof Uint8Array ? data : data.subarray(); + const message = deserialize(bytes, Message) + if (message instanceof DataMessage) { + const pubsubData = deserialize(message.data, PubSubMessage); + if (pubsubData instanceof PubSubData) { + writes.get(name)?.push(pubsubData) + } + + } + return writeFn(data) + } + } + return writes; + +} const createMetrics = (pubsub: DirectSub) => { const m: { stream: DirectSub; + relayedData: PubSubData[]; messages: Message[]; received: PubSubData[]; allReceived: PubSubData[]; @@ -54,6 +76,7 @@ const createMetrics = (pubsub: DirectSub) => { } = { messages: [], received: [], + relayedData: [], allReceived: [], stream: pubsub, subscriptionEvents: [], @@ -79,6 +102,20 @@ const createMetrics = (pubsub: DirectSub) => { } return onDataMessageFn(from, stream, message); }; + + const relayMessageFn = pubsub.relayMessage.bind(pubsub); + pubsub.relayMessage = (from, message, to) => { + if (message instanceof DataMessage) { + const pubsubMessage = PubSubMessage.from(message.data); + if (pubsubMessage instanceof PubSubData) { + m.relayedData.push(pubsubMessage); + } + } + return relayMessageFn(from, message, to); + }; + + + return m; }; @@ -305,6 +342,7 @@ describe("pubsub", function () { await delay(3000); // wait some more time to make sure we dont get more messages expect(streams[2].received).toHaveLength(1); expect(streams[1].received).toHaveLength(0); + }); it("can send as non subscribeer", async () => { streams[0].stream.unsubscribe(TOPIC); @@ -465,7 +503,7 @@ describe("pubsub", function () { const data = new Uint8Array([1, 2, 3]); const TOPIC = "world"; - beforeAll(async () => {}); + beforeAll(async () => { }); beforeEach(async () => { session = await TestSession.disconnected(5, { services: { @@ -506,7 +544,7 @@ describe("pubsub", function () { await Promise.all(streams.map((peer) => peer.stream.stop())); await session.stop(); }); - afterAll(async () => {}); + afterAll(async () => { }); it("will publish on routes", async () => { streams[3].received = []; @@ -528,6 +566,7 @@ describe("pubsub", function () { }); describe("line", () => { + /* ┌─┐ │0│ // Sender of message @@ -545,7 +584,7 @@ describe("pubsub", function () { const data = new Uint8Array([1, 2, 3]); const TOPIC = "world"; - beforeAll(async () => {}); + beforeAll(async () => { }); beforeEach(async () => { session = await TestSession.disconnected(3, { services: { @@ -584,12 +623,11 @@ describe("pubsub", function () { await Promise.all(streams.map((peer) => peer.stream.stop())); await session.stop(); }); - afterAll(async () => {}); + afterAll(async () => { }); it("will not forward unless necessary", async () => { streams[1].received = []; streams[2].received = []; - await delay(5000); await streams[0].stream.publish(data, { topics: [TOPIC] }); await waitFor(() => streams[1].received.length === 1); expect(new Uint8Array(streams[1].received[0].data)).toEqual(data); @@ -605,21 +643,140 @@ describe("pubsub", function () { } }); }); + + describe("4 connected", () => { + + /* + ┌───┐ + │ 0 │ + └┬─┬┘ + │┌▽┐ + ││1│ + │└┬┘ + ┌▽┐│ + │2││ + └┬┘│ + ┌▽─▽─┐ + │ 3 │ + └────┘ + */ + + + let session: TestSession<{ pubsub: DirectSub }>; + let streams: ReturnType[]; + + const data = new Uint8Array([1, 2, 3]); + const TOPIC = "topic"; + const PING_INTERVAL = 1000; + beforeEach(async () => { + session = await TestSession.disconnected(4, { + services: { + pubsub: (c) => + new DirectSub(c, { + pingInterval: PING_INTERVAL, + canRelayMessage: true, + connectionManager: { autoDial: false } + }) + } + }); + + + await session.connect([ + [session.peers[0], session.peers[1]], + [session.peers[0], session.peers[2]], + [session.peers[1], session.peers[3]], + [session.peers[2], session.peers[3]], + ]); + + streams = []; + + for (const [i, peer] of session.peers.entries()) { + streams.push(createMetrics(peer.services.pubsub)); + if ([0, 3].includes(i)) { + await peer.services.pubsub.subscribe(TOPIC); + } + } + + + for (const [i, peer] of streams.entries()) { + if ([0, 3].includes(i)) { + await peer.stream.requestSubscribers(TOPIC); + await waitFor( + () => peer.stream.getSubscribers(TOPIC)?.size === 1) + } + } + + + }) + + afterEach(async () => { + await Promise.all(streams.map((peer) => peer.stream.stop())); + await session.stop(); + }); + + it("will not send messages back another route", async () => { + + // if '0' pushes data on TOPIC to '3' + // there is no point for '3' to send messages back to '0' + + const allWrites = streams.map(stream => collectDataWrites(stream.stream)) + console.log(streams[0].stream.publicKeyHash) + + + await delay(PING_INTERVAL * 6) + + for (const stream of streams) { + for (let i = 0; i < streams.length; i++) { + for (let j = 0; j < streams.length; j++) { + if (j !== i) { + const data = stream.stream.routes.getLinkData(streams[i].stream.publicKeyHash, streams[j].stream.publicKeyHash); + if (data) { + expect(data?.weight).toBeLessThan(1e4); + } + } + } + + } + } + + + + let count = 1; + for (let i = 0; i < count; i++) { + await streams[0].stream.publish(data, { topics: [TOPIC] }); + } + + await waitForResolved(() => expect(streams[3].received).toHaveLength(count)); + + const p330 = streams[3].stream.routes.getPath(streams[3].stream.publicKeyHash, streams[0].stream.publicKeyHash) + const p320 = streams[3].stream.routes.getPath(streams[2].stream.publicKeyHash, streams[0].stream.publicKeyHash) + const p310 = streams[3].stream.routes.getPath(streams[1].stream.publicKeyHash, streams[0].stream.publicKeyHash) + + const p230 = streams[2].stream.routes.getPath(streams[3].stream.publicKeyHash, streams[0].stream.publicKeyHash) + const p220 = streams[2].stream.routes.getPath(streams[2].stream.publicKeyHash, streams[0].stream.publicKeyHash) + + for (const stream of streams) { + for (let i = 0; i < streams.length; i++) { + for (let j = 0; j < streams.length; j++) { + if (j !== i) { + const data = stream.stream.routes.getLinkData(streams[i].stream.publicKeyHash, streams[j].stream.publicKeyHash); + if (data) { + expect(data?.weight).toBeLessThan(1e4); + } + } + } + + } + } + + for (const [peer, writes] of allWrites[3]) { + expect(writes).toHaveLength(0) + } + }) + }) }); - /* - TODO - ┌────┐ - │0 │ - └┬──┬┘ - ┌▽┐┌▽┐ - │2││1│ - └┬┘└┬┘ - ┌▽──▽┐ - │3 │ - └────┘ - - */ + // test sending "0" to "3" only 1 message should appear even though not in strict mode describe("join/leave", () => { diff --git a/packages/transport/pubsub/src/index.ts b/packages/transport/pubsub/src/index.ts index d50950beb..e7f552380 100644 --- a/packages/transport/pubsub/src/index.ts +++ b/packages/transport/pubsub/src/index.ts @@ -1,6 +1,5 @@ import type { PeerId as Libp2pPeerId } from "@libp2p/interface/peer-id"; import { logger as logFn } from "@peerbit/logger"; - import { DataMessage } from "@peerbit/stream-interface"; import { DirectStream, @@ -267,19 +266,19 @@ export class DirectSub extends DirectStream implements PubSub { ); } - getPeersWithTopics(topics: string[], otherPeers?: string[]): Set { - const peers: Set = otherPeers ? new Set(otherPeers) : new Set(); + getPeersOnTopics(topics: string[]): Set { + const newPeers: Set = new Set() if (topics?.length) { for (const topic of topics) { const peersOnTopic = this.topicsToPeers.get(topic.toString()); if (peersOnTopic) { peersOnTopic.forEach((peer) => { - peers.add(peer); + newPeers.add(peer); }); } } } - return peers; + return newPeers; } /* getStreamsWithTopics(topics: string[], otherPeers?: string[]): PeerStreams[] { @@ -293,15 +292,15 @@ export class DirectSub extends DirectStream implements PubSub { data: Uint8Array, options: | { - topics?: string[]; - to?: (string | PeerId)[]; - strict?: false; - } + topics?: string[]; + to?: (string | PeerId)[]; + strict?: false; + } | { - topics: string[]; - to: (string | PeerId)[]; - strict: true; - } + topics: string[]; + to: (string | PeerId)[]; + strict: true; + } ): Promise { if (!this.started) { throw new Error("Not started"); @@ -314,9 +313,9 @@ export class DirectSub extends DirectStream implements PubSub { x instanceof PublicSignKey ? x.hashcode() : typeof x === "string" - ? x - : getPublicKeyFromPeerId(x).hashcode() - ) || this.getPeersWithTopics(topics); + ? x + : getPublicKeyFromPeerId(x).hashcode() + ) || this.getPeersOnTopics(topics); // Embedd topic info before the data so that peers/relays can also use topic info to route messages efficiently const dataMessage = new PubSubData({ @@ -459,10 +458,51 @@ export class DirectSub extends DirectStream implements PubSub { // Forward if (!pubsubMessage.strict) { - const newTos = this.getPeersWithTopics( + + const existingPeers: Set = new Set(message.to) + const allPeersOnTopic = this.getPeersOnTopics( pubsubMessage.topics, - message.to + ); + + for (const existing of existingPeers) { + allPeersOnTopic.add(existing) + } + + // for check path to newTos from from + const newTos: Set = new Set(); + for (const to of allPeersOnTopic) { + + if (existingPeers.has(to)) { // the message should be sent to 'to' from us + newTos.add(to) + continue + } + + + const pathFromFrom = this.routes.getPath(this.peerIdToPublicKey.get(from.toString())!.hashcode(), to) + + if (pathFromFrom?.length > 0) { // A new 'receiver', lets try to figure out why 'from' did not send it to this receiver + const pathFromMe = this.routes.getPath(this.publicKeyHash, to) + if (pathFromMe?.length > 0) { + const weightFromFrom = this.routes.getDistance(pathFromFrom) + const weightFromMe = this.routes.getDistance(pathFromMe) + if (weightFromMe < weightFromFrom) /// TODO add threshold for resiliancy this line requires that FROM and ME have the same latency map + { + newTos.add(to); + } + else { + logger.debug("Skip relying messages since it is longer to send from me") + } + } + else { + newTos.add(to) + } + } + else { + newTos.add(to); + } + } + newTos.delete(this.publicKeyHash); message.to = [...newTos]; } diff --git a/packages/transport/stream-interface/src/messages.ts b/packages/transport/stream-interface/src/messages.ts index 327f20b13..a3e9bca12 100644 --- a/packages/transport/stream-interface/src/messages.ts +++ b/packages/transport/stream-interface/src/messages.ts @@ -104,6 +104,7 @@ class PublicKeys { const SIGNATURES_SIZE_ENCODING = "u8"; // with 7 steps you know everyone in the world?, so u8 *should* suffice @variant(0) export class Signatures { + @field({ type: vec(SignatureWithKey, SIGNATURES_SIZE_ENCODING) }) signatures: SignatureWithKey[]; @@ -127,100 +128,71 @@ export class Signatures { } } -const keyMap: Map = new Map(); interface Signed { - get signatures(): Signatures; + signatures?: Signatures; +} + +interface ToSign { + toSign(): Promise; } -interface Suffix { - getSuffix(iteration: number): Uint8Array | Uint8Array[]; + + + +const sign = async (obj: T, signer: (bytes: Uint8Array) => Promise): Promise => { + const bytes = await obj.toSign(); + const signature = await signer(bytes) + obj.signatures = new Signatures([signature]) + return obj } + const verifyMultiSig = async ( - message: Suffix & Prefixed & Signed, + message: Signed & ToSign, expectSignatures: boolean ) => { - const signatures = message.signatures.signatures; - if (signatures.length === 0) { + const signatures = message.signatures?.signatures; + if (!signatures || signatures.length === 0) { return !expectSignatures; } - await message.createPrefix(); - - const dataGenerator = getMultiSigDataToSignHistory(message, 0); - let done: boolean | undefined = false; + const bytes = await message.toSign() for (const signature of signatures) { - if (done) { - throw new Error( - "Unexpected, the amount of signatures does not match the amount of data verify" - ); - } - const data = dataGenerator.next(); - done = data.done; - if (!(await verify(signature, data.value!))) { + if (!(await verify(signature, bytes))) { return false; } } return true; -}; -interface Prefixed { - prefix: Uint8Array; - createPrefix: () => Promise; } -const emptySignatures = serialize(new Signatures()); -function* getMultiSigDataToSignHistory( - message: Suffix & Prefixed & Signed, - from = 0 -): Generator { - if (from === 0) { - yield concatBytes( - [message.prefix, emptySignatures], - message.prefix.length + emptySignatures.length - ); - } - - for ( - let i = Math.max(from - 1, 0); - i < message.signatures.signatures.length; - i++ - ) { - const bytes = message.getSuffix(i); // TODO make more performant - const concat = [message.prefix]; - let len = message.prefix.length; - if (bytes instanceof Uint8Array) { - concat.push(bytes); - len += bytes.byteLength; - } else { - for (const arr of bytes) { - concat.push(arr); - len += arr.byteLength; - } - } - yield concatBytes(concat, len); - } - return; -} - -export abstract class Message { +export abstract class Message implements Signed, ToSign { static from(bytes: Uint8ArrayList) { if (bytes.get(0) === DATA_VARIANT) { // Data return DataMessage.from(bytes); - } else if (bytes.get(0) === HELLO_VARIANT) { + } /* else if (bytes.get(0) === HELLO_VARIANT) { // heartbeat return Hello.from(bytes); } else if (bytes.get(0) === GOODBYE_VARIANT) { // heartbeat return Goodbye.from(bytes); - } else if (bytes.get(0) === PING_VARIANT) { - return PingPong.from(bytes); + } */else if (bytes.get(0) === ACKNOWLEDGE_VARIANT) { + return ACK.from(bytes); } throw new Error("Unsupported"); } + abstract get id(): Uint8Array + abstract get signatures(): Signatures | undefined + abstract set signatures(signature: Signatures | undefined) + abstract toSign(): Promise; + + async sign(signer: (bytes: Uint8Array) => Promise): Promise { + return sign(this, signer) + + } abstract bytes(): Uint8ArrayList | Uint8Array; - abstract equals(other: Message): boolean; + /* abstract equals(other: Message): boolean; */ abstract verify(expectSignatures: boolean): Promise; } @@ -228,39 +200,44 @@ export abstract class Message { const DATA_VARIANT = 0; @variant(DATA_VARIANT) export class DataMessage extends Message { + + @field({ type: MessageHeader }) private _header: MessageHeader; + @field({ type: Uint8Array }) + private _data: Uint8Array; + + @field({ type: 'bool' }) + private _seekPath: boolean; // if true then this message will fanout larger. And recipent will send message back to acknowledge + @field({ type: vec("string") }) - private _to: string[]; // not signed! TODO should we sign this? + private _to: string[]; // Not signed + + @field({ type: option(Signatures) }) + public signatures: Signatures | undefined - @field({ type: Signatures }) - private _signatures: Signatures; - @field({ type: Uint8Array }) - private _data: Uint8Array; constructor(properties: { header?: MessageHeader; to?: string[]; data: Uint8Array; signatures?: Signatures; + seekPath?: boolean }) { super(); this._data = properties.data; this._header = properties.header || new MessageHeader(); this._to = properties.to || []; - this._signatures = properties.signatures || new Signatures(); + this.signatures = properties.signatures || new Signatures(); + this._seekPath = properties.seekPath || false } get id(): Uint8Array { return this._header.id; } - get signatures(): Signatures { - return this._signatures; - } - get header(): MessageHeader { return this._header; } @@ -273,59 +250,30 @@ export class DataMessage extends Message { this._to = to; } - get sender(): PublicSignKey { - return this.signatures.signatures[0].publicKey; - } get data(): Uint8Array { return this._data; } + get seekPath(): boolean { + return this._seekPath + } + _serialized: Uint8Array | undefined; get serialized(): Uint8Array | undefined { return this.serialized; } - _prefix: Uint8Array | undefined; - get prefix(): Uint8Array { - if (!this._prefix) { - throw new Error("Prefix not created"); - } - return this._prefix; - } - async createPrefix(): Promise { - if (this._prefix) { - return this._prefix; - } + + async toSign(): Promise { const headerSer = serialize(this._header); const hashBytes = await sha256(this.data); - this._prefix = concatBytes( - [new Uint8Array([DATA_VARIANT]), headerSer, hashBytes], - 1 + headerSer.length + hashBytes.length - ); - return this._prefix; - } - - getSuffix(iteration: number): Uint8Array { - return serialize( - new Signatures(this.signatures.signatures.slice(0, iteration + 1)) + return concatBytes( + [new Uint8Array([DATA_VARIANT]), headerSer, hashBytes, new Uint8Array([this._seekPath ? 1 : 0])], + 1 + headerSer.length + hashBytes.length + 1 ); } - async sign(sign: (bytes: Uint8Array) => Promise) { - this._serialized = undefined; // because we will change this object, so the serialized version will not be applicable anymore - await this.createPrefix(); - this.signatures.signatures.push( - await sign( - getMultiSigDataToSignHistory( - this, - this.signatures.signatures.length - ).next().value! - ) - ); - return this; - } - async verify(expectSignatures: boolean): Promise { return this._header.verify() && verifyMultiSig(this, expectSignatures); } @@ -348,38 +296,14 @@ export class DataMessage extends Message { return ret; } - equals(other: Message) { - if (other instanceof DataMessage) { - const a = - equals(this.data, other.data) && - equals(this.id, other.id) && - this.to.length === other.to.length; - if (!a) { - return false; - } - for (let i = 0; i < this.to.length; i++) { - if (this.to[i] !== other.to[i]) { - return false; - } - } - return this.signatures.equals(other.signatures); - } - return false; - } -} -@variant(0) -export class NetworkInfo { - @field({ type: vec("u32", SIGNATURES_SIZE_ENCODING) }) - pingLatencies: number[]; - constructor(pingLatencies: number[]) { - this.pingLatencies = pingLatencies; - } } +/* // I send this too all my peers const HELLO_VARIANT = 1; @variant(HELLO_VARIANT) + export class Hello extends Message { @field({ type: MessageHeader }) header: MessageHeader; @@ -390,9 +314,6 @@ export class Hello extends Message { @field({ type: option(Uint8Array) }) data?: Uint8Array; - @field({ type: NetworkInfo }) - networkInfo: NetworkInfo; - @field({ type: Signatures }) signatures: Signatures; @@ -403,7 +324,9 @@ export class Hello extends Message { this.multiaddrs = options?.multiaddrs?.filter((x) => !x.includes("/p2p-circuit/")) || []; // don't forward relay addresess (TODO ?) this.signatures = new Signatures(); - this.networkInfo = new NetworkInfo([]); + } + get id() { + return this.header.id } get sender(): PublicSignKey { @@ -441,32 +364,9 @@ export class Hello extends Message { return this._prefix; } - getSuffix(iteration: number): Uint8Array[] { - return [ - serialize( - new NetworkInfo(this.networkInfo.pingLatencies.slice(0, iteration + 1)) - ), - serialize( - new Signatures(this.signatures.signatures.slice(0, iteration + 1)) - ) - ]; - } - - async sign(sign: (bytes: Uint8Array) => Promise) { - await this.createPrefix(); - const toSign = getMultiSigDataToSignHistory( - this, - this.signatures.signatures.length - ).next().value!; - this.signatures.signatures.push(await sign(toSign)); - return this; - } - async verify(expectSignatures: boolean): Promise { return ( this.header.verify() && - this.networkInfo.pingLatencies.length === - this.signatures.signatures.length - 1 && verifyMultiSig(this, expectSignatures) ); } @@ -492,6 +392,7 @@ export class Hello extends Message { // Me or some my peer disconnected const GOODBYE_VARIANT = 2; @variant(GOODBYE_VARIANT) + export class Goodbye extends Message { @field({ type: MessageHeader }) header: MessageHeader; @@ -502,8 +403,8 @@ export class Goodbye extends Message { @field({ type: option(Uint8Array) }) data?: Uint8Array; // not signed - @field({ type: Signatures }) - signatures: Signatures; + @field({ type: option(Signatures) }) + signatures: Signatures | undefined constructor(properties?: { header?: MessageHeader; @@ -518,16 +419,15 @@ export class Goodbye extends Message { this.signatures = new Signatures(); } - get sender(): PublicSignKey { - return this.signatures.signatures[0]!.publicKey; + get id() { + return this.header.id } - bytes() { return serialize(this); } static from(bytes: Uint8ArrayList): Goodbye { const result = deserialize(bytes.subarray(), Goodbye); - if (result.signatures.signatures.length === 0) { + if (!result.signatures || result.signatures.signatures.length === 0) { throw new Error("Missing sender on Goodbye"); } return result; @@ -554,205 +454,73 @@ export class Goodbye extends Message { return this._prefix; } - getSuffix(iteration: number): Uint8Array { - return serialize( - new Signatures(this.signatures.signatures.slice(0, iteration + 1)) - ); - } - - async sign(sign: (bytes: Uint8Array) => Promise) { - await this.createPrefix(); - this.signatures.signatures.push( - await sign( - getMultiSigDataToSignHistory( - this, - this.signatures.signatures.length - ).next().value! - ) - ); - return this; - } - async verify(expectSignatures: boolean): Promise { return this.header.verify() && verifyMultiSig(this, expectSignatures); } + +} */ - equals(other: Message) { - if (other instanceof Goodbye) { - if (this.early !== other.early) { - return false; - } - - const dataEquals = - (!!this.data && !!other.data && equals(this.data, other.data)) || - !this.data === !other.data; - if (!dataEquals) { - return false; - } - return ( - this.header.equals(other.header) && - this.signatures.equals(other.signatures) - ); - } - return false; - } -} - -const PING_VARIANT = 3; - -@variant(PING_VARIANT) -export abstract class PingPong extends Message { - static from(bytes: Uint8ArrayList) { - return deserialize(bytes.subarray(), PingPong); - } - - bytes(): Uint8ArrayList | Uint8Array { - return serialize(this); - } - - verify(_expectSignatures: boolean): Promise { - return Promise.resolve(true); - } - - abstract get pingBytes(): Uint8Array; -} - -@variant(0) -export class Ping extends PingPong { - @field({ type: fixedArray("u8", 32) }) - pingBytes: Uint8Array; - - constructor() { - super(); - this.pingBytes = randomBytes(32); - } - equals(other: Message) { - if (other instanceof Ping) { - return equals(this.pingBytes, other.pingBytes); - } - return false; - } -} - -@variant(1) -export class Pong extends PingPong { - @field({ type: fixedArray("u8", 32) }) - pingBytes: Uint8Array; - - constructor(pingBytes: Uint8Array) { - super(); - this.pingBytes = pingBytes; - } - - equals(other: Message) { - if (other instanceof Pong) { - return equals(this.pingBytes, other.pingBytes); - } - return false; - } -} - -@variant(0) -export class Connections { - @field({ type: vec(fixedArray("string", 2)) }) - connections: [string, string][]; +const ACKNOWLEDGE_VARIANT = 2; - constructor(connections: [string, string][]) { - this.connections = connections; - } +@variant(ACKNOWLEDGE_VARIANT) +export class ACK extends Message { - equals(other: Connections) { - if (this.connections.length !== other.connections.length) { - return false; - } - for (let i = 0; i < this.connections.length; i++) { - if (this.connections[i].length !== other.connections[i].length) { - return false; - } - const a1 = this.connections[i][0]; - const a2 = this.connections[i][1]; - const b1 = other.connections[i][0]; - const b2 = other.connections[i][1]; - - if (a1 === b1 && a2 === b2) { - continue; - } - if (a1 === b2 && a2 === b1) { - continue; - } - return false; - } - return true; - } -} - -// Share connections -/* const NETWORK_INFO_VARIANT = 3; -@variant(NETWORK_INFO_VARIANT) -export class NetworkInfo extends Message { @field({ type: MessageHeader }) header: MessageHeader; - @field({ type: Connections }) - connections: Connections; + @field({ type: fixedArray('u8', 32) }) + messageIdToAcknowledge: Uint8Array + @field({ type: 'u8' }) + seenCounter: number // Number of times a peer has received the messageIdToAcknowledge before + + @field({ type: vec("string") }) + to: string[]; // not signed! TODO should we sign this? @field({ type: Signatures }) - signatures: Signatures + signatures: Signatures; - constructor(connections: [string, string][]) { - super(); + constructor(properties: { messageIdToAcknowledge: Uint8Array, seenCounter: number, to: string[] }) { + super() this.header = new MessageHeader(); - this.connections = new Connections(connections); - this.signatures = new Signatures() - } + this.messageIdToAcknowledge = properties.messageIdToAcknowledge + this.signatures = new Signatures(); + this.to = properties.to + this.seenCounter = Math.min(255, properties.seenCounter); - getDataToSign(): Uint8Array { - return this.serialize() } - - _prefix: Uint8Array | undefined - get prefix(): Uint8Array { - if (this._prefix) - return this._prefix - const header = serialize(this.header); - const connections = serialize(this.connections); - this._prefix = concatBytes([new Uint8Array([NETWORK_INFO_VARIANT]), header, connections], 1 + header.length + connections.length); - return this._prefix; + get id() { + return this.header.id } - sign(sign: (bytes: Uint8Array) => SignatureWithKey) { - this.signatures.signatures.push(sign(getMultiSigDataToSignHistory(this, this.signatures.signatures.length).next().value!)); - return this; + async verify(expectSignatures: boolean): Promise { + return this.header.verify() && verifyMultiSig(this, expectSignatures); } - verify(): boolean { - return this.header.verify() && verifyMultiSig(this) + + bytes() { + return serialize(this); } + async toSign(): Promise { - serialize() { - return serialize(this) - } - static from(bytes: Uint8ArrayList): NetworkInfo { - return deserialize(bytes.subarray(), NetworkInfo) + const headerSer = serialize(this.header); + return concatBytes( + [new Uint8Array([ACKNOWLEDGE_VARIANT]), headerSer, this.messageIdToAcknowledge, new Uint8Array([this.seenCounter])], + 1 + headerSer.length + 32 + 1 + ); } - equals(other: Message) { - if (other instanceof NetworkInfo) { - if (!equals(this.header.id, other.header.id) || !this.header.equals(other.header)) { // TODO fix uneccessary copy - return false; - } - if (!this.connections.equals(other.connections)) { - return false; - } - return this.signatures.equals(other.signatures) + + static from(bytes: Uint8ArrayList): ACK { + const result = deserialize(bytes.subarray(), ACK); + if (!result.signatures || result.signatures.signatures.length === 0) { + throw new Error("Missing sender on ACK"); } - return false; + return result; } -} - */ +} diff --git a/packages/transport/stream/package.json b/packages/transport/stream/package.json index 118c15491..e5d7127a8 100644 --- a/packages/transport/stream/package.json +++ b/packages/transport/stream/package.json @@ -51,19 +51,14 @@ ], "devDependencies": { "@peerbit/libp2p-test-utils": "1.0.8", - "@types/yallist": "^4.0.1", - "graphology-types": "^0.24.7" + "@types/yallist": "^4.0.1" }, "dependencies": { "@dao-xyz/borsh": "^5.1.8", "@peerbit/cache": "1.1.1", "@peerbit/crypto": "1.0.10", "@peerbit/stream-interface": "^1.0.11", - "abstract-level": "^1.0.3", - "graphology": "0.25.1", - "graphology-shortest-path": "2.0.2", "libp2p": "0.46.9", - "memory-level": "^1.0.0", "yallist": "^4.0.0" } -} +} \ No newline at end of file diff --git a/packages/transport/stream/src/__tests__/routes.test.ts b/packages/transport/stream/src/__tests__/routes.test.ts deleted file mode 100644 index 9157bfef6..000000000 --- a/packages/transport/stream/src/__tests__/routes.test.ts +++ /dev/null @@ -1,149 +0,0 @@ -import { Routes } from "../routes"; -import crypto from "crypto"; - -describe("routes", () => { - /* - - We create this in the setup - ┌─┐ - │y│ - └┬┘ - ┌▽┐ - │x│ - └─┘ - - ┌─┐ - │a│ - └┬┘ - ┌▽┐ - │b│ - └┬┘ - ┌▽┐ - │c│ - └─┘ - - and conenct x and a during the tests - - */ - - let routes: Routes; - let a: string, b: string, c: string, x: string, y: string; - - const set = () => { - a = crypto.randomBytes(16).toString("hex"); - b = crypto.randomBytes(16).toString("hex"); - c = crypto.randomBytes(16).toString("hex"); - x = crypto.randomBytes(16).toString("hex"); - y = crypto.randomBytes(16).toString("hex"); - }; - beforeEach(() => { - routes = new Routes("_"); - set(); - expect(routes.addLink(a, b, 1)).toContainAllValues([]); - expect(routes.addLink(b, c, 1)).toContainAllValues([]); - expect(routes.addLink(x, y, 1)).toContainAllValues([]); - }); - describe("path", () => { - it("will find path", () => { - const path = routes.getPath(a, c); - expect(path).toEqual([a, b, c]); - }); - - it("will find shortest path", () => { - let path = routes.getPath(a, c); - expect(path).toEqual([a, b, c]); - - // add a longer path directly from a to c - expect(routes.addLink(a, c, 2.00001)).toContainAllValues([]); - - // shortest path is still the same - path = routes.getPath(a, c); - expect(path).toEqual([a, b, c]); - - // Update the weight tobeless than 2 - expect(routes.addLink(a, c, 1.99999)).toContainAllValues([]); - path = routes.getPath(a, c); - expect(path).toEqual([a, c]); - }); - - it("can block node", () => { - // Add slow path from a to c which should not be prefered by default - expect(routes.addLink(a, c, 1e3)).toContainAllValues([]); - - let path = routes.getPath(a, c); - expect(path).toEqual([a, b, c]); // a -> b -> c fastest still - - // Update the weight to be less than 2 - let block = b; - path = routes.getPath(a, c, { block }); - expect(path).toEqual([a, c]); // slow path is fastest because b is blocked - - // Make sure notthing has changed - path = routes.getPath(a, c); - expect(path).toEqual([a, b, c]); - }); - - it("missing node", () => { - const path = routes.getPath(a, "?"); - expect(path).toHaveLength(0); - }); - it("missing path", () => { - const path = routes.getPath(a, x); - expect(path).toHaveLength(0); - }); - }); - - describe("add", () => { - it("insertion symmetric", () => { - const ab = routes.getLink(a, b); - const ba = routes.getLink(b, a); - expect(ab).toBeDefined(); - expect(ba).toBeDefined(); - }); - }); - - describe("delete", () => { - it("single", () => { - expect(routes.deleteLink(b, a)).toEqual([]); // netiher a or b was reachablee (because of origin arg) - expect(routes.getPath(a, c)).toHaveLength(0); - }); - - it("symmetric", () => { - routes.addLink(b, a, 1); - expect(routes.deleteLink(b, a)).toEqual([]); // netiher a or b was reachablee (because of origin arg) - expect(routes.getPath(a, c)).toHaveLength(0); - }); - - it("subgraph 1", () => { - expect(routes.addLink(a, x, 1, x)).toEqual([a, b, c]); - expect(routes.getPath(x, c).length === 4); - expect(routes.linksCount).toEqual(4); - - expect(routes.deleteLink(a, x, x)).toEqual([a, b, c]); - expect(routes.linksCount).toEqual(1); // x -> y - expect(routes.getLink(x, y)).toBeDefined(); - }); - - it("subgraph 2", () => { - expect(routes.addLink(a, x, 1, x)).toEqual([a, b, c]); - expect(routes.getPath(x, c).length === 4); - expect(routes.linksCount).toEqual(4); - - expect(routes.deleteLink(a, b, x)).toEqual([b, c]); - expect(routes.linksCount).toEqual(2); // x -> y - expect(routes.getLink(x, a)).toBeDefined(); - expect(routes.getLink(x, y)).toBeDefined(); - }); - - it("subgraph 3", () => { - expect(routes.addLink(a, x, 1, y)).toEqual([a, b, c]); - expect(routes.getPath(x, c).length === 4); - expect(routes.linksCount).toEqual(4); - - expect(routes.deleteLink(a, b, y)).toEqual([b, c]); - expect(routes.linksCount).toEqual(2); // x -> a x -> y - expect(routes.getLink(x, a)).toBeDefined(); - expect(routes.getLink(x, y)).toBeDefined(); - }); - }); -}); diff --git a/packages/transport/stream/src/__tests__/stream.test.ts b/packages/transport/stream/src/__tests__/stream.test.ts index cabf3aefb..eb5348f5d 100644 --- a/packages/transport/stream/src/__tests__/stream.test.ts +++ b/packages/transport/stream/src/__tests__/stream.test.ts @@ -7,7 +7,7 @@ import { ConnectionManagerOptions, DirectStreamComponents } from ".."; -import { DataMessage, Message, getMsgId } from "@peerbit/stream-interface"; +import { ACK, DataMessage, Message, getMsgId } from "@peerbit/stream-interface"; import { PublicSignKey } from "@peerbit/crypto"; import { PeerId, isPeerId } from "@libp2p/interface/peer-id"; import { Multiaddr } from "@multiformats/multiaddr"; @@ -15,13 +15,43 @@ import { multiaddr } from "@multiformats/multiaddr"; import { tcp } from "@libp2p/tcp"; import { webSockets } from "@libp2p/websockets"; import * as filters from "@libp2p/websockets/filters"; -import { serialize } from "@dao-xyz/borsh"; +import { deserialize, serialize } from "@dao-xyz/borsh"; + + + +const collectDataWrites = (client: DirectStream) => { + const writes: Map = new Map(); + for (const [name, peer] of client.peers) { + writes.set(name, []) + const writeFn = peer.write.bind(peer) + peer.write = (data) => { + const bytes = data instanceof Uint8Array ? data : data.subarray(); + const message = deserialize(bytes, Message) + if (message instanceof DataMessage) { + writes.get(name)?.push(message) + + } + return writeFn(data) + } + } + return writes; + +} + +const getWritesCount = (writes: Map) => { + let sum = 0; + for (const [k, v] of writes) { + sum += v.length; + } + return sum; +} const createMetrics = (stream: DirectStream) => { const s: { stream: TestDirectStream; messages: Message[]; received: DataMessage[]; + ack: ACK[]; reachable: PublicSignKey[]; unrechable: PublicSignKey[]; seen: Map; @@ -29,6 +59,7 @@ const createMetrics = (stream: DirectStream) => { messages: [], received: [], reachable: [], + ack: [], unrechable: [], seen: new Map(), stream @@ -52,6 +83,13 @@ const createMetrics = (stream: DirectStream) => { s.seen.set(k, (prev ?? 0) + 1); return seenHas(k); }; + + const ackFn = s.stream.onAck.bind(s.stream); + s.stream.onAck = (a, b, c) => { + s.ack.push(c); + return ackFn(a, b, c); + }; + return s; }; class TestDirectStream extends DirectStream { @@ -115,65 +153,11 @@ const service = (s: TestSessionStream, i: number, service: string) => const waitForPeers = (s: TestSessionStream) => waitForPeerStreams(...s.peers.map((x) => x.services.directstream)); -describe("streams", function () { - describe("ping", () => { - let session: TestSessionStream; - afterEach(async () => { - await session?.stop(); - }); - - it("2-ping", async () => { - // 0 and 2 not connected - session = await connected(2); - await waitForPeers(session); - - // Pings can be aborted, by the interval pinging, so we just need to check that eventually we get results - await stream(session, 0).ping( - stream(session, 0).peers.get(stream(session, 1).publicKeyHash)! - ); - await waitFor( - () => - stream(session, 0).peers.get(stream(session, 1).publicKeyHash) - ?.pingLatency! < 1000 - ); - }); - it("4-ping", async () => { - // 0 and 2 not connected - session = await connected(4); - await waitForPeers(session); - // Pings can be aborted, by the interval pinging, so we just need to check that eventually we get results - await stream(session, 0).ping( - stream(session, 0).peers.get(stream(session, 1).publicKeyHash)! - ); - await waitFor( - () => - stream(session, 0).peers.get(stream(session, 1).publicKeyHash) - ?.pingLatency! < 1000 - ); - }); - // TODO add test to make sure Hello's are not resent uneccessary amount of times +describe("streams", function () { - it("ping interval", async () => { - // 0 and 2 not connected - session = await connected(2, { - services: { - directstream: (c) => new TestDirectStream(c, { pingInterval: 1000 }) - } - }); - await waitForPeers(session); - - let counter = 0; - const pingFn = stream(session, 0).onPing.bind(stream(session, 0)); - stream(session, 0).onPing = (a, b, c) => { - counter += 1; - return pingFn(a, b, c); - }; - await waitFor(() => counter > 5); - }); - }); describe("publish", () => { const data = new Uint8Array([1, 2, 3]); @@ -182,7 +166,7 @@ describe("streams", function () { let session: TestSessionStream; let streams: ReturnType[]; - beforeAll(async () => {}); + beforeAll(async () => { }); beforeEach(async () => { // 0 and 2 not connected @@ -231,6 +215,17 @@ describe("streams", function () { await session.stop(); }); + /* it('updates latency map', async () => { + await delay(6000) + for (const stream of streams) { + for (let i = 0; i < streams.length - 1; i++) { + const data = stream.stream.routes.getLinkData(streams[i].stream.publicKeyHash, streams[i + 1].stream.publicKeyHash); + expect(data?.weight).toBeLessThan(1e4); + } + } + const q = 123; + }) */ + it("many", async () => { let iterations = 300; @@ -244,6 +239,12 @@ describe("streams", function () { }); }); + it("vvvvvvvvxyz", async () => { + const q = 12; + console.log(q) + expect(1).toBeFalse() + }) + it("1->unknown", async () => { await streams[0].stream.publish(data); await waitFor(() => streams[1].received.length === 1); @@ -328,15 +329,13 @@ describe("streams", function () { await session.connect([[session.peers[0], session.peers[2]]]); await waitForPeerStreams(streams[0].stream, streams[2].stream); - // make path 1->3 longest, to make sure we send over it directly anyways because it is a direct path - streams[0].stream.routes.graph.setEdgeAttribute( - streams[0].stream.routes.getLink( - streams[0].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ), - "weight", - 1e5 + // mark 0 -> 1 -> 2 as shortest route... + + streams[0].stream.routes.add( + streams[1].stream.publicKeyHash, + streams[2].stream.publicKeyHash ); + await streams[0].stream.publish(crypto.randomBytes(1e2), { to: [streams[2].stream.components.peerId] }); @@ -345,6 +344,7 @@ describe("streams", function () { expect(streams[2].received).toHaveLength(1) ); + // ...yet make sure the data has not travelled this path expect( streams[1].messages.filter((x) => x instanceof DataMessage) ).toHaveLength(0); @@ -369,26 +369,9 @@ describe("streams", function () { await session.connect([[session.peers[0], session.peers[2]]]); await waitForPeerStreams(streams[0].stream, streams[2].stream); - const defaultEdgeWeightFnPeer0 = - streams[0].stream.routes.graph.getEdgeAttribute.bind( - streams[0].stream.routes.graph - ); - let link02 = streams[0].stream.routes.getLink( - streams[0].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ); + streams[0].stream.routes.add(streams[1].stream.publicKeyHash, streams[3].stream.publicKeyHash) - // make path from 0 -> 2 long, so data will be sent in the path 0 -> 1 -> 2 -> 3 - streams[0].stream.routes.graph.getEdgeAttribute = ( - edge: unknown, - name: any - ) => { - if (edge === link02) { - return 1e5; - } - return defaultEdgeWeightFnPeer0(edge, name); - }; await streams[0].stream.publish(crypto.randomBytes(1e2), { to: [streams[3].stream.components.peerId] @@ -409,23 +392,10 @@ describe("streams", function () { streams[1].messages = []; - // Make [0] -> [2] path short - streams[0].stream.routes.graph.getEdgeAttribute = ( - edge: unknown, - name: any - ) => { - if (edge === link02) { - return 0; - } - return defaultEdgeWeightFnPeer0(edge, name); - }; - - expect( - streams[0].stream.routes.getPath( - streams[0].stream.publicKeyHash, - streams[2].stream.publicKeyHash - ).length - ).toEqual(2); + streams[0].stream.routes.add( + streams[2].stream.publicKeyHash, + streams[3].stream.publicKeyHash, + ) await streams[0].stream.publish(crypto.randomBytes(1e2), { to: [streams[3].stream.components.peerId] }); @@ -438,6 +408,55 @@ describe("streams", function () { expect(messages).toHaveLength(0); expect(streams[1].received).toHaveLength(0); }); + + it("will eventually figure out shortest path", async () => { + /* + ┌───┐ + │ 0 │ + └┬─┬┘ + │┌▽┐ + ││1│ + │└┬┘ + ┌▽─▽┐ + │2 │ + └┬──┘ + ┌▽┐ + │3│ + └─┘ + */ + + for (const n of streams) { + console.log(n.stream.publicKey.hashcode()) + } + + await session.connect([[session.peers[0], session.peers[2]]]); + await waitForPeerStreams(streams[0].stream, streams[2].stream); + + await streams[0].stream.publish(crypto.randomBytes(1e2), { + to: [streams[3].stream.components.peerId] + }); + + + // because node 2 will deduplicate message coming from 1, only 1 data message will arrive to node 3 + // hence only one ACK will be returned to A + await waitForResolved(() => expect(streams[0].ack).toHaveLength(1)) + await delay(2000) + await waitForResolved(() => expect(streams[0].ack).toHaveLength(1)) + + streams[1].messages = []; + streams[3].received = []; + + + await streams[0].stream.publish(crypto.randomBytes(1e2), { + to: [streams[3].stream.components.peerId] + }); + + await waitFor(() => streams[3].received.length === 1) + + expect(streams[1].messages).toHaveLength(0); // Because shortest route is 0 -> 2 -> 3 + + + }); }); describe("fanout", () => { @@ -445,7 +464,7 @@ describe("streams", function () { let session: TestSessionStream; let streams: ReturnType[]; - beforeAll(async () => {}); + beforeAll(async () => { }); beforeEach(async () => { session = await connected(3, { @@ -545,6 +564,79 @@ describe("streams", function () { ); }); }); + + describe("1->2", () => { + + + let session: TestSessionStream; + let streams: ReturnType[]; + const data = new Uint8Array([1, 2, 3]); + + beforeAll(async () => { }); + + beforeEach(async () => { + session = await connected(3, { + services: { + directstream: (c) => + new TestDirectStream(c, { + connectionManager: { autoDial: false }, + }) + } + }); + streams = []; + for (const peer of session.peers) { + streams.push(createMetrics(peer.services.directstream)); + } + await waitForResolved(() => + expect(streams[0].stream.routes.count()).toEqual(3) + ); + await waitForResolved(() => + expect(streams[1].stream.routes.count()).toEqual(3) + ); + await waitForResolved(() => + expect(streams[2].stream.routes.count()).toEqual(3) + ); + }); + + afterEach(async () => { + await session.stop(); + }); + + it("messages are only sent once to each peer", async () => { + + const allWrites = streams.map(x => collectDataWrites(x.stream)); + + let totalWrites = 100; + await delay(4000) + + for (let i = 0; i < totalWrites; i++) { + streams[0].stream.publish(data, { + to: [ + streams[1].stream.publicKeyHash, + streams[2].stream.publicKeyHash + ] + }); + } + + await waitForResolved(() => + expect(streams[1].received).toHaveLength(totalWrites) + ); + await waitForResolved(() => + expect(streams[2].received).toHaveLength(totalWrites) + ); + await delay(4000) + + // Check number of writes for each node + expect(getWritesCount(allWrites[0])).toEqual(totalWrites * 2) // write to "1" or "2" + expect(getWritesCount(allWrites[1])).toEqual(0) // "1" should never has to push any data + expect(getWritesCount(allWrites[2])).toEqual(0) // "2" should never has to push any data + + }); + }); + + + + describe("1->2->2", () => { /** ┌─────┐ @@ -560,7 +652,7 @@ describe("streams", function () { ┌│──│┘│ ││ │┌┘ ┌▽▽┐┌▽▽┐ - │3 ││4 │ + │3 ││4 │ // 3 and 4 are connected also └──┘└──┘ */ @@ -568,7 +660,7 @@ describe("streams", function () { let streams: ReturnType[]; const data = new Uint8Array([1, 2, 3]); - beforeAll(async () => {}); + beforeAll(async () => { }); beforeEach(async () => { session = await disconnected(5, { @@ -592,7 +684,9 @@ describe("streams", function () { [session.peers[1], session.peers[4]], [session.peers[2], session.peers[3]], - [session.peers[2], session.peers[4]] + [session.peers[2], session.peers[4]], + + [session.peers[3], session.peers[4]] ]); await waitForPeerStreams(streams[0].stream, streams[1].stream); @@ -601,21 +695,22 @@ describe("streams", function () { await waitForPeerStreams(streams[1].stream, streams[4].stream); await waitForPeerStreams(streams[2].stream, streams[3].stream); await waitForPeerStreams(streams[2].stream, streams[4].stream); + await waitForPeerStreams(streams[3].stream, streams[4].stream); await waitForResolved(() => - expect(streams[0].stream.routes.nodeCount).toEqual(5) + expect(streams[0].stream.routes.count()).toEqual(5) ); await waitForResolved(() => - expect(streams[1].stream.routes.nodeCount).toEqual(5) + expect(streams[1].stream.routes.count()).toEqual(5) ); await waitForResolved(() => - expect(streams[2].stream.routes.nodeCount).toEqual(5) + expect(streams[2].stream.routes.count()).toEqual(5) ); await waitForResolved(() => - expect(streams[3].stream.routes.nodeCount).toEqual(5) + expect(streams[3].stream.routes.count()).toEqual(5) ); await waitForResolved(() => - expect(streams[4].stream.routes.nodeCount).toEqual(5) + expect(streams[4].stream.routes.count()).toEqual(5) ); }); @@ -624,17 +719,25 @@ describe("streams", function () { }); it("messages are only sent once to each peer", async () => { - streams[0].stream.publish(data, { - to: [ - streams[3].stream.publicKeyHash, - streams[4].stream.publicKeyHash - ] - }); + + const allWrites = streams.map(x => collectDataWrites(x.stream)); + + let totalWrites = 100; + + for (let i = 0; i < totalWrites; i++) { + streams[0].stream.publish(data, { + to: [ + streams[3].stream.publicKeyHash, + streams[4].stream.publicKeyHash + ] + }); + } + await waitForResolved(() => - expect(streams[3].received).toHaveLength(1) + expect(streams[3].received).toHaveLength(totalWrites) ); await waitForResolved(() => - expect(streams[4].received).toHaveLength(1) + expect(streams[4].received).toHaveLength(totalWrites) ); const id1 = await getMsgId(serialize(streams[3].received[0])); @@ -643,8 +746,18 @@ describe("streams", function () { expect(streams[3].seen.get(id1)).toEqual(1); // 1 delivery even though there are multiple path leading to this node expect(streams[4].seen.get(id1)).toEqual(1); // 1 delivery even though there are multiple path leading to this node + + + // Check number of writes for each node + expect(getWritesCount(allWrites[0])).toEqual(totalWrites) // write to "1" or "2" + expect(getWritesCount(allWrites[1]) + getWritesCount(allWrites[2])).toEqual(totalWrites * 2) // write to "3" and "4" + expect(getWritesCount(allWrites[3])).toEqual(0) // "3" should never has to push any data + expect(getWritesCount(allWrites[4])).toEqual(0) // "4" should never has to push any data + }); }); + + }); }); @@ -691,16 +804,16 @@ describe("streams", function () { } // slowly connect to that the route maps are deterministic - await session.connect([[session.peers[0], session.peers[1]]]); + await session.connect([[session.peers[0], session.peers[1]]]);/* await waitFor(() => streams[0].stream.routes.linksCount === 1); - await waitFor(() => streams[1].stream.routes.linksCount === 1); - await session.connect([[session.peers[1], session.peers[2]]]); + await waitFor(() => streams[1].stream.routes.linksCount === 1); */ + await session.connect([[session.peers[1], session.peers[2]]]);/* await waitFor(() => streams[0].stream.routes.linksCount === 2); - await waitFor(() => streams[1].stream.routes.linksCount === 2); - await session.connect([[session.peers[2], session.peers[3]]]); + await waitFor(() => streams[1].stream.routes.linksCount === 2); */ + await session.connect([[session.peers[2], session.peers[3]]]);/* await waitFor(() => streams[0].stream.routes.linksCount === 3); await waitFor(() => streams[1].stream.routes.linksCount === 3); - await waitFor(() => streams[2].stream.routes.linksCount === 3); + await waitFor(() => streams[2].stream.routes.linksCount === 3); */ await waitForPeerStreams(streams[0].stream, streams[1].stream); await waitForPeerStreams(streams[1].stream, streams[2].stream); await waitForPeerStreams(streams[2].stream, streams[3].stream); @@ -878,6 +991,7 @@ describe("streams", function () { └────┘ */ + // make sure node '0' both tries to directly dial '3' using '1' and '2' const dialedCircuitRelayAddresses: Set = new Set(); @@ -966,18 +1080,18 @@ describe("streams", function () { } // slowly connect to that the route maps are deterministic - await session.connect([[session.peers[0], session.peers[1]]]); + await session.connect([[session.peers[0], session.peers[1]]]);/* await waitFor(() => streams[0].stream.routes.linksCount === 1); - await waitFor(() => streams[1].stream.routes.linksCount === 1); - await session.connect([[session.peers[1], session.peers[2]]]); + await waitFor(() => streams[1].stream.routes.linksCount === 1); */ + await session.connect([[session.peers[1], session.peers[2]]]);/* await waitFor(() => streams[0].stream.routes.linksCount === 2); await waitFor(() => streams[1].stream.routes.linksCount === 2); - await waitFor(() => streams[2].stream.routes.linksCount === 2); - await session.connect([[session.peers[0], session.peers[3]]]); + await waitFor(() => streams[2].stream.routes.linksCount === 2); */ + await session.connect([[session.peers[0], session.peers[3]]]);/* await waitFor(() => streams[0].stream.routes.linksCount === 3); await waitFor(() => streams[1].stream.routes.linksCount === 3); await waitFor(() => streams[2].stream.routes.linksCount === 3); - await waitFor(() => streams[3].stream.routes.linksCount === 3); + await waitFor(() => streams[3].stream.routes.linksCount === 3); */ await waitForPeerStreams(streams[0].stream, streams[1].stream); await waitForPeerStreams(streams[1].stream, streams[2].stream); await waitForPeerStreams(streams[0].stream, streams[3].stream); @@ -1101,43 +1215,43 @@ describe("streams", function () { └───┘ */ - expect( + /* expect( streams[3].stream.routes.getPath( streams[3].stream.publicKeyHash, streams[2].stream.publicKeyHash ) - ).toHaveLength(4); + ).toHaveLength(4); */ await session.connect([[session.peers[2], session.peers[3]]]); - await waitFor( + /* await waitFor( () => streams[3].stream.routes.getPath( streams[3].stream.publicKeyHash, streams[2].stream.publicKeyHash ).length === 2 - ); + ); */ }); it("handle on drop no routes", async () => { - expect( + /* expect( streams[3].stream.routes.getPath( streams[3].stream.publicKeyHash, streams[2].stream.publicKeyHash ) - ).toHaveLength(4); - expect(streams[1].stream.earlyGoodbyes.size).toEqual(2); - expect(streams[3].stream.earlyGoodbyes.size).toEqual(1); + ).toHaveLength(4); */ + /* expect(streams[1].stream.earlyGoodbyes.size).toEqual(2); + expect(streams[3].stream.earlyGoodbyes.size).toEqual(1); */ await session.peers[0].stop(); - await waitFor(() => streams[3].stream.routes.linksCount === 0); // because 1, 2 are now disconnected + //await waitFor(() => streams[3].stream.routes.linksCount === 0); // because 1, 2 are now disconnected await delay(1000); // make sure nothing get readded - expect(streams[3].stream.routes.linksCount).toEqual(0); + /* expect(streams[3].stream.routes.linksCount).toEqual(0); expect( streams[3].stream.routes.getPath( streams[3].stream.publicKeyHash, streams[2].stream.publicKeyHash ) - ).toHaveLength(0); - expect(streams[3].stream.earlyGoodbyes.size).toEqual(0); + ).toHaveLength(0); */ + /* expect(streams[3].stream.earlyGoodbyes.size).toEqual(0); */ }); }); @@ -1187,9 +1301,9 @@ describe("streams", function () { streams.push(createMetrics(peer.services.directstream)); } - for (const peer of streams.values()) { + /* for (const peer of streams.values()) { await waitFor(() => peer.stream.routes.linksCount === 2); - } + } */ for (let i = 0; i < 2; i++) { await waitForPeerStreams(streams[i].stream, streams[i + 1].stream); @@ -1206,13 +1320,14 @@ describe("streams", function () { for (let i = 3; i < 5; i++) { await waitForPeerStreams(streams[i].stream, streams[i + 1].stream); } - expect(streams[2].stream.helloMap.size).toEqual(2); // these hellos will be forwarded on connect + /* expect(streams[2].stream.helloMap.size).toEqual(2); // these hellos will be forwarded on connect expect(streams[3].stream.helloMap.size).toEqual(2); // these hellos will be forwarded on connect + */ await session.connect([[session.peers[2], session.peers[3]]]); - for (const peer of streams) { + /* for (const peer of streams) { await waitFor(() => peer.stream.routes.linksCount === 5); // everyone knows everone - } + } */ }); }); @@ -1224,7 +1339,7 @@ describe("streams", function () { for (let i = 0; i < session.peers.length; i++) { await waitForResolved(() => expect( - session.peers[i].services.directstream.routes.nodeCount + session.peers[i].services.directstream.routes.count() ).toEqual(3) ); } @@ -1239,12 +1354,12 @@ describe("streams", function () { await extraSession.peers[0].dial(session.peers[2].getMultiaddrs()); await waitForResolved(() => expect( - extraSession.peers[0].services.directstream.routes.nodeCount + extraSession.peers[0].services.directstream.routes.count() ).toEqual(3) ); await delay(3000); expect( - extraSession.peers[0].services.directstream.routes.nodeCount + extraSession.peers[0].services.directstream.routes.count() ).toEqual(3); }); @@ -1318,20 +1433,20 @@ describe("streams", function () { } }); // use 2 transports as this might cause issues if code is not handling multiple connections correctly - await waitFor(() => stream(session, 1).helloMap.size == 1); + /* await waitFor(() => stream(session, 1).helloMap.size == 1); */ await stream(session, 0).stop(); - await waitFor(() => stream(session, 1).helloMap.size === 0); + /* await waitFor(() => stream(session, 1).helloMap.size === 0); */ await stream(session, 1).stop(); expect(stream(session, 0).peers.size).toEqual(0); await delay(3000); await stream(session, 0).start(); - expect(stream(session, 0).helloMap.size).toEqual(0); + /* expect(stream(session, 0).helloMap.size).toEqual(0); */ await stream(session, 1).start(); await waitFor(() => stream(session, 0).peers.size === 1); - await waitFor(() => stream(session, 0).helloMap.size === 1); - await waitFor(() => stream(session, 1).helloMap.size === 1); + /* await waitFor(() => stream(session, 0).helloMap.size === 1); + await waitFor(() => stream(session, 1).helloMap.size === 1); */ await waitForPeerStreams(stream(session, 0), stream(session, 1)); }); it("can connect after start", async () => { diff --git a/packages/transport/stream/src/index.ts b/packages/transport/stream/src/index.ts index 1c66889a7..28aa08a52 100644 --- a/packages/transport/stream/src/index.ts +++ b/packages/transport/stream/src/index.ts @@ -10,7 +10,6 @@ import { Uint8ArrayList } from "uint8arraylist"; import { abortableSource } from "abortable-iterator"; import * as lp from "it-length-prefixed"; import { Routes } from "./routes.js"; -import { multiaddr } from "@multiformats/multiaddr"; import { PeerMap } from "./peer-map.js"; import type { IncomingStreamData, @@ -27,6 +26,8 @@ import { getKeypairFromPeerId, getPublicKeyFromPeerId, PublicSignKey, + sha256Base64, + sha256Base64Sync, SignatureWithKey } from "@peerbit/crypto"; @@ -38,15 +39,12 @@ import type { Libp2pEvents } from "@libp2p/interface"; import { PeerEvents, Message as Message, - Goodbye, - Hello, DataMessage, - Ping, - PingPong, - Pong, getMsgId, - WaitForPeer + WaitForPeer, + ACK } from "@peerbit/stream-interface"; + export interface PeerStreamsInit { peerId: PeerId; publicKey: PublicSignKey; @@ -93,9 +91,6 @@ export class PeerStreams extends EventEmitter { private closed: boolean; - public pingJob: { resolve: () => void; abort: () => void }; - public pingLatency: number | undefined; - public connId: string; constructor(init: PeerStreamsInit) { super(); @@ -176,9 +171,9 @@ export class PeerStreams extends EventEmitter { .catch((error) => { logger.error( "Failed to send to stream: " + - this.peerId + - ". " + - (error?.message || error?.toString()) + this.peerId + + ". " + + (error?.message || error?.toString()) ); }); } else { @@ -221,8 +216,6 @@ export class PeerStreams extends EventEmitter { const _prevStream = this.outboundStream; this._rawOutboundStream = stream; - this.pingJob?.abort(); - this.outboundStream = pushable({ objectMode: true, onEnd: () => { @@ -275,9 +268,6 @@ export class PeerStreams extends EventEmitter { await this._rawInboundStream?.close(); } - this.pingJob?.abort(); - this.pingLatency = undefined; - //this.dispatchEvent(new CustomEvent('close')) this._rawOutboundStream = undefined; this.outboundStream = undefined; @@ -321,11 +311,10 @@ export interface DirectStreamComponents extends Components { } export abstract class DirectStream< - Events extends { [s: string]: any } = StreamEvents - > + Events extends { [s: string]: any } = StreamEvents +> extends EventEmitter - implements WaitForPeer -{ + implements WaitForPeer { public peerId: PeerId; public peerIdStr: string; public publicKey: PublicSignKey; @@ -353,19 +342,22 @@ export abstract class DirectStream< public emitSelf: boolean; public queue: Queue; public multicodecs: string[]; - public seenCache: Cache; - public earlyGoodbyes: Map; - public helloMap: Map>; // key is hash of publicKey, value is map whey key is hash of signature bytes, and value is latest Hello + public seenCache: Cache; public multiaddrsMap: Map; private _registrarTopologyIds: string[] | undefined; private readonly maxInboundStreams?: number; private readonly maxOutboundStreams?: number; - private topology: any; private pingJobPromise: any; private pingJob: any; private pingInterval: number | null; private connectionManagerOptions: ConnectionManagerOptions; private recentDials: Cache; + private traces: Cache + + // FROM -> TO + private badRoutes: Map> + + private _ackCallbacks: Map void, timeout: ReturnType, }> = new Map() constructor( readonly components: DirectStreamComponents, @@ -393,16 +385,16 @@ export abstract class DirectStream< this.multicodecs = multicodecs; this.started = false; this.peers = new Map(); - this.helloMap = new Map(); + this.multiaddrsMap = new Map(); - this.routes = new Routes(this.publicKeyHash); + this.routes = new Routes(); this.canRelayMessage = canRelayMessage; this.emitSelf = emitSelf; this.queue = new Queue({ concurrency: messageProcessingConcurrency }); - this.earlyGoodbyes = new Map(); this.maxInboundStreams = maxInboundStreams; this.maxOutboundStreams = maxOutboundStreams; this.seenCache = new Cache({ max: 1e3, ttl: 10 * 60 * 1e3 }); + this.badRoutes = new Map() this.peerKeyHashToPublicKey = new Map(); this.peerIdToPublicKey = new Map(); this.pingInterval = pingInterval; @@ -415,6 +407,11 @@ export abstract class DirectStream< ttl: connectionManager.retryDelay || 60 * 1000, max: 1e3 }); + + this.traces = new Cache({ + ttl: connectionManager.retryDelay || 10 * 1000, + max: 1e6 + }); } async start() { @@ -479,7 +476,7 @@ export abstract class DirectStream< await this.onPeerConnected(conn.remotePeer, conn, { fromExisting: true }); } - const pingJob = async () => { + /* const pingJob = async () => { // TODO don't use setInterval but waitFor previous done to be done await this.pingJobPromise; const promises: Promise[] = []; @@ -506,7 +503,7 @@ export abstract class DirectStream< this.pingJob = setTimeout(pingJob, this.pingInterval); }); }; - pingJob(); + pingJob(); */ } /** @@ -537,15 +534,20 @@ export abstract class DirectStream< ); } + this.queue.clear(); - this.helloMap.clear(); this.multiaddrsMap.clear(); - this.earlyGoodbyes.clear(); this.peers.clear(); this.seenCache.clear(); this.routes.clear(); this.peerKeyHashToPublicKey.clear(); this.peerIdToPublicKey.clear(); + for (const [k, v] of this._ackCallbacks) { + clearTimeout(v.timeout); + } + this._ackCallbacks.clear() + this.traces.clear() + this.badRoutes.clear() logger.debug("stopped"); } @@ -725,36 +727,12 @@ export abstract class DirectStream< /* if (!existingStream) */ { this.addRouteConnection( - this.publicKey, - peerKey, - Number.MAX_SAFE_INTEGER + peerKey ); - // Get accurate latency - promises.push(this.ping(peer)); - // Say hello - promises.push( - this.publishMessage( - this.components.peerId, - await new Hello({ - multiaddrs: this.components.addressManager - .getAddresses() - .map((x) => x.toString()) - }).sign(this.sign), - [peer] - ) - ); - // Send my goodbye early if I disconnect for some reason, (so my peer can say goodbye for me) - // TODO add custom condition fn for doing below - promises.push( - this.publishMessage( - this.components.peerId, - await new Goodbye({ early: true }).sign(this.sign), - [peer] - ) - ); + /* // replay all hellos for (const [sender, hellos] of this.helloMap) { if (sender === peerKeyHash) { @@ -778,7 +756,7 @@ export abstract class DirectStream< this.publishMessage(this.components.peerId, hello, [peer]) ); } - } + } */ } const resolved = await Promise.all(promises); return resolved; @@ -788,34 +766,18 @@ export abstract class DirectStream< } private addRouteConnection( - from: PublicSignKey, - to: PublicSignKey, - latency: number + to: PublicSignKey ) { - this.peerKeyHashToPublicKey.set(from.hashcode(), from); this.peerKeyHashToPublicKey.set(to.hashcode(), to); - const links = this.routes.addLink(from.hashcode(), to.hashcode(), latency); - for (const added of links) { - const key = this.peerKeyHashToPublicKey.get(added); - if (key?.equals(this.publicKey) === false) { - this.onPeerReachable(key!); - } - } + this.routes.add(to.hashcode(), to.hashcode()); + this.onPeerReachable(to); + } - removeRouteConnection(from: PublicSignKey, to: PublicSignKey) { - const has = this.routes.hasNode(to.hashcode()); - if (!has) { - this.onPeerUnreachable(to); - } else { - const links = this.routes.deleteLink(from.hashcode(), to.hashcode()); - for (const deleted of links) { - const key = this.peerKeyHashToPublicKey.get(deleted)!; - this.peerKeyHashToPublicKey.delete(deleted); - if (key?.equals(this.publicKey) === false) { - this.onPeerUnreachable(key!); - } - } + removeRouteConnection(to: PublicSignKey) { + const unreachable = this.routes.removeNeighbour(to.hashcode()); + for (const node of unreachable) { + this.onPeerUnreachable(node) // TODO types } } @@ -845,16 +807,9 @@ export abstract class DirectStream< if (!this.publicKey.equals(peerKey)) { await this._removePeer(peerKey); - this.removeRouteConnection(this.publicKey, peerKey); + this.removeRouteConnection(peerKey); // Notify network - const earlyGoodBye = this.earlyGoodbyes.get(peerKeyHash); - if (earlyGoodBye) { - earlyGoodBye.early = false; - await earlyGoodBye.sign(this.sign); - await this.publishMessage(this.components.peerId, earlyGoodBye); - this.earlyGoodbyes.delete(peerKeyHash); - } this.peerIdToPublicKey.delete(peerId.toString()); } @@ -876,14 +831,21 @@ export abstract class DirectStream< * invoked when a new peer becomes unreachable * @param publicKeyHash */ - public onPeerUnreachable(publicKey: PublicSignKey) { + public onPeerUnreachable(hash: string) { + // override this fn - this.helloMap.delete(publicKey.hashcode()); - this.multiaddrsMap.delete(publicKey.hashcode()); - this.dispatchEvent( - new CustomEvent("peer:unreachable", { detail: publicKey }) - ); + this.multiaddrsMap.delete(hash); + + let wasReachable = this.routes.isReachable(hash); + if (wasReachable) { + this.dispatchEvent( + // TODO types + new CustomEvent("peer:unreachable", { detail: this.peerKeyHashToPublicKey.get(hash)! }) + ); + } + + } /** @@ -956,34 +918,7 @@ export abstract class DirectStream< try { await pipe(stream, async (source) => { for await (const data of source) { - const msgId = await getMsgId(data); - if (this.seenCache.has(msgId)) { - // we got message that WE sent? - - /** - * Most propobable reason why we arrive here is a race condition/issue - - ┌─┐ - │0│ - └△┘ - ┌▽┐ - │1│ - └△┘ - ┌▽┐ - │2│ - └─┘ - - from 2s perspective, - - if everyone conents to each other at the same time, then 0 will say hello to 1 and 1 will save that hello to resend to 2 if 2 ever connects - but two is already connected by onPeerConnected has not been invoked yet, so the hello message gets forwarded, - and later onPeerConnected gets invoked on 1, and the same message gets resent to 2 - */ - - continue; - } - this.seenCache.add(msgId); this.processRpc(peerId, peerStreams, data).catch((err) => logger.warn(err) ); @@ -992,9 +927,9 @@ export abstract class DirectStream< } catch (err: any) { logger.error( "error on processing messages to id: " + - peerStreams.peerId.toString() + - ". " + - err?.message + peerStreams.peerId.toString() + + ". " + + err?.message ); this.onPeerDisconnected(peerStreams.peerId); } @@ -1031,6 +966,13 @@ export abstract class DirectStream< return true; } + private async modifySeenCache(message: Uint8Array) { + const msgId = await getMsgId(message); + const seen = this.seenCache.get(msgId) + this.seenCache.add(msgId, seen ? seen + 1 : 1); + return seen || 0; + } + /** * Handles a message from a peer */ @@ -1049,7 +991,6 @@ export abstract class DirectStream< // Ensure the message is valid before processing it const message: Message | undefined = Message.from(msg); - this.dispatchEvent( new CustomEvent("message", { detail: message @@ -1057,23 +998,35 @@ export abstract class DirectStream< ); if (message instanceof DataMessage) { - await this.onDataMessage(from, peerStream, message); - } else if (message instanceof Hello) { - await this.onHello(from, peerStream, message); - } else if (message instanceof Goodbye) { - await this.onGoodbye(from, peerStream, message); - } else if (message instanceof PingPong) { - await this.onPing(from, peerStream, message); + await this.onDataMessage(from, peerStream, msg, message); } else { - throw new Error("Unsupported"); + + if (await this.modifySeenCache(msg instanceof Uint8ArrayList ? msg.subarray() : msg) > 0) { + logger.debug("Received message already seen of type: " + (message.constructor.name)) + return; + } + + /* if (message instanceof Hello) { + await this.onHello(from, peerStream, message); + } else if (message instanceof Goodbye) { + await this.onGoodbye(from, peerStream, message); + } else */ + if (message instanceof ACK) { + await this.onAck(from, peerStream, message); + } else { + throw new Error("Unsupported"); + } + } } async onDataMessage( from: PeerId, peerStream: PeerStreams, + messageBytes: Uint8ArrayList | Uint8Array, message: DataMessage ) { + const seenBefore = await this.modifySeenCache(messageBytes instanceof Uint8ArrayList ? messageBytes.subarray() : messageBytes); const isFromSelf = this.components.peerId.equals(from); if (!isFromSelf || this.emitSelf) { const isForAll = message.to.length === 0; @@ -1089,11 +1042,20 @@ export abstract class DirectStream< return false; } - this.dispatchEvent( - new CustomEvent("data", { - detail: message - }) - ); + if (!seenBefore) { + this.dispatchEvent( + new CustomEvent("data", { + detail: message + }) + ); + } + + + if (message.seekPath) { + + // Send ACK backwards + this.publishMessage(this.components.peerId, await new ACK({ messageIdToAcknowledge: message.id, seenCounter: seenBefore, to: message.signatures!.publicKeys.map(x => x.hashcode()) }).sign(this.sign), [peerStream]) + } } if (isForMe && message.to.length === 1) { // dont forward this message anymore because it was meant ONLY for me @@ -1102,19 +1064,17 @@ export abstract class DirectStream< } // Forward - await this.relayMessage(from, message); + if (!seenBefore) { + await this.relayMessage(from, message); + + } return true; } - async onHello(from: PeerId, peerStream: PeerStreams, message: Hello) { - if (!(await message.verify(false))) { - const a = message.header.verify(); - const b = - message.networkInfo.pingLatencies.length === - message.signatures.signatures.length - 1; + /* async onHello(from: PeerId, peerStream: PeerStreams, message: Hello) { + if (!(await message.verify(true))) { logger.warn( - `Recieved hello message that did not verify. Header: ${a}, Ping info ${b}, Signatures ${ - a && b + `Recieved hello message that did not verify. Header: ${a}, Ping info ${b}, Signatures ${a && b }` ); return false; @@ -1126,71 +1086,33 @@ export abstract class DirectStream< return false; } - const signatures = message.signatures; - for (let i = 0; i < signatures.signatures.length - 1; i++) { - this.addRouteConnection( - signatures.signatures[i].publicKey, - signatures.signatures[i + 1].publicKey, - message.networkInfo.pingLatencies[i] - ); - } - - message.networkInfo.pingLatencies.push( - peerStream.pingLatency || 4294967295 - ); // TODO don't propagate if latency is high? - - await message.sign(this.sign); // sign it so othere peers can now I have seen it (and can build a network graph from trace info) - - let hellos = this.helloMap.get(sender); - if (!hellos) { - hellos = new Map(); - this.helloMap.set(sender, hellos); - } - this.multiaddrsMap.set(sender, message.multiaddrs); - const helloSignaturHash = await message.signatures.hashPublicKeys(); - const existingHello = hellos.get(helloSignaturHash); - if (existingHello) { - if (existingHello.header.expires < message.header.expires) { - hellos.set(helloSignaturHash, message); - } - } else { - hellos.set(helloSignaturHash, message); - } - // Forward - await this.relayMessage(from, message); + await this.relayMessage(from, message); return true; } async onGoodbye(from: PeerId, peerStream: PeerStreams, message: Goodbye) { - if (!(await message.verify(false))) { + + if (!(await message.verify(true))) { logger.warn("Recieved message with invalid signature or timestamp"); return false; } - const sender = message.sender?.hashcode(); - if (!sender) { + if (!message.signatures || message.signatures.signatures.length === 0) { logger.warn("Recieved hello without sender"); return false; } const peerKey = getPublicKeyFromPeerId(from); const peerKeyHash = peerKey.hashcode(); + if (message.early) { this.earlyGoodbyes.set(peerKeyHash, message); } else { const signatures = message.signatures; - /* TODO Should we update routes on goodbye? - for (let i = 1; i < signatures.signatures.length - 1; i++) { - this.addRouteConnection( - signatures.signatures[i].publicKey, - signatures.signatures[i + 1].publicKey - ); - } - */ - + //let neighbour = message.trace[1] || this.peerIdStr; this.removeRouteConnection( signatures.signatures[0].publicKey, @@ -1206,19 +1128,40 @@ export abstract class DirectStream< } await message.sign(this.sign); // sign it so othere peers can now I have seen it (and can build a network graph from trace info) - const hellos = this.helloMap.get(sender); - if (hellos) { - const helloSignaturHash = await message.signatures.hashPublicKeys(); - hellos.delete(helloSignaturHash); - } - // Forward await this.relayMessage(from, message); } return true; } +*/ + + async onAck(from: PeerId, peerStream: PeerStreams, message: ACK) { + if (!(await message.verify(true))) { + logger.warn( + `Recieved ACK message that did not verify`) + return false; + } + + const messageIdString = await sha256Base64(message.messageIdToAcknowledge) + const next = this.traces.get(messageIdString) + let nextStream = next ? this.peers.get(next) : undefined + this._ackCallbacks.get(messageIdString)?.callback(message, peerStream, nextStream) + + + + // relay ACK ? + // send exactly backwards same route we got this message + if (!message.to.includes(this.publicKeyHash)) { // if not end destination + if (nextStream) { + this.publishMessage(this.peerId, message, [nextStream]); + } + } + + + + } - async onPing(from: PeerId, peerStream: PeerStreams, message: PingPong) { + /* async onPing(from: PeerId, peerStream: PeerStreams, message: PingPong) { if (message instanceof Ping) { // respond with pong await this.publishMessage( @@ -1233,7 +1176,7 @@ export abstract class DirectStream< throw new Error("Unsupported"); } } - + async ping(stream: PeerStreams): Promise { return new Promise((resolve, reject) => { stream.pingJob?.abort(); @@ -1245,7 +1188,7 @@ export abstract class DirectStream< const resolver = () => { const end = +new Date(); clearTimeout(timeout); - + // TODO what happens if a peer send a ping back then leaves? Any problems? const latency = end - start; stream.pingLatency = latency; @@ -1267,7 +1210,7 @@ export abstract class DirectStream< ); }); } - + */ /** * Whether to accept a message from a peer * Override to create a graylist @@ -1282,6 +1225,7 @@ export abstract class DirectStream< ) { // dispatch the event if we are interested let toHashes: string[]; + let seek = false if (options?.to) { if (options.to instanceof Set) { toHashes = new Array(options.to.size); @@ -1291,19 +1235,28 @@ export abstract class DirectStream< let i = 0; for (const to of options.to) { - toHashes[i++] = - to instanceof PublicSignKey - ? to.hashcode() - : typeof to === "string" + const hash = to instanceof PublicSignKey + ? to.hashcode() + : typeof to === "string" ? to - : getPublicKeyFromPeerId(to).hashcode(); + : getPublicKeyFromPeerId(to).hashcode() + + if (!seek && !this.routes.isReachable(hash)) { + seek = true; + } + + toHashes[i++] = hash; } + + } else { + seek = true; toHashes = []; } const message = new DataMessage({ data: data instanceof Uint8ArrayList ? data.subarray() : data, - to: toHashes + to: toHashes, + seekPath: seek }); if (this.signaturePolicy === "StictSign") { await message.sign(this.sign); @@ -1321,6 +1274,7 @@ export abstract class DirectStream< throw new Error("Not started"); } + const message = await this.createMessage(data, options); if (this.emitSelf) { @@ -1331,82 +1285,80 @@ export abstract class DirectStream< ); } + // send to all the other peers await this.publishMessage(this.components.peerId, message, undefined); return message.id; } - public async hello(data?: Uint8Array): Promise { - if (!this.started) { - return; - } - - // send to all the other peers - await this.publishMessage( - this.components.peerId, - await new Hello({ - multiaddrs: this.components.addressManager - .getAddresses() - .map((x) => x.toString()), - data - }).sign(this.sign.bind(this)) - ); - } - public async relayMessage( from: PeerId, message: Message, to?: PeerStreams[] | PeerMap ) { if (this.canRelayMessage) { + + const fromHash = this.peerIdToPublicKey.get(from.toString())!.hashcode() + if (message instanceof DataMessage) { + if (!message.seekPath) { + message.to = message.to.filter(x => !this.badRoutes.get(fromHash)?.has(x)) + if (message.to.length === 0) { + logger.debug("Received a message to relay but canRelayMessage is false"); + return; + } + } + // TODO fix types + if (message.seekPath) { + this.traces.add(await sha256Base64Sync(message.id), fromHash) + } + } + + return this.publishMessage(from, message, to, true); + + } else { - logger.debug("received message we didn't subscribe to. Dropping."); + logger.debug("Received a message to relay but canRelayMessage is false"); } } - public async publishMessage( - from: PeerId, - message: Message, - to?: PeerStreams[] | PeerMap, - relayed?: boolean - ): Promise { - if (message instanceof DataMessage && !to) { - // message.to can be distant peers, but "to" are neighbours - const fanoutMap = new Map(); - - // Message to > 0 - if (message.to.length > 0) { - const missingPathsFor: string[] = []; - for (const to of message.to) { - const fromKey = this.peerIdToPublicKey - .get(from.toString()) - ?.hashcode(); - if (to === this.publicKeyHash || fromKey === to) { - continue; // don't send to me or backwards - } - const directStream = this.peers.get(to); - if (directStream) { - // always favor direct stream, even path seems longer - const fanout = fanoutMap.get(to); + // for all tos if + private resolveSendFanout(from: PeerId, tos: string[]) { + const fanoutMap = new Map(); + + // Message to > 0 + if (tos.length > 0) { + const missingPathsFor: string[] = []; + for (const to of tos) { + const fromKey = this.peerIdToPublicKey + .get(from.toString()) + ?.hashcode(); + if (to === this.publicKeyHash || fromKey === to) { + continue; // don't send to me or backwards + } + + const directStream = this.peers.get(to); + if (directStream) { + // always favor direct stream, even path seems longer + const fanout = fanoutMap.get(to); + if (!fanout) { + fanoutMap.set(to, [to]); + } else { + fanout.push(to); + } + continue; + } else { + const neighbour = this.routes.findNeighbor(to); + if (neighbour) { + const fanout = fanoutMap.get(neighbour); if (!fanout) { - fanoutMap.set(to, [to]); - } else { - fanout.push(to); + fanoutMap.set(neighbour, [to]); } - continue; - } else { - const fromMe = from.equals(this.components.peerId); - const block = !fromMe ? fromKey : undefined; - const path = this.routes.getPath(this.publicKeyHash, to, { - block // prevent send message backwards - }); + else { + fanout.push(to) - if (path && path.length > 0) { - const fanout = fanoutMap.get(path[1]); - if (!fanout) { - fanoutMap.set(path[1], [to]); - if ( + /// TODO + /* if ( this.connectionManagerOptions.autoDial && path.length >= 3 ) { @@ -1418,48 +1370,120 @@ export abstract class DirectStream< ); }); } - continue; - } else { - fanout.push(to); - continue; - } - } else { - missingPathsFor.push(to); + */ } + + continue; + } + else { + missingPathsFor.push(to); - // we can't find path, send message to all peers - fanoutMap.clear(); - break; + } } - // update to's - let sentOnce = false; - if (missingPathsFor.length === 0) { - if (fanoutMap.size > 0) { - for (const [neighbour, distantPeers] of fanoutMap) { - message.to = distantPeers; - const bytes = message.bytes(); - if (!sentOnce) { - // if relayed = true, we have already added it to seenCache - if (!relayed) { - this.seenCache.add(await getMsgId(bytes)); - } - sentOnce = true; - } + // we can't find path, send message to all peers + fanoutMap.clear(); + break; + } - const stream = this.peers.get(neighbour); - stream?.waitForWrite(bytes).catch((e) => { - logger.error("Failed to publish message: " + e.message); - }); + } + return fanoutMap + } + + public async publishMessage( + from: PeerId, + message: Message, + to?: PeerStreams[] | PeerMap, + relayed?: boolean + ): Promise { + + if (message instanceof DataMessage && message.seekPath) { + + const idString = await sha256Base64(message.id) + let reachableNodes = new Set() + let timeout = setTimeout(() => { + this._ackCallbacks.delete(idString); + + // peer not reachable! + if (message.to) { + for (const to of message.to) { + if (!reachableNodes.has(to)) { + // TODO types + this.routes.removeTarget(to); + this.onPeerUnreachable(to) } - return; // we are done sending the message in all direction with updates 'to' lists } - return; + } - // else send to all (fallthrough to code below) + }, 3000) + + this._ackCallbacks.set(idString, { + callback: (ack, neighbour, backPeer) => { + + // TODO types + const target = message.signatures!.publicKeys[0].hashcode(); + reachableNodes.add(target) + if (ack.seenCounter === 0) { + // The first ACK for a specific message will come here. I.e. will be the shortest path for a 2 way communication + + // Update the optimal path to the target + this.routes.removeTarget(target) + console.log("ADD TARGET", target) + this.routes.add(neighbour.publicKey.hashcode(), target) + + } + else { + // here is an IMPORTANT step for reducing duplicated relaying activities in a network + // DATA travelled A -> THIS PEER -> B + // and ACK travelled B -> THIS PEER -> A + // if seenCounter > 0 then that means A is sending messages to B in another ROUTE + // hence next time we receive a message from A with B as destination, drop it until A sends a message to B with a new acknowledge request (basically a reset) + if (backPeer) { + let set = this.badRoutes.get(backPeer.publicKey.hashcode()) + if (!set) { + set = new Set(); + this.badRoutes.set(backPeer.publicKey.hashcode(), set) + } + set.add(ack.signatures[0].publicKey.hashcode()) + } + } + + }, timeout + }) + } + + if (message instanceof DataMessage && !message.seekPath && !to) { + + // message.to can be distant peers, but "to" are neighbours + const fanout = this.resolveSendFanout(from, message.to) + + + // update to's + let sentOnce = false; + const bytes = message.bytes(); + if (!relayed) { + await this.modifySeenCache(bytes); } + + if (fanout.size > 0) { + for (const [neighbour, _distantPeers] of fanout) { + if (!sentOnce) { + // if relayed = true, we have already added it to seenCache + sentOnce = true; + } + + const stream = this.peers.get(neighbour); + stream?.waitForWrite(bytes).catch((e) => { + logger.error("Failed to publish message: " + e.message); + }); + } + return; // we are done sending the message in all direction with updates 'to' lists + } + // else send to all (fallthrough to code below) + + } // We fils to send the message directly, instead fallback to floodsub @@ -1469,7 +1493,7 @@ export abstract class DirectStream< (Array.isArray(peers) && peers.length === 0) || (peers instanceof Map && peers.size === 0) ) { - logger.debug("no peers are subscribed"); + logger.debug("No peers to send to"); return; } @@ -1498,79 +1522,79 @@ export abstract class DirectStream< throw new Error("Message did not have any valid receivers. "); } } - - async maybeConnectDirectly(toHash: string) { - if (this.peers.has(toHash)) { - return; // TODO, is this expected, or are we to dial more addresses? - } - - // Try to either connect directly - if (!this.recentDials.has(toHash)) { - this.recentDials.add(toHash); - const addrs = this.multiaddrsMap.get(toHash); - try { - if (addrs && addrs.length > 0) { - await this.components.connectionManager.openConnection( - addrs.map((x) => multiaddr(x)) - ); - return; + /* + async maybeConnectDirectly(toHash: string) { + if (this.peers.has(toHash)) { + return; // TODO, is this expected, or are we to dial more addresses? + } + + // Try to either connect directly + if (!this.recentDials.has(toHash)) { + this.recentDials.add(toHash); + const addrs = this.multiaddrsMap.get(toHash); + try { + if (addrs && addrs.length > 0) { + await this.components.connectionManager.openConnection( + addrs.map((x) => multiaddr(x)) + ); + return; + } + } catch (error) { + // continue regardless of error } - } catch (error) { - // continue regardless of error } - } - - // Connect through a closer relay that maybe does holepunch for us - const neighbours = this.routes.graph.neighbors(toHash); - outer: for (const neighbour of neighbours) { - const routeKey = neighbour + toHash; - if (!this.recentDials.has(routeKey)) { - this.recentDials.add(routeKey); - const to = this.peerKeyHashToPublicKey.get(toHash)! as Ed25519PublicKey; - const toPeerId = await to.toPeerId(); - const addrs = this.multiaddrsMap.get(neighbour); - if (addrs && addrs.length > 0) { - const addressesToDial = addrs.sort((a, b) => { - if (a.includes("/wss/")) { - if (b.includes("/wss/")) { - return 0; - } - return -1; - } - if (a.includes("/ws/")) { - if (b.includes("/ws/")) { - return 0; + + // Connect through a closer relay that maybe does holepunch for us + const neighbours = this.routes.graph.neighbors(toHash); + outer: for (const neighbour of neighbours) { + const routeKey = neighbour + toHash; + if (!this.recentDials.has(routeKey)) { + this.recentDials.add(routeKey); + const to = this.peerKeyHashToPublicKey.get(toHash)! as Ed25519PublicKey; + const toPeerId = await to.toPeerId(); + const addrs = this.multiaddrsMap.get(neighbour); + if (addrs && addrs.length > 0) { + const addressesToDial = addrs.sort((a, b) => { + if (a.includes("/wss/")) { + if (b.includes("/wss/")) { + return 0; + } + return -1; } - if (b.includes("/wss/")) { - return 1; + if (a.includes("/ws/")) { + if (b.includes("/ws/")) { + return 0; + } + if (b.includes("/wss/")) { + return 1; + } + return -1; } - return -1; - } - return 0; - }); - - for (const addr of addressesToDial) { - const circuitAddress = multiaddr( - addr + "/p2p-circuit/webrtc/p2p/" + toPeerId.toString() - ); - try { - await this.components.connectionManager.openConnection( - circuitAddress + return 0; + }); + + for (const addr of addressesToDial) { + const circuitAddress = multiaddr( + addr + "/p2p-circuit/webrtc/p2p/" + toPeerId.toString() ); - break outer; // We succeeded! that means we dont have to try anymore - } catch (error: any) { - logger.warn( - "Failed to connect directly to: " + + try { + await this.components.connectionManager.openConnection( + circuitAddress + ); + break outer; // We succeeded! that means we dont have to try anymore + } catch (error: any) { + logger.warn( + "Failed to connect directly to: " + circuitAddress.toString() + ". " + error?.message - ); + ); + } } } } } - } - } + } */ async waitFor(peer: PeerId | PublicSignKey) { const hash = ( @@ -1581,7 +1605,7 @@ export abstract class DirectStream< if (!this.peers.has(hash)) { return false; } - if (!this.routes.hasLink(this.publicKeyHash, hash)) { + if (!this.routes.isReachable(hash)) { return false; } @@ -1590,11 +1614,11 @@ export abstract class DirectStream< } catch (error) { throw new Error( "Stream to " + - hash + - " does not exist. Connection exist: " + - this.peers.has(hash) + - ". Route exist: " + - this.routes.hasLink(this.publicKeyHash, hash) + hash + + " does not exist. Connection exist: " + + this.peers.has(hash) + + ". Route exist: " + + this.routes.isReachable(hash) ); } const stream = this.peers.get(hash)!; @@ -1603,11 +1627,11 @@ export abstract class DirectStream< } catch (error) { throw new Error( "Stream to " + - stream.publicKey.hashcode() + - " not ready. Readable: " + - stream.isReadable + - ". Writable " + - stream.isWritable + stream.publicKey.hashcode() + + " not ready. Readable: " + + stream.isReadable + + ". Writable " + + stream.isWritable ); } } diff --git a/packages/transport/stream/src/routes.ts b/packages/transport/stream/src/routes.ts index ee8d85772..c51841de2 100644 --- a/packages/transport/stream/src/routes.ts +++ b/packages/transport/stream/src/routes.ts @@ -1,283 +1,55 @@ -import Graphs from "graphology"; -import type { MultiUndirectedGraph } from "graphology"; -import { dijkstra, unweighted } from "graphology-shortest-path"; -import { logger } from "./logger.js"; -import { MinimalEdgeMapper } from "graphology-utils/getters"; - -interface EdgeData { - weight: number; - time: number; -} export class Routes { - graph: MultiUndirectedGraph; - private peerId: string; - constructor(peerId: string) { - this.peerId = peerId; - this.graph = new (Graphs as any).UndirectedGraph(); - } - - get linksCount() { - return this.graph.edges().length; - } - - get nodeCount() { - return this.graph.nodes().length; - } - - /** - * - * @param from - * @param to - * @returns new nodes - */ - addLink( - from: string, - to: string, - weight: number, - origin: string = this.peerId - ): string[] { - const linkExisted = this.hasLink(from, to); - const newReachableNodesFromOrigin: string[] = []; - if (!linkExisted) { - const currentTime = +new Date(); - const fromWasReachable = - origin == from || - this.getPath(origin, from, { unweighted: true }).length; - const toWasReachable = - origin === to || this.getPath(origin, to, { unweighted: true }).length; - const fromIsNowReachable = toWasReachable; - const toIsNowReachable = fromWasReachable; - - const visited = new Set(); - const newReachableNodes: string[] = []; - if (fromIsNowReachable) { - newReachableNodes.push(from); - } - if (toIsNowReachable) { - newReachableNodes.push(to); - } - if (fromWasReachable) { - visited.add(from); - } - if (toWasReachable) { - visited.add(to); - } - - if (!this.graph.hasNode(from)) { - this.graph.addNode(from); - } - if (!this.graph.hasNode(to)) { - this.graph.addNode(to); - } - - this.graph.addUndirectedEdge(from, to, { weight, time: currentTime }); - - for (const newReachableNode of newReachableNodes) { - // get all nodes from this and add them to the new reachable set of nodes one can access from origin - - const stack = [newReachableNode]; // iterate from the not reachable node - while (stack.length > 0) { - const node = stack.shift(); - if (!node) { - continue; - } - if (visited.has(node)) { - continue; - } - - visited.add(node); - const neighbors = this.graph.neighbors(node); - for (const neighbor of neighbors) { - const edge = this.graph.undirectedEdge(node, neighbor); - if (!edge) { - logger.warn(`Missing edge between: ${node} - ${neighbor}`); - continue; - } - - const attributes = this.graph.getEdgeAttributes(edge); - if (attributes.time > currentTime) { - continue; // a new link has been added while we are iterating, dont follow this path - } - - if (visited.has(neighbor)) { - continue; - } - - stack.push(neighbor); - } - newReachableNodesFromOrigin.push(node); - } - } - } else { - // update weight - const edge = this.graph.undirectedEdge(from, to); - this.graph.setEdgeAttribute(edge, "weight", weight); - this.graph.setEdgeAttribute(edge, "time", +new Date()); - } - - return newReachableNodesFromOrigin; - } - - /** - * - * @param from - * @param to - * @param origin - * @returns nodes that are no longer reachable from origin - */ - deleteLink(from: string, to: string, origin: string = this.peerId): string[] { - const link = this.getLink(from, to); - if (link) { - const date = +new Date(); - const fromWasReachable = - origin == from || - this.getPath(origin, from, { unweighted: true }).length; - const toWasReachable = - origin === to || this.getPath(origin, to, { unweighted: true }).length; - this.graph.dropEdge(link); - - const unreachableNodesFromOrigin: string[] = []; - if ( - fromWasReachable && - origin !== from && - this.getPath(origin, from, { unweighted: true }).length === 0 - ) { - unreachableNodesFromOrigin.push(from); - } - if ( - toWasReachable && - origin !== to && - this.getPath(origin, to, { unweighted: true }).length === 0 - ) { - unreachableNodesFromOrigin.push(to); - } - - // remove subgraphs that are now disconnected from me - for (const disconnected of [...unreachableNodesFromOrigin]) { - const node = disconnected; - if (!this.graph.hasNode(node)) { - continue; - } - - const stack = [disconnected]; - const visited = new Set(); - while (stack.length > 0) { - const node = stack.shift(); - const nodeId = node; - if (!nodeId || !this.graph.hasNode(nodeId)) { - continue; - } - if (visited.has(nodeId)) { - continue; - } - - visited.add(nodeId); - - const neighbors = this.graph.neighbors(node); - - for (const neighbor of neighbors) { - const edge = this.graph.undirectedEdge(node, neighbor); - if (!edge) { - logger.warn(`Missing edge between: ${node} - ${neighbor}`); - continue; - } - const attributes = this.graph.getEdgeAttributes(edge); - if (attributes.time > date) { - continue; // don't follow path because this is a new link that might provide some new connectivity - } - - if (visited.has(neighbor)) { - continue; - } - - stack.push(neighbor); - } - this.graph.dropNode(nodeId); - if (disconnected !== nodeId) { - unreachableNodesFromOrigin.push(nodeId.toString()); - } - } - } - return unreachableNodesFromOrigin; - } - return []; - } - - getLink(from: string, to: string): string | undefined { - if (!this.graph.hasNode(from) || !this.graph.hasNode(to)) { - return undefined; - } - - const edges = this.graph.edges(from, to); - if (edges.length > 1) { - throw new Error("Unexpected edge count: " + edges.length); - } - if (edges.length > 0) { - return edges[0]; - } - return undefined; - } - - getLinkData(from: string, to: string): EdgeData | undefined { - const edgeId = this.getLink(from, to); - if (edgeId) return this.graph.getEdgeAttributes(edgeId); - return undefined; - } - - hasLink(from: string, to: string): boolean { - return this.graph.hasEdge(from, to); - } - hasNode(node: string): boolean { - return this.graph.hasNode(node); - } - - getPath( - from: string, - to: string, - options?: { unweighted?: boolean } | { block?: string } - ): unweighted.ShortestPath | dijkstra.BidirectionalDijstraResult { - try { - let getEdgeWeight: - | keyof EdgeData - | MinimalEdgeMapper = (edge) => - this.graph.getEdgeAttribute(edge, "weight"); - const blockId = (options as { block?: string })?.block; - if (blockId) { - const neighBourEdges = new Set( - this.graph - .inboundNeighbors(blockId) - .map((x) => this.graph.edges(x, blockId)) - .flat() - ); - getEdgeWeight = (edge) => { - if (neighBourEdges.has(edge)) { - return Number.MAX_SAFE_INTEGER; - } - return this.graph.getEdgeAttribute(edge, "weight"); - }; - } - - // TODO catching for network changes and resuse last result - const path = - ((options as { unweighted?: boolean })?.unweighted - ? unweighted.bidirectional(this.graph, from, to) - : dijkstra.bidirectional(this.graph, from, to, getEdgeWeight)) || []; - if (path?.length > 0 && path[0] !== from) { - path.reverse(); - } - - if (blockId) { - if (path.includes(blockId)) { - return []; // Path does not exist, as we go through a blocked node with inifite weight - } - } - return path as any; // TODO fix types - } catch (error) { - return []; - } - } - clear() { - this.graph.clear(); - } -} + // END receiver -> Neighbour + private routes: Map + + constructor(routes: Map = new Map()) { + this.routes = routes; + } + + clear() { + this.routes.clear() + } + + add(neighbour: string, target: string) { + this.routes.set(target, neighbour) + } + + removeTarget(to: string) { + return this.routes.delete(to) + } + + removeNeighbour(neighbour: string) { + const removed: string[] = [] + for (const [k, v] of this.routes) { + if (v === neighbour) { + removed.push(k) + } + } + const set = this.routes.get(neighbour) + removed.push(neighbour) + this.routes.delete(neighbour) + + // Return all unreachable + return removed.filter(x => !this.routes.has(x)) + } + + findNeighbor(target: string) { + return this.routes.get(target) + } + + isReachable(target: string) { + return this.routes.has(target); + } + + count() { + let set: Set = new Set(); + for (const [k, v] of this.routes) { + set.add(k) + for (const el of v) { + set.add(el) + } + } + return set.size + } +} \ No newline at end of file diff --git a/packages/utils/any-store/test/app/src/index.tsx b/packages/utils/any-store/test/app/src/index.tsx index e4a227a96..6d1a11a65 100644 --- a/packages/utils/any-store/test/app/src/index.tsx +++ b/packages/utils/any-store/test/app/src/index.tsx @@ -1,6 +1,5 @@ import React from "react"; import ReactDOM from "react-dom/client"; -import { MemoryLevel } from "memory-level"; import { createStore } from "@peerbit/any-store"; /* import { expect } from '@jest/globals'; */ diff --git a/packages/utils/crypto/src/index.ts b/packages/utils/crypto/src/index.ts index d81825a16..ebe04fdba 100644 --- a/packages/utils/crypto/src/index.ts +++ b/packages/utils/crypto/src/index.ts @@ -14,4 +14,5 @@ export * from "./signer.js"; export * from "./keychain.js"; import libsodium from "libsodium-wrappers"; const ready = libsodium.ready; // TODO can we export ready directly ? + export { ready }; diff --git a/yarn.lock b/yarn.lock index 75ed847a3..19196f2d8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3453,11 +3453,6 @@ js-yaml "^3.10.0" tslib "^2.4.0" -"@yomguithereal/helpers@^1.1.1": - version "1.1.1" - resolved "https://registry.yarnpkg.com/@yomguithereal/helpers/-/helpers-1.1.1.tgz#185dfb0f88ca2beec53d0adf6eed15c33b1c549d" - integrity sha512-UYvAq/XCA7xoh1juWDYsq3W0WywOB+pz8cgVnE1b45ZfdMhBvHDrgmSFG3jXeZSr2tMTYLGHFHON+ekG05Jebg== - "@zkochan/js-yaml@0.0.6": version "0.0.6" resolved "https://registry.yarnpkg.com/@zkochan/js-yaml/-/js-yaml-0.0.6.tgz#975f0b306e705e28b8068a07737fa46d3fc04826" @@ -3493,7 +3488,7 @@ abortable-iterator@^5.0.1: get-iterator "^2.0.0" it-stream-types "^2.0.1" -abstract-level@^1.0.0, abstract-level@^1.0.2, abstract-level@^1.0.3: +abstract-level@^1.0.0, abstract-level@^1.0.2: version "1.0.3" resolved "https://registry.yarnpkg.com/abstract-level/-/abstract-level-1.0.3.tgz#78a67d3d84da55ee15201486ab44c09560070741" integrity sha512-t6jv+xHy+VYwc4xqZMn2Pa9DjcdzvzZmQGRjTFc8spIbRGHgBrEKbPq+rYXc7CCo0lxgYvSgKVg9qZAhpVQSjA== @@ -5759,42 +5754,6 @@ graphemer@^1.4.0: resolved "https://registry.yarnpkg.com/graphemer/-/graphemer-1.4.0.tgz#fb2f1d55e0e3a1849aeffc90c4fa0dd53a0e66c6" integrity sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag== -graphology-indices@^0.17.0: - version "0.17.0" - resolved "https://registry.yarnpkg.com/graphology-indices/-/graphology-indices-0.17.0.tgz#b93ad32162ff8b09814547aedb101248f0fcbd2e" - integrity sha512-A7RXuKQvdqSWOpn7ZVQo4S33O0vCfPBnUSf7FwE0zNCasqwZVUaCXePuWo5HBpWw68KJcwObZDHpFk6HKH6MYQ== - dependencies: - graphology-utils "^2.4.2" - mnemonist "^0.39.0" - -graphology-shortest-path@2.0.2: - version "2.0.2" - resolved "https://registry.yarnpkg.com/graphology-shortest-path/-/graphology-shortest-path-2.0.2.tgz#01385d2c0452ead890da2203b30d00177397678e" - integrity sha512-hlGvh4Yb1Vmd2J7wT8Q8+t4RQ6Tx+9wRYm0/fZB9PZJ4uW3nml5kJ7yXZ2+JYWT+7wLLmY5mg3o9bLSAWmv/jQ== - dependencies: - "@yomguithereal/helpers" "^1.1.1" - graphology-indices "^0.17.0" - graphology-utils "^2.4.3" - mnemonist "^0.39.0" - -graphology-types@^0.24.7: - version "0.24.7" - resolved "https://registry.yarnpkg.com/graphology-types/-/graphology-types-0.24.7.tgz#7d630a800061666bfa70066310f56612e08b7bee" - integrity sha512-tdcqOOpwArNjEr0gNQKCXwaNCWnQJrog14nJNQPeemcLnXQUUGrsCWpWkVKt46zLjcS6/KGoayeJfHHyPDlvwA== - -graphology-utils@^2.4.2, graphology-utils@^2.4.3: - version "2.5.2" - resolved "https://registry.yarnpkg.com/graphology-utils/-/graphology-utils-2.5.2.tgz#4d30d6e567d27c01f105e1494af816742e8d2440" - integrity sha512-ckHg8MXrXJkOARk56ZaSCM1g1Wihe2d6iTmz1enGOz4W/l831MBCKSayeFQfowgF8wd+PQ4rlch/56Vs/VZLDQ== - -graphology@0.25.1: - version "0.25.1" - resolved "https://registry.yarnpkg.com/graphology/-/graphology-0.25.1.tgz#f92b86294782522d3898ce4480e4a577c0c2568a" - integrity sha512-yYA7BJCcXN2DrKNQQ9Qf22zBHm/yTbyBR71T1MYBbGtywNHsv0QZtk8zaR6zxNcp2hCCZayUkHp9DyMSZCpoxQ== - dependencies: - events "^3.3.0" - obliterator "^2.0.2" - handlebars@^4.7.7: version "4.7.8" resolved "https://registry.yarnpkg.com/handlebars/-/handlebars-4.7.8.tgz#41c42c18b1be2365439188c77c6afae71c0cd9e9" @@ -7813,13 +7772,6 @@ mkdirp@^1.0.3: resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-1.0.4.tgz#3eb5ed62622756d79a5f0e2a221dfebad75c2f7e" integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw== -mnemonist@^0.39.0: - version "0.39.5" - resolved "https://registry.yarnpkg.com/mnemonist/-/mnemonist-0.39.5.tgz#5850d9b30d1b2bc57cc8787e5caa40f6c3420477" - integrity sha512-FPUtkhtJ0efmEFGpU14x7jGbTB+s18LrzRL2KgoWz9YvcY3cPomz8tih01GbHwnGk/OmkOKfqd/RAQoc8Lm7DQ== - dependencies: - obliterator "^2.0.1" - modify-values@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/modify-values/-/modify-values-1.0.1.tgz#b3939fa605546474e3e3e3c63d64bd43b4ee6022" @@ -8263,11 +8215,6 @@ object.assign@^4.1.4: has-symbols "^1.0.3" object-keys "^1.1.1" -obliterator@^2.0.1, obliterator@^2.0.2: - version "2.0.4" - resolved "https://registry.yarnpkg.com/obliterator/-/obliterator-2.0.4.tgz#fa650e019b2d075d745e44f1effeb13a2adbe816" - integrity sha512-lgHwxlxV1qIg1Eap7LgIeoBWIMFibOjbrYPIPJZcI1mmGAI2m3lNYpK12Y+GBdPQ0U1hRwSord7GIaawz962qQ== - observable-webworkers@^2.0.1: version "2.0.1" resolved "https://registry.yarnpkg.com/observable-webworkers/-/observable-webworkers-2.0.1.tgz#7d9086ebc567bd318b46ba0506b10cedf3813878"