Skip to content

Commit

Permalink
feat(broker): new BrokerKeyValue and broker documentation
Browse files Browse the repository at this point in the history
This completes the Broker API and adds the first version of documentation.
  • Loading branch information
marcj committed May 21, 2024
1 parent 6ad04d3 commit 1f53bc8
Show file tree
Hide file tree
Showing 16 changed files with 798 additions and 42 deletions.
1 change: 1 addition & 0 deletions packages/broker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ export * from './src/kernel.js';
export * from './src/model.js';
export * from './src/broker.js';
export * from './src/broker-cache.js';
export * from './src/broker-key-value.js';
export * from './src/adapters/deepkit-adapter.js';
export * from './src/adapters/memory-adapter.js';
24 changes: 12 additions & 12 deletions packages/broker/src/adapters/deepkit-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
BrokerAdapterQueueProduceOptionsResolved,
BrokerQueueMessage,
BrokerTimeOptionsResolved,
Release
Release,
} from '../broker.js';
import { getTypeJitContainer, ReflectionKind, Type, TypePropertySignature } from '@deepkit/type';
import {
Expand All @@ -29,26 +29,21 @@ import {
brokerSet,
brokerSetCache,
BrokerType,
QueueMessageProcessing
QueueMessageProcessing,
} from '../model.js';
import {
ClientTransportAdapter,
createRpcMessage,
RpcBaseClient,
RpcMessage,
RpcMessageRouteType,
RpcWebSocketClientAdapter
RpcWebSocketClientAdapter,
} from '@deepkit/rpc';
import {
deserializeBSON,
deserializeBSONWithoutOptimiser,
getBSONDeserializer,
getBSONSerializer,
serializeBSON
} from '@deepkit/bson';
import { deserializeBSON, getBSONDeserializer, getBSONSerializer, serializeBSON } from '@deepkit/bson';
import { arrayRemoveItem } from '@deepkit/core';
import { BrokerCacheItemOptionsResolved } from '../broker-cache.js';
import { fastHash } from '../utils.js';
import { BrokerKeyValueOptionsResolved } from '../broker-key-value.js';

interface TypeSerialize {
encode(v: any): Uint8Array;
Expand Down Expand Up @@ -225,10 +220,10 @@ export class BrokerDeepkitAdapter implements BrokerAdapter {
return first.v && first.ttl !== undefined ? { value: serializer.decode(first.v, 0), ttl: first.ttl } : undefined;
}

async set(key: string, value: any, type: Type): Promise<void> {
async set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise<void> {
const serializer = getSerializer(type);
const v = serializer.encode(value);
await this.pool.getConnection('key/' + key).sendMessage<brokerSet>(BrokerType.Set, { n: key, v }).ackThenClose();
await this.pool.getConnection('key/' + key).sendMessage<brokerSet>(BrokerType.Set, { n: key, v, ttl: options.ttl }).ackThenClose();
}

async get(key: string, type: Type): Promise<any> {
Expand All @@ -240,6 +235,11 @@ export class BrokerDeepkitAdapter implements BrokerAdapter {
}
}

async remove(key: string): Promise<any> {
await this.pool.getConnection('key/' + key)
.sendMessage<brokerGet>(BrokerType.Delete, { n: key }).ackThenClose();
}

async increment(key: string, value: any): Promise<number> {
const response = await this.pool.getConnection('increment/' + key)
.sendMessage<brokerIncrement>(BrokerType.Increment, { n: key, v: value })
Expand Down
122 changes: 122 additions & 0 deletions packages/broker/src/broker-key-value.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { parseTime } from './utils.js';
import { ReceiveType, resolveReceiveType, Type } from '@deepkit/type';
import { BrokerAdapterBase } from './broker.js';
import { ConsoleLogger, LoggerInterface } from '@deepkit/logger';

export interface BrokerKeyValueOptions {
/**
* Relative time to live in milliseconds. 0 means no ttl.
*
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: number | string;
}

export interface BrokerKeyValueOptionsResolved extends BrokerKeyValueOptions {
ttl: number;
}

export function parseBrokerKeyValueOptions(options: Partial<BrokerKeyValueOptions>): BrokerKeyValueOptionsResolved {
return {
ttl: parseTime(options.ttl) ?? 0,
};
}

export interface BrokerAdapterKeyValue extends BrokerAdapterBase {
get(key: string, type: Type): Promise<any>;

set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise<any>;

remove(key: string): Promise<any>;

increment(key: string, value: any): Promise<number>;
}

export class BrokerKeyValueItem<T> {
constructor(
private key: string,
private type: Type,
private adapter: BrokerAdapterKeyValue,
private options: BrokerKeyValueOptionsResolved,
) {

}

/**
* @see BrokerKeyValue.get
*/
async get(): Promise<T> {
return this.adapter.get(this.key, this.type);
}

/**
* @see BrokerKeyValue.set
*/
async set(value: T): Promise<void> {
await this.adapter.set(this.key, value, this.options, this.type);
}

/**
* @see BrokerKeyValue.increment
*/
async increment(value: number): Promise<number> {
return this.adapter.increment(this.key, value);
}

async remove(): Promise<void> {
return this.adapter.remove(this.key);
}
}

export class BrokerKeyValue {
private config: BrokerKeyValueOptionsResolved;

constructor(
private adapter: BrokerAdapterKeyValue,
config: Partial<BrokerKeyValueOptions> = {},
private logger: LoggerInterface = new ConsoleLogger(),
) {
this.config = parseBrokerKeyValueOptions(config);
}

/**
* Returns a new BrokerKeyValueItem for the given key.
*/
item<T>(key: string, options?: Partial<BrokerKeyValueOptions>, type?: ReceiveType<T>): BrokerKeyValueItem<T> {
return new BrokerKeyValueItem(
key, resolveReceiveType(type), this.adapter,
parseBrokerKeyValueOptions(Object.assign({}, this.config, options)),
);
}

/**
* Returns the value for the given key.
*/
async get<T>(key: string, type?: ReceiveType<T>): Promise<T> {
return this.adapter.get(key, resolveReceiveType(type));
}

/**
* Sets the value for the given key.
*/
async set<T>(key: string, value: T, options?: Partial<BrokerKeyValueOptions>, type?: ReceiveType<T>): Promise<void> {
return this.adapter.set(key, value,
options ? parseBrokerKeyValueOptions(Object.assign({}, this.config, options)) : this.config,
resolveReceiveType(type),
);
}

async remove(key: string): Promise<void> {
return this.adapter.remove(key);
}

/**
* Increments the value for the given key by the given value.
* Note that this is not compatible to get/set, as it only works with numbers.
* Since this an atomic increment, there is no way to get the current value via `get` and then increment it,
* but you have to use `increment(0)` to get the current value.
*/
async increment(key: string, value: number): Promise<number> {
return this.adapter.increment(key, value);
}
}
9 changes: 1 addition & 8 deletions packages/broker/src/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { EventToken } from '@deepkit/event';
import { parseTime } from './utils.js';
import { BrokerAdapterCache } from './broker-cache.js';
import { QueueMessageProcessing } from './model.js';
import { BrokerAdapterKeyValue } from './broker-key-value.js';

export interface BrokerTimeOptions {
/**
Expand Down Expand Up @@ -140,14 +141,6 @@ export interface BrokerAdapterQueue extends BrokerAdapterBase {
produce(name: string, message: any, type: Type, options?: BrokerAdapterQueueProduceOptionsResolved): Promise<void>;
}

export interface BrokerAdapterKeyValue extends BrokerAdapterBase {
get(key: string, type: Type): Promise<any>;

set(key: string, value: any, type: Type): Promise<any>;

increment(key: string, value: any): Promise<number>;
}

export const onBrokerLock = new EventToken('broker.lock');

export class BrokerQueueMessage<T> {
Expand Down
60 changes: 44 additions & 16 deletions packages/broker/src/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
brokerResponseGetCacheMeta,
brokerResponseIncrement,
brokerResponseIsLock,
brokerSet,
brokerSetCache,
BrokerType,
QueueMessage,
Expand Down Expand Up @@ -92,7 +93,10 @@ export class BrokerConnection extends RpcKernelBaseConnection {

for (const connection of this.connections.connections) {
if (connection === this) continue;
promises.push(connection.sendMessage<brokerEntityFields>(BrokerType.EntityFields, { name, fields }).ackThenClose());
promises.push(connection.sendMessage<brokerEntityFields>(BrokerType.EntityFields, {
name,
fields,
}).ackThenClose());
}

await Promise.all(promises);
Expand All @@ -108,7 +112,7 @@ export class BrokerConnection extends RpcKernelBaseConnection {
}
response.reply<brokerEntityFields>(
BrokerType.EntityFields,
{ name: body.name, fields: this.state.getEntityFields(body.name) }
{ name: body.name, fields: this.state.getEntityFields(body.name) },
);
break;
}
Expand All @@ -120,14 +124,17 @@ export class BrokerConnection extends RpcKernelBaseConnection {
}
response.reply<brokerEntityFields>(
BrokerType.EntityFields,
{ name: body.name, fields: this.state.getEntityFields(body.name) }
{ name: body.name, fields: this.state.getEntityFields(body.name) },
);
break;
}
case BrokerType.AllEntityFields: {
const composite = response.composite(BrokerType.AllEntityFields);
for (const name of this.state.entityFields.keys()) {
composite.add<brokerEntityFields>(BrokerType.EntityFields, { name, fields: this.state.getEntityFields(name) });
composite.add<brokerEntityFields>(BrokerType.EntityFields, {
name,
fields: this.state.getEntityFields(name),
});
}
composite.send();
break;
Expand Down Expand Up @@ -190,7 +197,11 @@ export class BrokerConnection extends RpcKernelBaseConnection {
}
case BrokerType.QueueMessageHandled: {
const body = message.parseBody<BrokerQueueMessageHandled>();
this.state.queueMessageHandled(body.c, this, body.id, { error: body.error, success: body.success, delay: body.delay });
this.state.queueMessageHandled(body.c, this, body.id, {
error: body.error,
success: body.success,
delay: body.delay,
});
response.ack();
break;
}
Expand Down Expand Up @@ -221,8 +232,8 @@ export class BrokerConnection extends RpcKernelBaseConnection {
break;
}
case BrokerType.Set: {
const body = message.parseBody<brokerSetCache>();
this.state.setKey(body.n, body.v);
const body = message.parseBody<brokerSet>();
this.state.setKey(body.n, body.v, body.ttl);
response.ack();
break;
}
Expand All @@ -246,7 +257,7 @@ export class BrokerConnection extends RpcKernelBaseConnection {
const message = createRpcMessage<brokerInvalidateCacheMessage>(
0, BrokerType.ResponseInvalidationCache,
{ key: body.n, ttl: entry.ttl },
RpcMessageRouteType.server
RpcMessageRouteType.server,
);

for (const connection of this.state.invalidationCacheMessageConnections) {
Expand All @@ -271,7 +282,7 @@ export class BrokerConnection extends RpcKernelBaseConnection {
case BrokerType.GetCacheMeta: {
const body = message.parseBody<brokerGetCache>();
const v = this.state.getCache(body.n);
response.reply<brokerResponseGetCacheMeta>(BrokerType.ResponseGetCacheMeta, v ? {ttl: v.ttl} : { missing: true });
response.reply<brokerResponseGetCacheMeta>(BrokerType.ResponseGetCacheMeta, v ? { ttl: v.ttl } : { missing: true });
break;
}
case BrokerType.Get: {
Expand Down Expand Up @@ -415,7 +426,7 @@ export class BrokerState {
if (!subscriptions) return;
const message = createRpcMessage<brokerBusResponseHandleMessage>(
0, BrokerType.ResponseSubscribeMessage,
{ c: channel, v: v }, RpcMessageRouteType.server
{ c: channel, v: v }, RpcMessageRouteType.server,
);

for (const connection of subscriptions) {
Expand Down Expand Up @@ -447,7 +458,16 @@ export class BrokerState {
public queuePublish(body: BrokerQueuePublish) {
const queue = this.getQueue(body.c);

const m: QueueMessage = { id: queue.currentId++, process: body.process, hash: body.hash, state: QueueMessageState.pending, tries: 0, v: body.v, delay: body.delay || 0, priority: body.priority };
const m: QueueMessage = {
id: queue.currentId++,
process: body.process,
hash: body.hash,
state: QueueMessageState.pending,
tries: 0,
v: body.v,
delay: body.delay || 0,
priority: body.priority,
};

if (body.process === QueueMessageProcessing.exactlyOnce) {
if (!body.deduplicationInterval) {
Expand All @@ -473,7 +493,7 @@ export class BrokerState {
m.lastError = undefined;
consumer.con.writer.write(createRpcMessage<BrokerQueueResponseHandleMessage>(
0, BrokerType.QueueResponseHandleMessage,
{ c: body.c, v: body.v, id: m.id }, RpcMessageRouteType.server
{ c: body.c, v: body.v, id: m.id }, RpcMessageRouteType.server,
));
}

Expand All @@ -483,7 +503,11 @@ export class BrokerState {
/**
* When a queue message has been sent to a consumer and the consumer answers.
*/
public queueMessageHandled(queueName: string, connection: BrokerConnection, id: number, answer: { error?: string, success: boolean, delay?: number }) {
public queueMessageHandled(queueName: string, connection: BrokerConnection, id: number, answer: {
error?: string,
success: boolean,
delay?: number
}) {
const queue = this.queues.get(queueName);
if (!queue) return;
const consumer = queue.consumers.find(v => v.con === connection);
Expand Down Expand Up @@ -518,13 +542,18 @@ export class BrokerState {
public increment(id: string, v?: number): number {
const buffer = this.keyStore.get(id);
const float64 = buffer ? new Float64Array(buffer.buffer, buffer.byteOffset) : new Float64Array(1);
float64[0] += v || 1;
float64[0] += v || 0;
if (!buffer) this.keyStore.set(id, new Uint8Array(float64.buffer));
return float64[0];
}

public setKey(id: string, data: Uint8Array) {
public setKey(id: string, data: Uint8Array, ttl: number) {
this.keyStore.set(id, data);
if (ttl > 0) {
setTimeout(() => {
this.keyStore.delete(id);
}, ttl);
}
}

public getKey(id: string): Uint8Array | undefined {
Expand All @@ -534,7 +563,6 @@ export class BrokerState {
public deleteKey(id: string) {
this.keyStore.delete(id);
}

}

export class BrokerKernel extends RpcKernel {
Expand Down
1 change: 1 addition & 0 deletions packages/broker/src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export interface brokerResponseIncrement {
export interface brokerSet {
n: string,
v: Uint8Array,
ttl: number,
}

export interface brokerInvalidateCache {
Expand Down
Loading

0 comments on commit 1f53bc8

Please sign in to comment.