Skip to content

Commit

Permalink
Put it in a dedicated worker
Browse files Browse the repository at this point in the history
  • Loading branch information
PopDaph committed Jan 16, 2025
1 parent 112684b commit 2ee1d3a
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 139 deletions.
5 changes: 4 additions & 1 deletion front/start_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { hideBin } from "yargs/helpers";

import logger from "@app/logger/logger";
import { runPokeWorker } from "@app/poke/temporal/worker";
import { runDataRetentionWorker } from "@app/temporal/data_retention/worker";
import { runHardDeleteWorker } from "@app/temporal/hard_delete/worker";
import { runLabsWorker } from "@app/temporal/labs/worker";
import { runMentionsCountWorker } from "@app/temporal/mentions_count_queue/worker";
Expand Down Expand Up @@ -32,7 +33,8 @@ type WorkerName =
| "scrub_workspace_queue"
| "update_workspace_usage"
| "upsert_queue"
| "upsert_table_queue";
| "upsert_table_queue"
| "data_retention";

const workerFunctions: Record<WorkerName, () => Promise<void>> = {
hard_delete: runHardDeleteWorker,
Expand All @@ -47,6 +49,7 @@ const workerFunctions: Record<WorkerName, () => Promise<void>> = {
update_workspace_usage: runUpdateWorkspaceUsageWorker,
upsert_queue: runUpsertQueueWorker,
upsert_table_queue: runUpsertTableQueueWorker,
data_retention: runDataRetentionWorker,
};
// Names of every worker known to this launcher, taken from the keys of `workerFunctions`.
const ALL_WORKERS = Object.keys(workerFunctions);

Expand Down
81 changes: 81 additions & 0 deletions front/temporal/data_retention/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import _ from "lodash";
import { Op } from "sequelize";

import { destroyConversation } from "@app/lib/api/assistant/conversation/destroy";
import { Authenticator } from "@app/lib/auth";
import { Conversation } from "@app/lib/models/assistant/conversation";
import { Workspace } from "@app/lib/models/workspace";
import logger from "@app/logger/logger";

/**
 * Lists the workspaces that have a conversations retention policy, i.e. whose
 * `conversationsRetentionDays` column is not null.
 *
 * @returns The `id` of every matching workspace.
 */
export async function getWorkspacesWithConversationsRetentionActivity(): Promise<
  number[]
> {
  // Only the `id` column is selected since nothing else is returned.
  const rows = await Workspace.findAll({
    attributes: ["id"],
    where: {
      conversationsRetentionDays: { [Op.not]: null },
    },
  });

  const ids: number[] = [];
  for (const row of rows) {
    ids.push(row.id);
  }
  return ids;
}

/**
 * Purge expired conversations for the given workspaces.
 *
 * Workspaces are processed one after the other. For each workspace, every conversation whose
 * `updatedAt` is older than the workspace's `conversationsRetentionDays` is destroyed.
 * Conversations are destroyed in small concurrent chunks to avoid hitting the database with too
 * many queries at once.
 *
 * Workspaces that cannot be found, or that have no retention days set, are logged and skipped.
 *
 * @param workspaceIds Ids of the workspaces to purge.
 */
export async function purgeConversationsBatchActivity({
  workspaceIds,
}: {
  workspaceIds: number[];
}): Promise<void> {
  // Maximum number of conversations destroyed concurrently.
  const conversationsChunkSize = 4;

  for (const workspaceId of workspaceIds) {
    const workspace = await Workspace.findByPk(workspaceId);
    if (!workspace) {
      logger.error(
        { workspaceId },
        "Workspace with retention policy not found."
      );
      continue;
    }
    if (!workspace.conversationsRetentionDays) {
      logger.error(
        { workspaceId },
        "Workspace with retention policy has no retention days."
      );
      continue;
    }

    // Everything last updated before `now - retentionDays` is considered expired.
    const retentionDays = workspace.conversationsRetentionDays;
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

    const conversations = await Conversation.findAll({
      // Only `sId` is needed to destroy a conversation; avoid loading every column of
      // what may be a large number of rows.
      attributes: ["sId"],
      where: { workspaceId: workspace.id, updatedAt: { [Op.lt]: cutoffDate } },
    });

    logger.info(
      {
        workspaceId,
        retentionDays,
        cutoffDate,
        nbConversations: conversations.length,
      },
      "Purging conversations for workspace."
    );

    const auth = await Authenticator.internalAdminForWorkspace(workspace.sId);

    // Chunks run sequentially; conversations within a chunk are destroyed concurrently.
    const conversationChunks = _.chunk(conversations, conversationsChunkSize);
    for (const conversationChunk of conversationChunks) {
      await Promise.all(
        conversationChunk.map(async (c) => {
          await destroyConversation(auth, { conversationId: c.sId });
        })
      );
    }
  }
}
49 changes: 49 additions & 0 deletions front/temporal/data_retention/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import type { Result } from "@dust-tt/types";
import { Err, Ok } from "@dust-tt/types";
import {
ScheduleAlreadyRunning,
ScheduleOverlapPolicy,
} from "@temporalio/client";

import { getTemporalClient } from "@app/lib/temporal";
import logger from "@app/logger/logger";
import { getPurgeDataRetentionScheduleId } from "@app/temporal/data_retention/utils";
import { purgeDataRetentionWorkflow } from "@app/temporal/data_retention/workflows";

import { QUEUE_NAME } from "./config";

/**
 * Starts the schedule that purges workspaces set up with a retention policy (this only concerns
 * conversations at the moment).
 *
 * The schedule starts `purgeDataRetentionWorkflow` on `QUEUE_NAME` every 24h, with the `SKIP`
 * overlap policy. A schedule that already exists is not treated as a failure.
 *
 * @returns `Ok(undefined)` when the schedule was created or already exists, `Err` with the
 * underlying error otherwise.
 */
export async function launchPurgeDataRetentionSchedule(): Promise<
  Result<undefined, Error>
> {
  const client = await getTemporalClient();
  const scheduleId = getPurgeDataRetentionScheduleId();

  try {
    await client.schedule.create({
      action: {
        type: "startWorkflow",
        workflowType: purgeDataRetentionWorkflow,
        args: [],
        taskQueue: QUEUE_NAME,
      },
      scheduleId,
      policies: {
        overlap: ScheduleOverlapPolicy.SKIP,
      },
      spec: {
        intervals: [{ every: "24h" }],
      },
    });
  } catch (err) {
    // The schedule is already there: nothing to do.
    if (err instanceof ScheduleAlreadyRunning) {
      return new Ok(undefined);
    }

    // Attach the error to the log entry so the failure can actually be diagnosed.
    logger.error({ err }, "Failed to start purge data retention schedule.");

    // `err` is `unknown` at this point; wrap non-Error values instead of casting blindly.
    return new Err(err instanceof Error ? err : new Error(String(err)));
  }

  return new Ok(undefined);
}
3 changes: 3 additions & 0 deletions front/temporal/data_retention/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Version number embedded as the `-v<N>` suffix of the task queue name below.
const QUEUE_VERSION = 1;

// Name of the Temporal task queue for data retention.
export const QUEUE_NAME = `data-retention-queue-v${QUEUE_VERSION}`;
7 changes: 7 additions & 0 deletions front/temporal/data_retention/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
 * Purge data retention helpers.
 */

/**
 * Returns the fixed id of the purge data retention schedule.
 */
export function getPurgeDataRetentionScheduleId(): string {
  const scheduleId = "purge-data-retention-schedule";
  return scheduleId;
}
34 changes: 34 additions & 0 deletions front/temporal/data_retention/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";

import { getTemporalWorkerConnection } from "@app/lib/temporal";
import { ActivityInboundLogInterceptor } from "@app/lib/temporal_monitoring";
import logger from "@app/logger/logger";
// The worker must register the data retention activities (the ones proxied by `./workflows`),
// not the hard delete ones.
import * as activities from "@app/temporal/data_retention/activities";
import { launchPurgeDataRetentionSchedule } from "@app/temporal/data_retention/client";

import { QUEUE_NAME } from "./config";

/**
 * Creates and runs the Temporal worker dedicated to data retention.
 *
 * The worker polls `QUEUE_NAME`, loads its workflows from `./workflows` and registers the data
 * retention activities. Before it starts polling, the purge data retention schedule is launched.
 *
 * The returned promise settles when `worker.run()` does.
 */
export async function runDataRetentionWorker() {
  const { connection, namespace } = await getTemporalWorkerConnection();
  const worker = await Worker.create({
    workflowsPath: require.resolve("./workflows"),
    activities,
    taskQueue: QUEUE_NAME,
    maxConcurrentActivityTaskExecutions: 32,
    connection,
    namespace,
    interceptors: {
      // Wrap every activity execution with the logging interceptor.
      activityInbound: [
        (ctx: Context) => {
          return new ActivityInboundLogInterceptor(ctx, logger);
        },
      ],
    },
  });

  // Start the schedule. Failures are logged inside `launchPurgeDataRetentionSchedule`; the
  // returned Result is not inspected here, so the worker still starts.
  await launchPurgeDataRetentionSchedule();

  await worker.run();
}
22 changes: 22 additions & 0 deletions front/temporal/data_retention/workflows.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { proxyActivities } from "@temporalio/workflow";
import _ from "lodash";

import type * as activities from "@app/temporal/data_retention/activities";

// Workflow-side proxies for the data retention activities. Activities called through these
// proxies are configured with a 15 minute start-to-close timeout.
const {
getWorkspacesWithConversationsRetentionActivity,
purgeConversationsBatchActivity,
} = proxyActivities<typeof activities>({
startToCloseTimeout: "15 minutes",
});

/**
 * Workflow purging data for every workspace that has a conversations retention policy.
 *
 * The workspace ids are fetched once, then handed to the purge activity in groups of 4, one
 * group after the other.
 */
export async function purgeDataRetentionWorkflow(): Promise<void> {
  const allWorkspaceIds =
    await getWorkspacesWithConversationsRetentionActivity();

  // Groups are awaited sequentially: the next activity only starts once the previous one is done.
  for (const batch of _.chunk(allWorkspaceIds, 4)) {
    await purgeConversationsBatchActivity({ workspaceIds: batch });
  }
}
73 changes: 1 addition & 72 deletions front/temporal/hard_delete/activities.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import { Context } from "@temporalio/activity";
import _ from "lodash";
import { Op, QueryTypes, Sequelize } from "sequelize";
import { QueryTypes, Sequelize } from "sequelize";

import { destroyConversation } from "@app/lib/api/assistant/conversation/destroy";
import { Authenticator } from "@app/lib/auth";
import { Conversation } from "@app/lib/models/assistant/conversation";
import { Workspace } from "@app/lib/models/workspace";
import config from "@app/lib/production_checks/config";
import logger from "@app/logger/logger";
import type {
Expand Down Expand Up @@ -124,69 +119,3 @@ async function deleteRunExecutionBatch(
},
});
}

