Skip to content

Commit

Permalink
fix!: simplify subscribe/unsubscribe messages
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Nov 1, 2023
1 parent 9bde1a8 commit 47577fe
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 115 deletions.
14 changes: 7 additions & 7 deletions packages/transport/pubsub-interface/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { PublicSignKey } from "@peerbit/crypto";
import { PubSubData, Subscription } from "./messages.js";
import { PubSubData } from "./messages.js";
import {
Message,
DataMessage,
Expand All @@ -15,10 +15,10 @@ export class SubscriptionEvent {
@field({ type: PublicSignKey })
from: PublicSignKey;

@field({ type: vec(Subscription) })
subscriptions: Subscription[];
@field({ type: vec("string") })
subscriptions: string[];

constructor(from: PublicSignKey, subscriptions: Subscription[]) {
constructor(from: PublicSignKey, subscriptions: string[]) {
this.from = from;
this.subscriptions = subscriptions;
}
Expand All @@ -28,10 +28,10 @@ export class UnsubcriptionEvent {
@field({ type: PublicSignKey })
from: PublicSignKey;

@field({ type: vec(Subscription) })
unsubscriptions: Subscription[];
@field({ type: vec("string") })
unsubscriptions: string[];

constructor(from: PublicSignKey, unsubscriptions: Subscription[]) {
constructor(from: PublicSignKey, unsubscriptions: string[]) {
this.from = from;
this.unsubscriptions = unsubscriptions;
}
Expand Down
45 changes: 10 additions & 35 deletions packages/transport/pubsub-interface/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
deserialize,
option
} from "@dao-xyz/borsh";
import { sha256Base64Sync } from "@peerbit/crypto";
export abstract class PubSubMessage {
abstract bytes(): Uint8Array | Uint8ArrayList;
static from(bytes: Uint8Array) {
Expand All @@ -24,6 +25,7 @@ export abstract class PubSubMessage {
if (first === 3) {
return GetSubscribers.from(bytes);
}

throw new Error("Unsupported");
}
}
Expand Down Expand Up @@ -77,27 +79,17 @@ export class PubSubData extends PubSubMessage {
}
}

@variant(0)
export class Subscription {
@field({ type: "string" })
topic: string;

constructor(topic: string) {
this.topic = topic;
}
}

@variant(1)
export class Subscribe extends PubSubMessage {
@field({ type: vec(Subscription) })
subscriptions: Subscription[];
@field({ type: vec("string") })
topics: string[];

constructor(options: { subscriptions: Subscription[] }) {
constructor(options: { topics: string[] }) {
super();
this.subscriptions = options.subscriptions;
this.topics = options.topics;
}

_serialized: Uint8ArrayList;
private _serialized: Uint8ArrayList;

bytes() {
if (this._serialized) {
Expand All @@ -115,29 +107,16 @@ export class Subscribe extends PubSubMessage {
}
return ret;
}

get topics() {
return this.subscriptions.map((x) => x.topic);
}
}

@variant(0)
export class Unsubscription {
@field({ type: "string" })
topic: string;
constructor(topic: string) {
this.topic = topic;
}
}

@variant(2)
export class Unsubscribe extends PubSubMessage {
@field({ type: vec(Unsubscription) })
unsubscriptions: Unsubscription[];
@field({ type: vec("string") })
topics: string[];

constructor(options: { topics: string[] }) {
super();
this.unsubscriptions = options.topics.map((x) => new Unsubscription(x));
this.topics = options.topics;
}

_serialized: Uint8ArrayList;
Expand All @@ -159,10 +138,6 @@ export class Unsubscribe extends PubSubMessage {
}
return ret;
}

get topics() {
return this.unsubscriptions.map((x) => x.topic);
}
}

@variant(3)
Expand Down
24 changes: 12 additions & 12 deletions packages/transport/pubsub/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ describe("pubsub", function () {
)
).toBeTrue();
expect(streams[2].subscriptionEvents[0].subscriptions).toHaveLength(1);
expect(streams[2].subscriptionEvents[0].subscriptions[0].topic).toEqual(
expect(streams[2].subscriptionEvents[0].subscriptions[0]).toEqual(
TOPIC_1
);

Expand Down Expand Up @@ -1072,9 +1072,9 @@ describe("pubsub", function () {
expect(streams[2].unsubscriptionEvents[0].unsubscriptions).toHaveLength(
1
);
expect(
streams[2].unsubscriptionEvents[0].unsubscriptions[0].topic
).toEqual(TOPIC_1);
expect(streams[2].unsubscriptionEvents[0].unsubscriptions[0]).toEqual(
TOPIC_1
);
});

it("can unsubscribe across peers", async () => {
Expand Down Expand Up @@ -1118,12 +1118,12 @@ describe("pubsub", function () {
)
).toBeTrue();
expect(streams[2].subscriptionEvents[0].subscriptions).toHaveLength(1);
expect(streams[2].subscriptionEvents[0].subscriptions[0].topic).toEqual(
expect(streams[2].subscriptionEvents[0].subscriptions[0]).toEqual(
TOPIC_1
);

expect(streams[2].subscriptionEvents[1].subscriptions).toHaveLength(1);
expect(streams[2].subscriptionEvents[1].subscriptions[0].topic).toEqual(
expect(streams[2].subscriptionEvents[1].subscriptions[0]).toEqual(
TOPIC_2
);

Expand Down Expand Up @@ -1162,9 +1162,9 @@ describe("pubsub", function () {
expect(streams[2].unsubscriptionEvents[0].unsubscriptions).toHaveLength(
1
);
expect(
streams[2].unsubscriptionEvents[0].unsubscriptions[0].topic
).toEqual(TOPIC_1);
expect(streams[2].unsubscriptionEvents[0].unsubscriptions[0]).toEqual(
TOPIC_1
);
streams[0].stream.unsubscribe(TOPIC_2);
await waitFor(
() =>
Expand Down Expand Up @@ -1200,9 +1200,9 @@ describe("pubsub", function () {
expect(streams[2].unsubscriptionEvents[1].unsubscriptions).toHaveLength(
1
);
expect(
streams[2].unsubscriptionEvents[1].unsubscriptions[0].topic
).toEqual(TOPIC_2);
expect(streams[2].unsubscriptionEvents[1].unsubscriptions[0]).toEqual(
TOPIC_2
);
});

it("can handle multiple subscriptions", async () => {
Expand Down
Loading

0 comments on commit 47577fe

Please sign in to comment.