Remove unused package, take 2 (#9229)
* Revert "Revert "Remove unused dependencies (#9223)" (#9225)"

This reverts commit 30d241f.

* Fix: use the embedded pg-types instead of a mismatching dep

* Fix: do not use pg-types, but pg.types
Fraggle authored Dec 10, 2024
1 parent bb91bc8 commit 627f745
Showing 7 changed files with 807 additions and 2,066 deletions.
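The two "Fix" bullets in the message above come down to one pattern: register custom type parsers on the copy of pg-types that pg itself loads, not on a separately installed copy that pg never consults. A minimal sketch of the before/after (the Number parser body is illustrative only; the actual code in the diffs below asserts on unsafe integers instead):

// Before (removed): registers the parser on whatever copy of pg-types npm
// resolved, which can be a different instance than the one bundled inside
// pg — in that case the parser is silently never used.
// import types, { builtins } from "pg-types";

// After: grab the types object off pg itself, as the node-pg-types README
// recommends, so the parser lands on the instance pg actually uses.
// eslint-disable-next-line @typescript-eslint/no-var-requires
const types = require("pg").types;

// OID 20 is Postgres BIGINT (INT8); pg returns it as a string by default.
types.setTypeParser(types.builtins.INT8, (val: string) => Number(val));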
9 changes: 7 additions & 2 deletions connectors/src/resources/storage/index.ts
@@ -1,11 +1,16 @@
 import { isDevelopment } from "@dust-tt/types";
 import assert from "assert";
-import types, { builtins } from "pg-types";
 import { Sequelize } from "sequelize";
 
 import logger from "@connectors/logger/logger";
 import { dbConfig } from "@connectors/resources/storage/config";
 
+// Directly require 'pg' here to make sure we are using the same version of
+// pg-types as the one used by the pg package.
+// The docs recommend doing this: https://github.com/brianc/node-pg-types?tab=readme-ov-file#use
+// eslint-disable-next-line @typescript-eslint/no-var-requires
+const types = require("pg").types;
+
 const acquireAttempts = new WeakMap();
 
 const { DB_LOGGING_ENABLED = false } = process.env;
@@ -19,7 +24,7 @@ function sequelizeLogger(message: string)
 // prevents silent precision loss when handling large integers from the database.
 // Throws an assertion error if a BIGINT value exceeds JavaScript's safe integer
 // limits.
-types.setTypeParser(builtins.INT8, function (val) {
+types.setTypeParser(types.builtins.INT8, function (val: unknown) {
   assert(
     Number.isSafeInteger(Number(val)),
     `Found a value stored as a BIGINT that is not a safe integer: ${val}`
9 changes: 7 additions & 2 deletions front/lib/resources/storage/index.ts
@@ -1,11 +1,16 @@
 import { isDevelopment } from "@dust-tt/types";
 import assert from "assert";
-import types, { builtins } from "pg-types";
 import { Sequelize } from "sequelize";
 
 import { dbConfig } from "@app/lib/resources/storage/config";
 import logger from "@app/logger/logger";
 
+// Directly require 'pg' here to make sure we are using the same version of
+// pg-types as the one used by the pg package.
+// The docs recommend doing this: https://github.com/brianc/node-pg-types?tab=readme-ov-file#use
+// eslint-disable-next-line @typescript-eslint/no-var-requires
+const types = require("pg").types;
+
 const acquireAttempts = new WeakMap();
 
 const { DB_LOGGING_ENABLED = false } = process.env;
@@ -19,7 +24,7 @@ function sequelizeLogger(message: string)
 // prevents silent precision loss when handling large integers from the database.
 // Throws an assertion error if a BIGINT value exceeds JavaScript's safe integer
 // limits.
-types.setTypeParser(builtins.INT8, function (val) {
+types.setTypeParser(types.builtins.INT8, function (val: unknown) {
   assert(
     Number.isSafeInteger(Number(val)),
     `Found a value stored as a BIGINT that is not a safe integer: ${val}`
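Both files guard the same failure mode: JavaScript numbers lose integer precision above Number.MAX_SAFE_INTEGER (2^53 - 1), so a BIGINT coming back from Postgres can silently round. A quick illustration of the behavior the assertion catches (plain Node, values chosen for the example):

// 2^53 - 1 is the largest integer Number can represent exactly.
console.log(Number.isSafeInteger(Number("9007199254740991"))); // true
// 2^53 + 1 rounds to 2^53 — precision is already gone...
console.log(Number("9007199254740993")); // prints 9007199254740992
// ...and this is exactly the case the parser's assert() rejects.
console.log(Number.isSafeInteger(Number("9007199254740993"))); // false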
134 changes: 67 additions & 67 deletions front/migrations/20230614_timetttamp.ts
@@ -1,73 +1,73 @@
-/* Following a typo on points payload in our Qdrant database in which the field
-time*st*amp was written time*t*amp This migration creates a new field for every
-point, correctly named time*st*amp and with the value of the existing time*t*amp
-field*/
-import { QdrantClient } from "@qdrant/js-client-rest";
+// /* Following a typo on points payload in our Qdrant database in which the field
+// time*st*amp was written time*t*amp This migration creates a new field for every
+// point, correctly named time*st*amp and with the value of the existing time*t*amp
+// field*/
+// import { QdrantClient } from "@qdrant/js-client-rest";
 
-// if reusing, set env vars to those values, don't hardcode the values in the code
-// also as a safety delete the env vars (if persisted one way or another) after the migration
-// !! Since the client used is the REST client, ensure the port in QDRANT URL is to the REST API (not gRPC), usually 6333 (not 6334)
-const { QDRANT_URL, QDRANT_API_KEY } = process.env;
+// // if reusing, set env vars to those values, don't hardcode the values in the code
+// // also as a safety delete the env vars (if persisted one way or another) after the migration
+// // !! Since the client used is the REST client, ensure the port in QDRANT URL is to the REST API (not gRPC), usually 6333 (not 6334)
+// const { QDRANT_URL, QDRANT_API_KEY } = process.env;
 
-const prodClient = new QdrantClient({
-  url: QDRANT_URL,
-  apiKey: QDRANT_API_KEY,
-});
+// const prodClient = new QdrantClient({
+//   url: QDRANT_URL,
+//   apiKey: QDRANT_API_KEY,
+// });
 
-async function migrateCollection(
-  client: QdrantClient,
-  collectionName: string,
-  batchSize = 250,
-  offset?: string | number | undefined | null | Record<string, unknown>
-) {
-  const currentTime = Date.now();
-  let updated = 0;
-  const nb_points = await client.count(collectionName);
-  console.log(
-    "Migrating Collection ",
-    collectionName,
-    "\nNumber of points: ",
-    nb_points.count
-  );
-  // scroll points excluding those with timestamp field until none are left
-  do {
-    console.log("Current offset: ", offset);
-    const { points, next_page_offset } = await client.scroll(collectionName, {
-      limit: batchSize,
-      offset,
-      with_vector: false,
-      filter: {
-        must: [{ is_empty: { key: "timestamp" } }],
-      },
-    });
+// async function migrateCollection(
+//   client: QdrantClient,
+//   collectionName: string,
+//   batchSize = 250,
+//   offset?: string | number | undefined | null | Record<string, unknown>
+// ) {
+//   const currentTime = Date.now();
+//   let updated = 0;
+//   const nb_points = await client.count(collectionName);
+//   console.log(
+//     "Migrating Collection ",
+//     collectionName,
+//     "\nNumber of points: ",
+//     nb_points.count
+//   );
+//   // scroll points excluding those with timestamp field until none are left
+//   do {
+//     console.log("Current offset: ", offset);
+//     const { points, next_page_offset } = await client.scroll(collectionName, {
+//       limit: batchSize,
+//       offset,
+//       with_vector: false,
+//       filter: {
+//         must: [{ is_empty: { key: "timestamp" } }],
+//       },
+//     });
 
-    // update the points
-    const updatePromises = points.map(async (point) => {
-      const payload = { timestamp: point.payload?.timetamp };
-      await client.setPayload(collectionName, { payload, points: [point.id] });
-    });
-    // wait for all points of the batch to be updated to avoid flooding the server
-    await Promise.all(updatePromises);
-    offset = next_page_offset;
-    updated += points.length;
-  } while (offset !== null);
-  console.log(
-    "Updated points: ",
-    updated,
-    "\nTime: ",
-    Date.now() - currentTime
-  );
-}
+//     // update the points
+//     const updatePromises = points.map(async (point) => {
+//       const payload = { timestamp: point.payload?.timetamp };
+//       await client.setPayload(collectionName, { payload, points: [point.id] });
+//     });
+//     // wait for all points of the batch to be updated to avoid flooding the server
+//     await Promise.all(updatePromises);
+//     offset = next_page_offset;
+//     updated += points.length;
+//   } while (offset !== null);
+//   console.log(
+//     "Updated points: ",
+//     updated,
+//     "\nTime: ",
+//     Date.now() - currentTime
+//   );
+// }
 
-/* Migrate all collections
- * Points that do not need an update are ignored
- * Therefore can be safely restarted if it fails
- */
-async function migrateCollections(client: QdrantClient) {
-  const result = await client.getCollections();
-  for (const collection of result.collections) {
-    await migrateCollection(client, collection.name);
-  }
-}
+// /* Migrate all collections
+//  * Points that do not need an update are ignored
+//  * Therefore can be safely restarted if it fails
+//  */
+// async function migrateCollections(client: QdrantClient) {
+//   const result = await client.getCollections();
+//   for (const collection of result.collections) {
+//     await migrateCollection(client, collection.name);
+//   }
+// }
 
-void migrateCollections(prodClient);
+// void migrateCollections(prodClient);
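This migration is kept only as a commented-out record, presumably so the @qdrant/js-client-rest dependency it imports can be dropped by this cleanup. If it ever had to be re-run, one way to verify completion would be to count points still missing the corrected field — a hypothetical sketch, not part of this commit, reusing the migration's own is_empty filter and env vars:

import { QdrantClient } from "@qdrant/js-client-rest";

const client = new QdrantClient({
  url: process.env.QDRANT_URL,
  apiKey: process.env.QDRANT_API_KEY,
});

// Exact count of points that still lack the corrected "timestamp" field;
// zero means the migration has nothing left to do.
async function remainingWithoutTimestamp(collectionName: string) {
  const res = await client.count(collectionName, {
    filter: { must: [{ is_empty: { key: "timestamp" } }] },
    exact: true,
  });
  return res.count;
}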
164 changes: 82 additions & 82 deletions front/migrations/20230926_clean_up_qdrant_null_payloads.ts
@@ -1,94 +1,94 @@
-import { QdrantClient } from "@qdrant/js-client-rest";
+// import { QdrantClient } from "@qdrant/js-client-rest";
 
-const { QDRANT_API_KEY, QDRANT_URL } = process.env;
+// const { QDRANT_API_KEY, QDRANT_URL } = process.env;
 
-const client = new QdrantClient({
-  url: QDRANT_URL,
-  apiKey: QDRANT_API_KEY,
-});
+// const client = new QdrantClient({
+//   url: QDRANT_URL,
+//   apiKey: QDRANT_API_KEY,
+// });
 
-async function cleanup(collections: { name: string }[]) {
-  console.log(`Cleaning up ${collections.length} collections.`);
-  for (const c of collections) {
-    let done = 0;
-    let offset: number | undefined | null | string | Record<string, unknown> =
-      0;
-    while (offset !== null && offset !== undefined) {
-      const res = await client.scroll(c.name, {
-        filter: {
-          must: [
-            {
-              is_empty: {
-                key: "document_id_hash",
-              },
-            },
-          ],
-        },
-        with_payload: true,
-        limit: 1024,
-        offset,
-      });
+// async function cleanup(collections: { name: string }[]) {
+//   console.log(`Cleaning up ${collections.length} collections.`);
+//   for (const c of collections) {
+//     let done = 0;
+//     let offset: number | undefined | null | string | Record<string, unknown> =
+//       0;
+//     while (offset !== null && offset !== undefined) {
+//       const res = await client.scroll(c.name, {
+//         filter: {
+//           must: [
+//             {
+//               is_empty: {
+//                 key: "document_id_hash",
+//               },
+//             },
+//           ],
+//         },
+//         with_payload: true,
+//         limit: 1024,
+//         offset,
+//       });
 
-      const points = res.points.map((p) => p.id);
-      const r = await client.delete(c.name, { wait: true, points });
-      console.log(`Deleted ${points.length} points from ${c.name} res=${r}`);
+//       const points = res.points.map((p) => p.id);
+//       const r = await client.delete(c.name, { wait: true, points });
+//       console.log(`Deleted ${points.length} points from ${c.name} res=${r}`);
 
-      offset = res.next_page_offset;
-      done += res.points.length;
-    }
+//       offset = res.next_page_offset;
+//       done += res.points.length;
+//     }
 
-    const rr = await client.getCollection(c.name);
-    console.log("DONE", done);
-    console.log(rr);
-  }
-}
+//     const rr = await client.getCollection(c.name);
+//     console.log("DONE", done);
+//     console.log(rr);
+//   }
+// }
 
-async function run() {
-  const result = await client.getCollections();
-  const collections = result.collections;
-  // const collections = COLLECTION_NAMES;
+// async function run() {
+//   const result = await client.getCollections();
+//   const collections = result.collections;
+//   // const collections = COLLECTION_NAMES;
 
-  const collectionsWithNulls: { name: string }[] = [];
+//   const collectionsWithNulls: { name: string }[] = [];
 
-  console.log(`Processing ${collections.length} collections.`);
-  let i = 0;
-  for (const c of collections) {
-    let done = 0;
-    let offset: number | undefined | null | string | Record<string, unknown> =
-      0;
-    while (offset !== null && offset !== undefined) {
-      const res = await client.scroll(c.name, {
-        filter: {
-          must: [
-            {
-              is_empty: {
-                key: "document_id_hash",
-              },
-            },
-          ],
-        },
-        with_payload: true,
-        limit: 1024,
-        offset,
-      });
+//   console.log(`Processing ${collections.length} collections.`);
+//   let i = 0;
+//   for (const c of collections) {
+//     let done = 0;
+//     let offset: number | undefined | null | string | Record<string, unknown> =
+//       0;
+//     while (offset !== null && offset !== undefined) {
+//       const res = await client.scroll(c.name, {
+//         filter: {
+//           must: [
+//             {
+//               is_empty: {
+//                 key: "document_id_hash",
+//               },
+//             },
+//           ],
+//         },
+//         with_payload: true,
+//         limit: 1024,
+//         offset,
+//       });
 
-      offset = res.next_page_offset;
-      done += res.points.length;
-    }
-    if (done > 0) {
-      console.log(`NULL_FOUND [${c.name}] found=${done}`);
-      collectionsWithNulls.push(c);
-    }
-    i++;
-    if (i % 32 === 0) {
-      console.log(`PROCESS [${i}/${collections.length}] sleeping 1s`);
-      await new Promise((resolve) => setTimeout(resolve, 1000));
-    }
-  }
+//       offset = res.next_page_offset;
+//       done += res.points.length;
+//     }
+//     if (done > 0) {
+//       console.log(`NULL_FOUND [${c.name}] found=${done}`);
+//       collectionsWithNulls.push(c);
+//     }
+//     i++;
+//     if (i % 32 === 0) {
+//       console.log(`PROCESS [${i}/${collections.length}] sleeping 1s`);
+//       await new Promise((resolve) => setTimeout(resolve, 1000));
+//     }
+//   }
 
-  return collectionsWithNulls;
-}
+//   return collectionsWithNulls;
+// }
 
-void (async () => {
-  await cleanup(await run());
-})();
+// void (async () => {
+//   await cleanup(await run());
+// })();
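Had this cleanup stayed live, the scroll-then-delete-by-id loop in cleanup() could likely be collapsed: Qdrant's delete endpoint also accepts a points selector by filter. A hypothetical one-call variant with the same filter (a sketch, not code from this repository):

import { QdrantClient } from "@qdrant/js-client-rest";

const client = new QdrantClient({
  url: process.env.QDRANT_URL,
  apiKey: process.env.QDRANT_API_KEY,
});

// Delete every point whose payload is missing document_id_hash, letting the
// server apply the filter instead of paging ids through the client.
async function deleteNullPayloadPoints(collectionName: string) {
  await client.delete(collectionName, {
    wait: true,
    filter: { must: [{ is_empty: { key: "document_id_hash" } }] },
  });
}

Deleting by filter moves the batching server-side; the trade-off is losing the per-batch progress logging the original loop printed.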
