Skip to content

Commit

Permalink
feat!: rateless-iblt-sync protocol
Browse files Browse the repository at this point in the history
By defualt uses Rateless IBLT syncing protocol. To use previous syncing protocl use 'compatibility: 7' for the document store
  • Loading branch information
marcus-pousette committed Dec 28, 2024
1 parent cae61fb commit 43c3c9c
Show file tree
Hide file tree
Showing 55 changed files with 13,461 additions and 6,673 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ describe("index", () => {
it("replicates by default", async () => {
const s = new TestStore({ publicKey: session.peers[0].peerId });
const l0a = await session.peers[0].open(s);
const checkRole = async (log: SharedLog<any>) => {
const checkRole = async (log: SharedLog<any, any>) => {
expect(await log.isReplicating()).to.be.true;
expect(
(await log.getMyReplicationSegments()).reduce(
Expand Down
51 changes: 15 additions & 36 deletions packages/programs/data/document/document/benchmark/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { field, option, variant } from "@dao-xyz/borsh";
import { Program, type ProgramClient } from "@peerbit/program";
import { TestSession } from "@peerbit/test-utils";
import B from "benchmark";
import crypto from "crypto";
import * as B from "tinybench";
import { v4 as uuid } from "uuid";
import { Documents, type SetupOptions } from "../src/program.js";

Expand Down Expand Up @@ -68,41 +68,20 @@ await client.open(store, {
},
});

const resolver: Map<string, () => void> = new Map();
store.docs.events.addEventListener("change", (change) => {
change.detail.added.forEach((doc) => {
resolver.get(doc.id)!();
resolver.delete(doc.id);
const suite = new B.Bench({ name: "put", warmupIterations: 1000 });
suite.add("put", async () => {
const doc = new Document({
id: uuid(),
name: "hello",
number: 1n,
bytes: crypto.randomBytes(1200),
});
await store.docs.put(doc, {
unique: true,
});
});

const suite = new B.Suite();
suite
.add("put", {
fn: async (deferred: any) => {
const doc = new Document({
id: uuid(),
name: "hello",
number: 1n,
bytes: crypto.randomBytes(1200),
});
resolver.set(doc.id, () => {
deferred.resolve();
});
await store.docs.put(doc, { unique: true });
},

minSamples: 300,
defer: true,
})
.on("cycle", (event: any) => {
console.log(String(event.target));
})
.on("error", (err: any) => {
throw err;
})
.on("complete", async function (this: any, ...args: any[]) {
await store.drop();
await session.stop();
})
.run();
await suite.run();
console.table(suite.table());
await store.drop();
await session.stop();
5 changes: 3 additions & 2 deletions packages/programs/data/document/document/src/domain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@ import {
} from "../src/index.js";

type RangeArgs = { from: number; to: number };
export type CustomDomain = ReplicationDomain<RangeArgs, Operation>;
export type CustomDomain = ReplicationDomain<RangeArgs, Operation, "u32">;

export const createDocumentDomain = <T extends object>(
db: Documents<T, any, CustomDomain>,
options: {
fromValue: (value: T) => number;
fromMissing?: (
entry: EntryReplicated | ShallowEntry | Entry<Operation>,
entry: EntryReplicated<"u32"> | ShallowEntry | Entry<Operation>,
) => number;
},
): CustomDomain => {
const fromValue = options.fromValue;
const fromMissing = options.fromMissing || (() => 0xffffffff);
return {
type: "custom",
resolution: "u32",
fromArgs(args, log) {
if (!args) {
return { offset: log.node.identity.publicKey };
Expand Down
28 changes: 15 additions & 13 deletions packages/programs/data/document/document/src/program.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export type CanPerform<T> = (
export type SetupOptions<
T,
I = T,
D extends ReplicationDomain<any, Operation> = any,
D extends ReplicationDomain<any, Operation, any> = any,
> = {
type: AbstractType<T>;
canOpen?: (program: T) => MaybePromise<boolean>;
Expand All @@ -93,17 +93,17 @@ export type SetupOptions<
log?: {
trim?: TrimOptions;
};
compatibility?: 6;
compatibility?: 6 | 7;
} & Exclude<SharedLogOptions<Operation, D>, "compatibility">;

export type ExtractArgs<T> =
T extends ReplicationDomain<infer Args, any> ? Args : never;
T extends ReplicationDomain<infer Args, any, any> ? Args : never;

@variant("documents")
export class Documents<
T,
I extends Record<string, any> = T extends Record<string, any> ? T : any,
D extends ReplicationDomain<any, Operation> = any,
D extends ReplicationDomain<any, Operation, any> = any,
> extends Program<SetupOptions<T, I, D>, DocumentEvents<T> & ProgramEvents> {
@field({ type: SharedLog })
log: SharedLog<Operation, D>;
Expand All @@ -121,7 +121,7 @@ export class Documents<

canOpen?: (program: T, entry: Entry<Operation>) => Promise<boolean> | boolean;

compatibility: 6 | undefined;
compatibility: 6 | 7 | undefined;

constructor(properties?: {
id?: Uint8Array;
Expand Down Expand Up @@ -183,6 +183,14 @@ export class Documents<
dbType: this.constructor,
});

// document v6 and below need log compatibility of v8 or below
// document v7 needs log compatibility of v9
let logCompatiblity: number | undefined = undefined;
if (options.compatibility === 6) {
logCompatiblity = 8;
} else if (options.compatibility === 7) {
logCompatiblity = 9;
}
await this.log.open({
encoding: BORSH_ENCODING_OPERATION,
canReplicate: options?.canReplicate,
Expand All @@ -192,10 +200,7 @@ export class Documents<
replicate: options?.replicate,
replicas: options?.replicas,
domain: options?.domain,

// document v6 and below need log compatibility of v8 or below
compatibility:
(options?.compatibility ?? Number.MAX_SAFE_INTEGER < 7) ? 8 : undefined,
compatibility: logCompatiblity,
});
}

Expand Down Expand Up @@ -562,10 +567,7 @@ export class Documents<
// Program specific
if (value instanceof Program) {
// if replicator, then open
if (
(await this.canOpen!(value, item)) &&
(await this.log.isReplicator(item)) // TODO types, throw runtime error if replicator is not provided
) {
if (await this.canOpen!(value, item)) {
value = (await this.node.open(value, {
parent: this as Program<any, any>,
existing: "reuse",
Expand Down
15 changes: 13 additions & 2 deletions packages/programs/data/document/document/src/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ const isTransformerWithFunction = <T, I>(
return (options as TransformerAsFunction<T, I>).transform != null;
};

export type OpenOptions<T, I, D extends ReplicationDomain<any, Operation>> = {
export type OpenOptions<
T,
I,
D extends ReplicationDomain<any, Operation, any>,
> = {
documentType: AbstractType<T>;
dbType: AbstractType<types.IDocumentStore<T>>;
log: SharedLog<Operation, D>;
Expand All @@ -212,7 +216,7 @@ type IndexableClass<I> = new (
export class DocumentIndex<
T,
I extends Record<string, any>,
D extends ReplicationDomain<any, Operation>,
D extends ReplicationDomain<any, Operation, any>,
> extends Program<OpenOptions<T, I, D>> {
@field({ type: RPC })
_query: RPC<types.AbstractSearchRequest, types.AbstractSearchResult<T>>;
Expand Down Expand Up @@ -435,6 +439,13 @@ export class DocumentIndex<
)?.[0]?.results[0]?.value;
}

public async getFromGid(gid: string) {
const iterator = this.index.iterate({ query: { gid } });
const one = await iterator.next(1);
await iterator.close();
return one[0];
}

public async put(value: T, entry: Entry<Operation>, id: indexerTypes.IdKey) {
const idString = id.primitive;
if (this._isProgramValues) {
Expand Down
4 changes: 3 additions & 1 deletion packages/programs/data/document/document/test/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ export class Document {

@variant("test_documents")
export class TestStore<
D extends ReplicationDomain<any, Operation> = ReplicationDomainHash,
D extends ReplicationDomain<any, Operation, any> = ReplicationDomainHash<
"u32" | "u64"
>,
> extends Program<Partial<SetupOptions<Document, Document, D>>> {
@field({ type: Uint8Array })
id: Uint8Array;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ describe("domain", () => {
from: 2,
to: 3,
},
eager: true,
},
},
);
Expand Down
Loading

0 comments on commit 43c3c9c

Please sign in to comment.