Skip to content

Commit

Permalink
Merge pull request #343 from dao-xyz/rateless
Browse files Browse the repository at this point in the history
pre: sync protocol update minor fixes
  • Loading branch information
marcus-pousette authored Nov 7, 2024
2 parents a26f602 + 4e58cd0 commit 1c77281
Show file tree
Hide file tree
Showing 15 changed files with 476 additions and 126 deletions.
19 changes: 13 additions & 6 deletions packages/transport/libp2p-test-utils/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,30 @@ export class TestSession<T> {
const result = async () => {
const definedOptions: Libp2pOptions<T> | undefined =
(options as any)?.[i] || options;

const services: any = {
identify: identify(),
...definedOptions?.services,
};
if (definedOptions?.services?.relay !== null) {
services.relay = relay();
} else {
delete services.relay;
}

const node = await createLibp2p<T>({
addresses: {
listen: listen(),
},
connectionManager: definedOptions?.connectionManager ?? {},
connectionManager: definedOptions?.connectionManager,
privateKey: definedOptions?.privateKey,
datastore: definedOptions?.datastore,
transports: definedOptions?.transports ?? transports(),
connectionMonitor: {
enabled: false,
},

services: {
relay: relay(),
identify: identify(),
...definedOptions?.services,
} as any,
services,
connectionEncrypters: [noise()],
streamMuxers: definedOptions?.streamMuxers || [yamux()],
start: definedOptions?.start,
Expand Down
19 changes: 19 additions & 0 deletions packages/transport/stream/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3239,6 +3239,25 @@ describe("start/stop", () => {
await session.peers[0].stop();
await session.peers[0].start();
});

it("streams are pruned on disconnect", async () => {
// https://github.com/libp2p/js-libp2p/issues/2794
session = await disconnected(2, {
services: {
relay: null,
directstream: (c: any) => new TestDirectStream(c),
},
} as any);
await session.connect([[session.peers[0], session.peers[1]]]);
await waitForResolved(() =>
expect(session.peers[0].services.directstream.peers.size).to.equal(1),
);

await session.peers[0].hangUp(session.peers[1].peerId);
await waitForResolved(() =>
expect(session.peers[0].services.directstream.peers.size).to.equal(0),
);
});
});

describe("multistream", () => {
Expand Down
29 changes: 24 additions & 5 deletions packages/utils/indexer/interface/src/id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ export class BigUnsignedIntegerValue extends IntegerValue {

constructor(number: bigint) {
super();
if (number > 18446744073709551615n || number < 0) {
throw new Error("Number is not u32");
if (number > 18446744073709551615n || number < 0n) {
throw new Error("Number is not u64");
}
this.number = number;
}
Expand Down Expand Up @@ -129,10 +129,26 @@ export class IntegerKey extends IdKey {
}
}

@variant(3)
export class LargeIntegerKey extends IdKey {
@field({ type: "u64" }) // max value is 2^63 - 1 (9007199254740991)
key: bigint;

constructor(key: bigint) {
super();
this.key = key;
}

get primitive() {
return this.key;
}
}

export type Ideable = string | number | bigint | Uint8Array;

const idKeyTypes = new Set(["string", "number", "bigint"]);

const u64Max = 18446744073709551615n;
export const toId = (obj: Ideable): IdKey => {
if (typeof obj === "string") {
return new StringKey(obj);
Expand All @@ -141,11 +157,14 @@ export const toId = (obj: Ideable): IdKey => {
return new IntegerKey(obj);
}
if (typeof obj === "bigint") {
if (obj <= Number.MAX_SAFE_INTEGER && obj >= 0) {
return new IntegerKey(Number(obj));
if (obj <= u64Max && obj >= 0n) {
return new LargeIntegerKey(obj);
}
throw new Error(
"BigInt is not less than 2^53. Max value is 9007199254740991",
"BigInt is not less than 2^64 - 1. Max value is " +
(2 ** 64 - 1) +
". Provided value: " +
obj,
);
}
if (obj instanceof Uint8Array) {
Expand Down
6 changes: 3 additions & 3 deletions packages/utils/indexer/simple/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ export class HashmapIndex<T extends Record<string, any>, NestedType = any>
}

iterate<S extends types.Shape | undefined>(
query: types.IterateOptions,
properties: { shape?: S; reference?: boolean },
query?: types.IterateOptions,
properties?: { shape?: S; reference?: boolean },
): types.IndexIterator<T, S> {
let done: boolean | undefined = undefined;
let queue:
Expand All @@ -205,7 +205,7 @@ export class HashmapIndex<T extends Record<string, any>, NestedType = any>
const indexedDocuments = await this.queryAll(query);
if (indexedDocuments.length > 1) {
// Sort
if (query.sort) {
if (query?.sort) {
const sortArr = Array.isArray(query.sort)
? query.sort
: [query.sort];
Expand Down
5 changes: 4 additions & 1 deletion packages/utils/indexer/simple/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@ import { tests } from "@peerbit/indexer-tests";
import { create } from "../src";

describe("all", () => {
tests(create, "transient", false);
tests(create, "transient", {
shapingSupported: false,
u64SumSupported: true,
});
});
54 changes: 36 additions & 18 deletions packages/utils/indexer/sqlite3/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import {
buildJoin,
convertCountRequestToQuery,
convertDeleteRequestToQuery,
convertFromSQLType,
convertSearchRequestToQuery,
/* getTableName, */
convertSumRequestToQuery,
convertToSQLType,
escapeColumnName,
generateSelectQuery,
getInlineTableFieldName,
Expand Down Expand Up @@ -251,9 +253,13 @@ export class SQLLiteIndex<T extends Record<string, any>>
table,
options?.shape,
);
const sql = `${generateSelectQuery(table, selects)} ${buildJoin(joinMap, true)} where ${this.primaryKeyString} = ? `;
const sql = `${generateSelectQuery(table, selects)} ${buildJoin(joinMap, true)} where ${this.primaryKeyString} = ? limit 1`;
const stmt = await this.properties.db.prepare(sql, sql);
const rows = await stmt.get([id.key]);
const rows = await stmt.get([
table.primaryField?.from?.type
? convertToSQLType(id.key, table.primaryField.from.type)
: id.key,
]);
if (!rows) {
continue;
}
Expand Down Expand Up @@ -324,12 +330,6 @@ export class SQLLiteIndex<T extends Record<string, any>>
): types.IndexIterator<T, S> {
// create a sql statement where the offset and the limit id dynamic and can be updated
// TODO don't use offset but sort and limit 'next' calls by the last value of the sort
let { sql: sqlFetch, bindable } = convertSearchRequestToQuery(
request,
this.tables,
this._rootTables,
options?.shape,
);

/* const totalCountKey = "count"; */
/* const sqlTotalCount = convertCountRequestToQuery(new types.CountRequest({ query: request.query }), this.tables, this.tables.get(this.rootTableName)!)
Expand All @@ -342,11 +342,25 @@ export class SQLLiteIndex<T extends Record<string, any>>

let stmt: Statement;
let kept: number | undefined = undefined;
let bindable: any[] = [];
let sqlFetch: string | undefined = undefined;

/* let totalCount: undefined | number = undefined; */
const fetch = async (amount: number) => {
const fetch = async (amount: number | "all") => {
kept = undefined;
if (!once) {
let { sql, bindable: toBind } = convertSearchRequestToQuery(
request,
this.tables,
this._rootTables,
{
shape: options?.shape,
stable: typeof amount === "number", // if we are to fetch all, we dont need stable sorting
},
);
sqlFetch = sql;
bindable = toBind;

stmt = await this.properties.db.prepare(sqlFetch, sqlFetch);
// stmt.reset?.(); // TODO dont invoke reset if not needed
/* countStmt.reset?.(); */
Expand All @@ -360,12 +374,11 @@ export class SQLLiteIndex<T extends Record<string, any>>
}

once = true;
const offsetStart = offset;

const allResults: Record<string, any>[] = await stmt.all([
...bindable,
amount,
offsetStart,
amount === "all" ? Number.MAX_SAFE_INTEGER : amount,
offset,
]);

let results: IndexedResult<types.ReturnTypeFromShape<T, S>>[] =
Expand All @@ -389,9 +402,12 @@ export class SQLLiteIndex<T extends Record<string, any>>
return {
value,
id: types.toId(
row[
getTablePrefixedField(selectedTable, this.primaryKeyString)
],
convertFromSQLType(
row[
getTablePrefixedField(selectedTable, this.primaryKeyString)
],
selectedTable.primaryField!.from!.type,
),
),
};
}),
Expand All @@ -410,7 +426,7 @@ export class SQLLiteIndex<T extends Record<string, any>>
iterator.kept = 0;
} */

if (results.length < amount) {
if (amount === "all" || results.length < amount) {
hasMore = false;
await this.clearupIterator(requestId);
clearTimeout(iterator.timeout);
Expand Down Expand Up @@ -548,10 +564,12 @@ export class SQLLiteIndex<T extends Record<string, any>>
const stmt = await this.properties.db.prepare(sql, sql);
const result = await stmt.get(bindable);
if (result != null) {
const value = result.sum as number;

if (ret == null) {
(ret as any) = result.sum as number;
ret = value;
} else {
(ret as any) += result.sum as number;
ret += value;
}
once = true;
}
Expand Down
Loading

0 comments on commit 1c77281

Please sign in to comment.