From 6d2cda9af75bee473cfb1f203908124d2f9c318c Mon Sep 17 00:00:00 2001 From: Marcus Pousette Date: Sun, 12 Jan 2025 19:48:05 +0100 Subject: [PATCH] feat: add merge functionality for replication domains for documents --- .../document/benchmark/iterate-replicate-2.ts | 151 ++++++++++++++ .../document/benchmark/iterate-replicate.ts | 193 ++++++++++++++++++ .../data/document/document/src/domain.ts | 190 ++++++++++++----- .../document/document/test/domain.spec.ts | 139 ++++++++++--- 4 files changed, 594 insertions(+), 79 deletions(-) create mode 100644 packages/programs/data/document/document/benchmark/iterate-replicate-2.ts create mode 100644 packages/programs/data/document/document/benchmark/iterate-replicate.ts diff --git a/packages/programs/data/document/document/benchmark/iterate-replicate-2.ts b/packages/programs/data/document/document/benchmark/iterate-replicate-2.ts new file mode 100644 index 000000000..55ee75e99 --- /dev/null +++ b/packages/programs/data/document/document/benchmark/iterate-replicate-2.ts @@ -0,0 +1,151 @@ +import { yamux } from "@chainsafe/libp2p-yamux"; +import { field, option, variant } from "@dao-xyz/borsh"; +import { tcp } from "@libp2p/tcp"; +import { SearchRequest } from "@peerbit/document-interface"; +import { Sort } from "@peerbit/indexer-interface"; +import { Program } from "@peerbit/program"; +import { DirectSub } from "@peerbit/pubsub"; +import { Peerbit, createLibp2pExtended } from "peerbit"; +import { v4 as uuid } from "uuid"; +import { createDocumentDomainFromProperty } from "../src/domain.js"; +import { Documents, type SetupOptions } from "../src/program.js"; + +// Run with "node --loader ts-node/esm ./benchmark/iterate-replicate-2.ts" + +@variant("document") +class Document { + @field({ type: "string" }) + id: string; + + @field({ type: option("string") }) + name?: string; + + @field({ type: option("u64") }) + number?: bigint; + + constructor(opts: Document) { + if (opts) { + this.id = opts.id; + this.name = opts.name; + this.number = opts.number; + } + } +} + +@variant("test_documents") +class TestStore extends Program>> { + @field({ type: Documents }) + docs: Documents; + + constructor() { + super(); + this.docs = new Documents(); + } + + async open(options?: Partial>): Promise { + await this.docs.open({ + ...options, + type: Document, + domain: createDocumentDomainFromProperty({ + property: "number", + resolution: "u64", + mergeSegmentMaxDelta: 1000, + }), + replicas: { + min: 1, + }, + }); + } +} + +const peers = await Promise.all( + [ + await createLibp2pExtended({ + transports: [tcp()], + streamMuxers: [yamux()], + services: { + pubsub: (sub: any) => + new DirectSub(sub, { + canRelayMessage: true, + /* connectionManager: true */ + }), + }, + }), + await createLibp2pExtended({ + connectionManager: {}, + transports: [tcp()], + streamMuxers: [yamux()], + services: { + pubsub: (sub: any) => + new DirectSub(sub, { + canRelayMessage: true, + /* connectionManager: true */ + }), + }, + }), + ].map((x) => Peerbit.create({ libp2p: x })), +); + +await peers[0].dial(peers[1].getMultiaddrs()); + +const host = await peers[0].open(new TestStore(), { + args: { + replicate: { + factor: 1, + }, + }, +}); + +const createDoc = (number: bigint) => { + return new Document({ + id: uuid(), + name: uuid(), + number, + }); +}; + +// warmup +console.log("Inserting"); +const insertionCount = 1e4; +for (let i = 0; i < insertionCount; i++) { + if (i % 1e3 === 0 && i > 0) { + console.log("... " + i + " ..."); + } + await host.docs.put(createDoc(BigInt(i)), { unique: true }); +} +console.log("Inserted: " + insertionCount); + +const client = await peers[1].open(host.clone(), { + args: { + replicate: false, + }, +}); + +await client.docs.log.waitForReplicator(host.node.identity.publicKey); + +let iterator = client.docs.index.iterate( + new SearchRequest({ sort: new Sort({ key: "number" }) }), + { + remote: { + replicate: true, + }, + }, +); + +let uniqueResults = new Set(); +let c = 0; +while (iterator.done() !== true) { + console.log("i:", c); + const results = await iterator.next(10); + c++; + for (const result of results) { + uniqueResults.add(result.id); + } +} + +await client.close(); +await host.close(); +await Promise.all(peers.map((x) => x.stop())); +await Promise.all(peers.map((x) => x.libp2p.stop())); + +console.log("done, fetched results: ", uniqueResults.size); diff --git a/packages/programs/data/document/document/benchmark/iterate-replicate.ts b/packages/programs/data/document/document/benchmark/iterate-replicate.ts new file mode 100644 index 000000000..cc6e5e8fd --- /dev/null +++ b/packages/programs/data/document/document/benchmark/iterate-replicate.ts @@ -0,0 +1,193 @@ +import { yamux } from "@chainsafe/libp2p-yamux"; +import { field, option, variant } from "@dao-xyz/borsh"; +import { tcp } from "@libp2p/tcp"; +import { SearchRequest } from "@peerbit/document-interface"; +import { Sort } from "@peerbit/indexer-interface"; +import { Program } from "@peerbit/program"; +import { DirectSub } from "@peerbit/pubsub"; +import { Peerbit, createLibp2pExtended } from "peerbit"; +import * as B from "tinybench"; +import { v4 as uuid } from "uuid"; +import { Documents, type SetupOptions } from "../src/program.js"; + +// Run with "node --loader ts-node/esm ./benchmark/iterate-replicate.ts" + +@variant("document") +class Document { + @field({ type: "string" }) + id: string; + + @field({ type: option("string") }) + name?: string; + + @field({ type: option("u64") }) + number?: bigint; + + constructor(opts: Document) { + if (opts) { + this.id = opts.id; + this.name = opts.name; + this.number = opts.number; + } + } +} + +@variant("test_documents") +class TestStore extends Program>> { + @field({ type: Documents }) + docs: Documents; + + constructor() { + super(); + this.docs = new Documents(); + } + + async open(options?: Partial>): Promise { + await this.docs.open({ ...options, type: Document }); + } +} + +const peers = await Promise.all( + [ + await createLibp2pExtended({ + transports: [tcp()], + streamMuxers: [yamux()], + services: { + pubsub: (sub: any) => + new DirectSub(sub, { + canRelayMessage: true, + /* connectionManager: true */ + }), + }, + }), + await createLibp2pExtended({ + connectionManager: {}, + transports: [tcp()], + streamMuxers: [yamux()], + services: { + pubsub: (sub: any) => + new DirectSub(sub, { + canRelayMessage: true, + /* connectionManager: true */ + }), + }, + }), + ].map((x) => Peerbit.create({ libp2p: x })), +); + +await peers[0].dial(peers[1].getMultiaddrs()); + +const host = await peers[0].open(new TestStore(), { + args: { + replicate: { + factor: 1, + }, + }, +}); + +const createDoc = (number: bigint) => { + return new Document({ + id: uuid(), + name: uuid(), + number, + }); +}; + +// warmup +console.log("Inserting"); +const insertionCount = 1e4; +for (let i = 0; i < insertionCount; i++) { + if (i % 1e3 === 0 && i > 0) { + console.log("... " + i + " ..."); + } + await host.docs.put(createDoc(BigInt(i)), { unique: true }); +} +console.log("Inserted: " + insertionCount); + +const client = await peers[1].open(host.clone(), { + args: { + replicate: false, + }, +}); + +await client.docs.log.waitForReplicator(host.node.identity.publicKey); + +let iterator: ReturnType | undefined = + undefined; + +const suite = new B.Bench(); +let x = 0; +let iterateBatchSize = 10; +suite.add( + "iterate", + async () => { + // wait for reading + const batch = await iterator!.next(iterateBatchSize); + if (batch.length !== 10 && iterator!.done() !== true) { + throw new Error("Missing results"); + } + x += batch.length; + if (iterator!.done() === true) { + iterator = client.docs.index.iterate( + new SearchRequest({ sort: new Sort({ key: "number" }) }), + ); + } + }, + { + beforeAll: () => { + iterator = client.docs.index.iterate( + new SearchRequest({ sort: new Sort({ key: "number" }) }), + ); + }, + afterAll: () => { + return iterator?.close(); + }, + }, +); + +let c = 0; +suite.add( + "iterate-replicate", + async () => { + // wait for reading + const batch = await iterator!.next(iterateBatchSize); + if (batch.length !== 10 && iterator!.done() !== true) { + throw new Error("Missing results"); + } + + c += batch.length; + if (iterator!.done() === true) { + iterator = client.docs.index.iterate( + new SearchRequest({ sort: new Sort({ key: "number" }) }), + { remote: { replicate: true } }, + ); + } + }, + { + beforeAll: () => { + // TODO this will have side effects on other tests + iterator = client.docs.index.iterate( + new SearchRequest({ sort: new Sort({ key: "number" }) }), + { remote: { replicate: true } }, + ); + }, + afterAll: async () => { + const numberOfSegments = ( + await client.docs.log.getMyReplicationSegments() + ).length; + if (numberOfSegments === 0) { + throw new Error("Missing ranges"); + } + console.log("Number of segmesnts after replication: " + numberOfSegments); + return iterator?.close(); + }, + }, +); + +await suite.run(); +console.table(suite.table()); + +await client.close(); +await host.close(); +await Promise.all(peers.map((x) => x.stop())); +await Promise.all(peers.map((x) => x.libp2p.stop())); diff --git a/packages/programs/data/document/document/src/domain.ts b/packages/programs/data/document/document/src/domain.ts index eeb4f26d7..b40cc58c4 100644 --- a/packages/programs/data/document/document/src/domain.ts +++ b/packages/programs/data/document/document/src/domain.ts @@ -1,56 +1,146 @@ +import type * as types from "@peerbit/document-interface"; import { Entry, type ShallowEntry } from "@peerbit/log"; +import { logger as loggerFn } from "@peerbit/logger"; import { type EntryReplicated, + MAX_U32, + MAX_U64, + type NumberFromType, type ReplicationDomain, + type SharedLog, } from "@peerbit/shared-log"; -import { - type Documents, - type Operation, - isPutOperation, -} from "../src/index.js"; - -type RangeArgs = { from: number; to: number }; -export type CustomDomain = ReplicationDomain; - -export const createDocumentDomain = ( - db: Documents, - options: { - fromValue: (value: T) => number; - fromMissing?: ( - entry: EntryReplicated<"u32"> | ShallowEntry | Entry, - ) => number; - }, -): CustomDomain => { - const fromValue = options.fromValue; - const fromMissing = options.fromMissing || (() => 0xffffffff); - return { - type: "custom", - resolution: "u32", - fromArgs(args, log) { - if (!args) { - return { offset: log.node.identity.publicKey }; - } - return { - offset: args.from, - length: args.to - args.from, - }; - }, - fromEntry: async (entry) => { - const item = await ( - entry instanceof Entry ? entry : await db.log.log.get(entry.hash) - )?.getPayloadValue(); - if (!item) { - // eslint-disable-next-line no-console - console.error("Item not found"); - return fromMissing(entry); - } - - if (isPutOperation(item)) { - const document = db.index.valueEncoding.decoder(item.data); - return fromValue(document); - } - - return fromMissing(entry); - }, - }; +import { type Operation, isPutOperation } from "./operation.js"; +import type { DocumentIndex } from "./search.js"; + +const logger = loggerFn({ module: "document-domain" }); + +type InferT = D extends Documents ? T : never; +type InferR = + D extends Documents> ? R : never; + +type Documents< + T, + D extends ReplicationDomain, + R extends "u32" | "u64" = D extends ReplicationDomain + ? I + : "u32", +> = { log: SharedLog; index: DocumentIndex }; + +type RangeArgs = { + from: NumberFromType; + to: NumberFromType; +}; +export type CustomDocumentDomain = ReplicationDomain< + RangeArgs, + Operation, + R +> & { canProjectToOneSegment: (request: types.SearchRequest) => boolean }; + +type FromEntry = { + fromEntry?: ( + entry: ShallowEntry | Entry | EntryReplicated, + ) => NumberFromType; }; +type FromValue = { + fromValue?: ( + value: T | undefined, + entry: ShallowEntry | Entry | EntryReplicated, + ) => NumberFromType; +}; + +type CreateArgs< + R extends "u32" | "u64", + DB extends Documents, +> = { + resolution: R; + canProjectToOneSegment: (request: types.SearchRequest) => boolean; + mergeSegmentMaxDelta?: number; +} & (FromEntry | FromValue, R>); + +export const createDocumentDomainFromProperty = < + R extends "u32" | "u64", + DB extends Documents, +>(properties: { + property: keyof InferT; + resolution: R; + mergeSegmentMaxDelta?: number; +}): ((db: DB) => CustomDocumentDomain>) => { + const coerceNumber = (number: number | bigint): NumberFromType => + (properties.resolution === "u32" + ? number + : BigInt(number)) as NumberFromType; + return createDocumentDomain({ + resolution: properties.resolution, + canProjectToOneSegment: (request) => + request.sort[0]?.key[0] === properties.property, + fromValue: (value) => coerceNumber(value![properties.property]), + mergeSegmentMaxDelta: properties.mergeSegmentMaxDelta, + }); +}; + +export const createDocumentDomain = + >( + args: CreateArgs, + ): ((db: DB) => CustomDocumentDomain>) => + (db: DB) => { + let maxValue = args.resolution === "u32" ? MAX_U32 : MAX_U64; + let fromEntry = (args as FromEntry>).fromEntry + ? (args as FromEntry>).fromEntry! + : async ( + entry: ShallowEntry | Entry | EntryReplicated, + ) => { + const item = await ( + entry instanceof Entry ? entry : await db.log.log.get(entry.hash) + )?.getPayloadValue(); + + let document: InferT | undefined = undefined; + if (!item) { + logger.error("Item not found"); + } else if (isPutOperation(item)) { + document = db.index.valueEncoding.decoder(item.data); + } + return (args as FromValue).fromValue!( + document, + entry, + ) as NumberFromType>; + }; + return { + type: "custom", + resolution: args.resolution as InferR, + canProjectToOneSegment: args.canProjectToOneSegment, + fromArgs(args) { + if (!args) { + return { + offset: db.log.node.identity.publicKey, + length: maxValue as NumberFromType>, + }; + } + return { + offset: args.from, + length: (args.to - args.from) as NumberFromType>, + }; + }, + fromEntry, + canMerge: + args.mergeSegmentMaxDelta == null + ? undefined + : (from, into) => { + if ( + Math.abs(Number(from.end2 - into.start1)) <= + args.mergeSegmentMaxDelta! + ) { + return true; + } + if ( + Math.abs(Number(from.start1 - into.end2)) <= + args.mergeSegmentMaxDelta! + ) { + return true; + } + if (from.overlaps(into)) { + return true; + } + return false; + }, + }; + }; diff --git a/packages/programs/data/document/document/test/domain.spec.ts b/packages/programs/data/document/document/test/domain.spec.ts index 874242bf5..c2e440432 100644 --- a/packages/programs/data/document/document/test/domain.spec.ts +++ b/packages/programs/data/document/document/test/domain.spec.ts @@ -1,16 +1,21 @@ import { field, variant } from "@dao-xyz/borsh"; import { SearchRequest } from "@peerbit/document-interface"; +import { Sort } from "@peerbit/indexer-interface"; import { Program } from "@peerbit/program"; import { TestSession } from "@peerbit/test-utils"; -import { delay, waitForResolved } from "@peerbit/time"; +import { waitForResolved } from "@peerbit/time"; import { expect } from "chai"; import { v4 as uuid } from "uuid"; -import { type CustomDomain, createDocumentDomain } from "../src/domain.js"; +import { + type CustomDocumentDomain, + createDocumentDomainFromProperty, +} from "../src/domain.js"; import { Documents, type SetupOptions } from "../src/program.js"; class Document { @field({ type: "string" }) id: string; + @field({ type: "u32" }) property: number; @@ -23,23 +28,27 @@ class Document { @variant("StoreWithCustomDomain") export class StoreWithCustomDomain extends Program { @field({ type: Documents }) - docs: Documents; + docs: Documents>; constructor(properties?: { - docs?: Documents; + docs?: Documents>; }) { super(); this.docs = - properties?.docs || new Documents(); + properties?.docs || + new Documents>(); } async open( - args?: Partial>, + args?: Partial< + SetupOptions> + >, ): Promise { return this.docs.open({ ...(args || {}), - domain: createDocumentDomain(this.docs, { - fromValue: (value) => value.property, + domain: createDocumentDomainFromProperty({ + resolution: "u64", + property: "property", }), type: Document, }); @@ -48,6 +57,7 @@ export class StoreWithCustomDomain extends Program { describe("domain", () => { let session: TestSession; + let store: StoreWithCustomDomain, store2: StoreWithCustomDomain; before(async () => { session = await TestSession.connected(2); @@ -57,8 +67,8 @@ describe("domain", () => { await session.stop(); }); - it("custom domain", async () => { - const store = await session.peers[0].open(new StoreWithCustomDomain(), { + beforeEach(async () => { + store = await session.peers[0].open(new StoreWithCustomDomain(), { args: { replicate: { normalized: false, @@ -68,7 +78,7 @@ describe("domain", () => { }, }, }); - const store2 = await session.peers[1].open(store.clone(), { + store2 = await session.peers[1].open(store.clone(), { args: { replicate: { normalized: false, @@ -83,35 +93,43 @@ describe("domain", () => { await store2.docs.put(new Document({ id: "2", property: 2 })); await store2.docs.put(new Document({ id: "3", property: 3 })); - await delay(3000); // wait for sometime so that potential replication could have happened + await waitForResolved(async () => { + expect(await store.docs.index.getSize()).to.equal(1); + expect(await store2.docs.index.getSize()).to.equal(2); + }); - expect(await store.docs.index.getSize()).to.equal(1); - expect(await store2.docs.index.getSize()).to.equal(2); + await store.docs.log.waitForReplicator(store2.docs.node.identity.publicKey); + await store2.docs.log.waitForReplicator(store.docs.node.identity.publicKey); + }); + + afterEach(async () => { + await store.close(); + await store2.close(); + }); + it("custom domain", async () => { // test querying with the same domain but different peers and assert results are correct - await waitForResolved(async () => { - const resultsWithRemoteRightDomain = await store.docs.index.search( - new SearchRequest(), - { - remote: { - domain: { - from: 2, - to: 3, - }, - eager: true, + const resultsWithRemoteRightDomain = await store.docs.index.search( + new SearchRequest(), + { + remote: { + domain: { + from: 2n, + to: 3n, }, + eager: true, }, - ); + }, + ); - expect(resultsWithRemoteRightDomain).to.have.length(3); - }); + expect(resultsWithRemoteRightDomain).to.have.length(3); const resultsWhenRemoteDoesNotHaveRightDomain = await store.docs.index.search(new SearchRequest(), { remote: { domain: { - from: 4, - to: 5, + from: 4n, + to: 5n, }, }, }); @@ -119,4 +137,67 @@ describe("domain", () => { expect(resultsWhenRemoteDoesNotHaveRightDomain).to.have.length(1); // only the loal result expect(resultsWhenRemoteDoesNotHaveRightDomain[0].id).to.equal("1"); }); + + it("will join with multiple segments when not sorting by mergeable property", async () => { + expect( + (await store.docs.log.getMyReplicationSegments()).map((x) => + [x.start1, x.end1].map(Number), + ), + ).to.deep.equal([[1, 2]]); + const resultsWithRemoteRightDomain = await store.docs.index.search( + new SearchRequest(), + { + remote: { + domain: { + from: 0n, + to: 5n, + }, + eager: true, + replicate: true, + }, + }, + ); + + expect(resultsWithRemoteRightDomain).to.have.length(3); + expect( + (await store.docs.log.getMyReplicationSegments()).map((x) => + [x.start1, x.end1].map(Number), + ), + ).to.deep.equal([ + [1, 2], + [2, 3], + [3, 4], + ]); + }); + + it("will join by separate segments when not sorting by non mergeable property", async () => { + expect( + (await store.docs.log.getMyReplicationSegments()).map((x) => + [x.start1, x.end1].map(Number), + ), + ).to.deep.equal([[1, 2]]); + const resultsWithRemoteRightDomain = await store.docs.index.search( + new SearchRequest({ sort: [new Sort({ key: ["property"] })] }), + { + remote: { + domain: { + from: 0n, + to: 5n, + }, + eager: true, + replicate: true, + }, + }, + ); + + expect(resultsWithRemoteRightDomain).to.have.length(3); + expect( + (await store.docs.log.getMyReplicationSegments()).map((x) => + [x.start1, x.end1].map(Number), + ), + ).to.deep.equal([ + [1, 2], + [2, 4], + ]); + }); });