/**
 * Returns the ids of the workspaces whose `conversationsRetentionDays` is not null.
 */
export async function getWorkspacesWithConversationsRetentionActivity(): Promise<
  number[]
> {
  const withRetention = await Workspace.findAll({
    attributes: ["id"],
    where: {
      conversationsRetentionDays: { [Op.not]: null },
    },
  });

  return withRetention.map(({ id }) => id);
}

/**
 * Purge expired conversations for the given workspaces.
 *
 * Workspaces are handled sequentially. For each one, conversations whose `updatedAt` is older
 * than the workspace's `conversationsRetentionDays` are destroyed, a few at a time, to avoid
 * hitting the database with too many queries at once.
 *
 * Workspaces that cannot be found, or that have no retention days set, are logged and skipped.
 *
 * @param workspaceIds Ids of the workspaces to purge.
 */
export async function purgeConversationsBatchActivity({
  workspaceIds,
}: {
  workspaceIds: number[];
}): Promise<void> {
  // Maximum number of conversations destroyed concurrently.
  const conversationsChunkSize = 4;

  for (const workspaceId of workspaceIds) {
    const workspace = await Workspace.findByPk(workspaceId);
    if (!workspace) {
      logger.error(
        { workspaceId },
        "Workspace with retention policy not found."
      );
      continue;
    }
    if (!workspace.conversationsRetentionDays) {
      logger.error(
        { workspaceId },
        "Workspace with retention policy has no retention days."
      );
      continue;
    }

    // Conversations last updated before this date are expired.
    const retentionDays = workspace.conversationsRetentionDays;
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

    const conversations = await Conversation.findAll({
      // Only `sId` is read below; do not load the other columns.
      attributes: ["sId"],
      where: { workspaceId: workspace.id, updatedAt: { [Op.lt]: cutoffDate } },
    });

    logger.info(
      {
        workspaceId,
        retentionDays,
        cutoffDate,
        nbConversations: conversations.length,
      },
      "Purging conversations for workspace."
    );

    const auth = await Authenticator.internalAdminForWorkspace(workspace.sId);

    // One chunk at a time; conversations inside a chunk are destroyed concurrently.
    const conversationChunks = _.chunk(conversations, conversationsChunkSize);
    for (const conversationChunk of conversationChunks) {
      await Promise.all(
        conversationChunk.map(async (c) => {
          await destroyConversation(auth, { conversationId: c.sId });
        })
      );
    }
  }
}
41 changes: 1 addition & 40 deletions front/temporal/hard_delete/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ import {

import { getTemporalClient } from "@app/lib/temporal";
import logger from "@app/logger/logger";
import {
getPurgeDataRetentionScheduleId,
getPurgeRunExecutionsScheduleId,
} from "@app/temporal/hard_delete/utils";
import { getPurgeRunExecutionsScheduleId } from "@app/temporal/hard_delete/utils";
import { purgeRunExecutionsCronWorkflow } from "@app/temporal/hard_delete/workflows";

import { QUEUE_NAME } from "./config";
Expand Down Expand Up @@ -50,39 +47,3 @@ export async function launchPurgeRunExecutionsSchedule(): Promise<

return new Ok(undefined);
}

