Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pre: sync protocol update minor fixes #343

Merged
merged 6 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions packages/transport/libp2p-test-utils/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,30 @@ export class TestSession<T> {
const result = async () => {
const definedOptions: Libp2pOptions<T> | undefined =
(options as any)?.[i] || options;

const services: any = {
identify: identify(),
...definedOptions?.services,
};
if (definedOptions?.services?.relay !== null) {
services.relay = relay();
} else {
delete services.relay;
}

const node = await createLibp2p<T>({
addresses: {
listen: listen(),
},
connectionManager: definedOptions?.connectionManager ?? {},
connectionManager: definedOptions?.connectionManager,
privateKey: definedOptions?.privateKey,
datastore: definedOptions?.datastore,
transports: definedOptions?.transports ?? transports(),
connectionMonitor: {
enabled: false,
},

services: {
relay: relay(),
identify: identify(),
...definedOptions?.services,
} as any,
services,
connectionEncrypters: [noise()],
streamMuxers: definedOptions?.streamMuxers || [yamux()],
start: definedOptions?.start,
Expand Down
19 changes: 19 additions & 0 deletions packages/transport/stream/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3239,6 +3239,25 @@ describe("start/stop", () => {
await session.peers[0].stop();
await session.peers[0].start();
});

it("streams are pruned on disconnect", async () => {
// https://github.com/libp2p/js-libp2p/issues/2794
session = await disconnected(2, {
services: {
relay: null,
directstream: (c: any) => new TestDirectStream(c),
},
} as any);
await session.connect([[session.peers[0], session.peers[1]]]);
await waitForResolved(() =>
expect(session.peers[0].services.directstream.peers.size).to.equal(1),
);

await session.peers[0].hangUp(session.peers[1].peerId);
await waitForResolved(() =>
expect(session.peers[0].services.directstream.peers.size).to.equal(0),
);
});
});

describe("multistream", () => {
Expand Down
29 changes: 24 additions & 5 deletions packages/utils/indexer/interface/src/id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ export class BigUnsignedIntegerValue extends IntegerValue {

constructor(number: bigint) {
super();
if (number > 18446744073709551615n || number < 0) {
throw new Error("Number is not u32");
if (number > 18446744073709551615n || number < 0n) {
throw new Error("Number is not u64");
}
this.number = number;
}
Expand Down Expand Up @@ -129,10 +129,26 @@ export class IntegerKey extends IdKey {
}
}

@variant(3)
export class LargeIntegerKey extends IdKey {
@field({ type: "u64" }) // max value is 2^63 - 1 (9007199254740991)
key: bigint;

constructor(key: bigint) {
super();
this.key = key;
}

get primitive() {
return this.key;
}
}

export type Ideable = string | number | bigint | Uint8Array;

const idKeyTypes = new Set(["string", "number", "bigint"]);

const u64Max = 18446744073709551615n;
export const toId = (obj: Ideable): IdKey => {
if (typeof obj === "string") {
return new StringKey(obj);
Expand All @@ -141,11 +157,14 @@ export const toId = (obj: Ideable): IdKey => {
return new IntegerKey(obj);
}
if (typeof obj === "bigint") {
if (obj <= Number.MAX_SAFE_INTEGER && obj >= 0) {
return new IntegerKey(Number(obj));
if (obj <= u64Max && obj >= 0n) {
return new LargeIntegerKey(obj);
}
throw new Error(
"BigInt is not less than 2^53. Max value is 9007199254740991",
"BigInt is not less than 2^64 - 1. Max value is " +
(2 ** 64 - 1) +
". Provided value: " +
obj,
);
}
if (obj instanceof Uint8Array) {
Expand Down
6 changes: 3 additions & 3 deletions packages/utils/indexer/simple/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ export class HashmapIndex<T extends Record<string, any>, NestedType = any>
}

iterate<S extends types.Shape | undefined>(
query: types.IterateOptions,
properties: { shape?: S; reference?: boolean },
query?: types.IterateOptions,
properties?: { shape?: S; reference?: boolean },
): types.IndexIterator<T, S> {
let done: boolean | undefined = undefined;
let queue:
Expand All @@ -205,7 +205,7 @@ export class HashmapIndex<T extends Record<string, any>, NestedType = any>
const indexedDocuments = await this.queryAll(query);
if (indexedDocuments.length > 1) {
// Sort
if (query.sort) {
if (query?.sort) {
const sortArr = Array.isArray(query.sort)
? query.sort
: [query.sort];
Expand Down
5 changes: 4 additions & 1 deletion packages/utils/indexer/simple/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@ import { tests } from "@peerbit/indexer-tests";
import { create } from "../src";

describe("all", () => {
tests(create, "transient", false);
tests(create, "transient", {
shapingSupported: false,
u64SumSupported: true,
});
});
54 changes: 36 additions & 18 deletions packages/utils/indexer/sqlite3/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import {
buildJoin,
convertCountRequestToQuery,
convertDeleteRequestToQuery,
convertFromSQLType,
convertSearchRequestToQuery,
/* getTableName, */
convertSumRequestToQuery,
convertToSQLType,
escapeColumnName,
generateSelectQuery,
getInlineTableFieldName,
Expand Down Expand Up @@ -251,9 +253,13 @@ export class SQLLiteIndex<T extends Record<string, any>>
table,
options?.shape,
);
const sql = `${generateSelectQuery(table, selects)} ${buildJoin(joinMap, true)} where ${this.primaryKeyString} = ? `;
const sql = `${generateSelectQuery(table, selects)} ${buildJoin(joinMap, true)} where ${this.primaryKeyString} = ? limit 1`;
const stmt = await this.properties.db.prepare(sql, sql);
const rows = await stmt.get([id.key]);
const rows = await stmt.get([
table.primaryField?.from?.type
? convertToSQLType(id.key, table.primaryField.from.type)
: id.key,
]);
if (!rows) {
continue;
}
Expand Down Expand Up @@ -324,12 +330,6 @@ export class SQLLiteIndex<T extends Record<string, any>>
): types.IndexIterator<T, S> {
// create a sql statement where the offset and the limit id dynamic and can be updated
// TODO don't use offset but sort and limit 'next' calls by the last value of the sort
let { sql: sqlFetch, bindable } = convertSearchRequestToQuery(
request,
this.tables,
this._rootTables,
options?.shape,
);

/* const totalCountKey = "count"; */
/* const sqlTotalCount = convertCountRequestToQuery(new types.CountRequest({ query: request.query }), this.tables, this.tables.get(this.rootTableName)!)
Expand All @@ -342,11 +342,25 @@ export class SQLLiteIndex<T extends Record<string, any>>

let stmt: Statement;
let kept: number | undefined = undefined;
let bindable: any[] = [];
let sqlFetch: string | undefined = undefined;

/* let totalCount: undefined | number = undefined; */
const fetch = async (amount: number) => {
const fetch = async (amount: number | "all") => {
kept = undefined;
if (!once) {
let { sql, bindable: toBind } = convertSearchRequestToQuery(
request,
this.tables,
this._rootTables,
{
shape: options?.shape,
stable: typeof amount === "number", // if we are to fetch all, we dont need stable sorting
},
);
sqlFetch = sql;
bindable = toBind;

stmt = await this.properties.db.prepare(sqlFetch, sqlFetch);
// stmt.reset?.(); // TODO dont invoke reset if not needed
/* countStmt.reset?.(); */
Expand All @@ -360,12 +374,11 @@ export class SQLLiteIndex<T extends Record<string, any>>
}

once = true;
const offsetStart = offset;

const allResults: Record<string, any>[] = await stmt.all([
...bindable,
amount,
offsetStart,
amount === "all" ? Number.MAX_SAFE_INTEGER : amount,
offset,
]);

let results: IndexedResult<types.ReturnTypeFromShape<T, S>>[] =
Expand All @@ -389,9 +402,12 @@ export class SQLLiteIndex<T extends Record<string, any>>
return {
value,
id: types.toId(
row[
getTablePrefixedField(selectedTable, this.primaryKeyString)
],
convertFromSQLType(
row[
getTablePrefixedField(selectedTable, this.primaryKeyString)
],
selectedTable.primaryField!.from!.type,
),
),
};
}),
Expand All @@ -410,7 +426,7 @@ export class SQLLiteIndex<T extends Record<string, any>>
iterator.kept = 0;
} */

if (results.length < amount) {
if (amount === "all" || results.length < amount) {
hasMore = false;
await this.clearupIterator(requestId);
clearTimeout(iterator.timeout);
Expand Down Expand Up @@ -548,10 +564,12 @@ export class SQLLiteIndex<T extends Record<string, any>>
const stmt = await this.properties.db.prepare(sql, sql);
const result = await stmt.get(bindable);
if (result != null) {
const value = result.sum as number;

if (ret == null) {
(ret as any) = result.sum as number;
ret = value;
} else {
(ret as any) += result.sum as number;
ret += value;
}
once = true;
}
Expand Down
Loading
Loading