/**
 * Starts the schedule that purges workspaces set up with a retention policy (this only concerns
 * conversations at the moment).
 *
 * The schedule fires every 24h on `QUEUE_NAME` with the `SKIP` overlap policy. A schedule that
 * already exists is not treated as a failure.
 *
 * @returns `Ok(undefined)` when the schedule was created or already exists, `Err` with the
 * underlying error otherwise.
 */
export async function launchPurgeDataRetentionSchedule(): Promise<
  Result<undefined, Error>
> {
  const client = await getTemporalClient();
  const scheduleId = getPurgeDataRetentionScheduleId();

  try {
    await client.schedule.create({
      action: {
        type: "startWorkflow",
        // NOTE(review): this starts `purgeRunExecutionsCronWorkflow`, i.e. the run executions
        // purge, from a schedule named after data retention. This looks unintended; confirm
        // which workflow this schedule is supposed to start.
        workflowType: purgeRunExecutionsCronWorkflow,
        args: [],
        taskQueue: QUEUE_NAME,
      },
      scheduleId,
      policies: {
        overlap: ScheduleOverlapPolicy.SKIP,
      },
      spec: {
        intervals: [{ every: "24h" }],
      },
    });
  } catch (err) {
    // The schedule is already there: nothing to do.
    if (err instanceof ScheduleAlreadyRunning) {
      return new Ok(undefined);
    }

    // Attach the error to the log entry so the failure can actually be diagnosed.
    logger.error({ err }, "Failed to start purge data retention schedule.");

    // `err` is `unknown` at this point; wrap non-Error values instead of casting blindly.
    return new Err(err instanceof Error ? err : new Error(String(err)));
  }

  return new Ok(undefined);
}
8 changes: 0 additions & 8 deletions front/temporal/hard_delete/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,3 @@ export function getRunExecutionsDeletionCutoffDate(): number {

return cutoffDate.getTime();
}

/**
 * Purge data retention helpers.
 */

/**
 * Gives the constant id under which the purge data retention schedule is created.
 */
export function getPurgeDataRetentionScheduleId(): string {
  const id = "purge-data-retention-schedule";
  return id;
}
Loading

0 comments on commit 2ee1d3a

Please sign in to comment.