Skip to content

Commit

Permalink
Add connectors temporal namespace to front production checks
Browse files Browse the repository at this point in the history
  • Loading branch information
flvndvd committed Jan 22, 2024
1 parent d47712e commit 65f6f46
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
46 changes: 29 additions & 17 deletions front/lib/temporal.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
import type { ConnectionOptions } from "@temporalio/client";
import { Client, Connection } from "@temporalio/client";
import { NativeConnection } from "@temporalio/worker";
import fs from "fs-extra";

type TemporalNamespaces = "connectors" | "front";
const temporalWorkspaceToEnvVar: Record<TemporalNamespaces, string> = {
connectors: "TEMPORAL_CONNECTORS_NAMESPACE",
front: "TEMPORAL_NAMESPACE",
};

// This is a singleton connection to the Temporal server.
let TEMPORAL_CLIENT: Client | undefined;
const TEMPORAL_CLIENTS: Partial<Record<TemporalNamespaces, Client>> = {};

export async function getTemporalClient() {
if (TEMPORAL_CLIENT) {
return TEMPORAL_CLIENT;
export async function getTemporalClientForNamespace(
namespace: TemporalNamespaces
) {
const cachedClient = TEMPORAL_CLIENTS[namespace];
if (cachedClient) {
return cachedClient;
}
const connectionOptions = await getConnectionOptions();
const envVarForTemporalNamespace = temporalWorkspaceToEnvVar[namespace];
const connectionOptions = await getConnectionOptions(
envVarForTemporalNamespace
);
const connection = await Connection.connect(connectionOptions);
const client = new Client({
connection,
namespace: process.env.TEMPORAL_NAMESPACE,
});
TEMPORAL_CLIENT = client;
TEMPORAL_CLIENTS[namespace] = client;

return client;
}

async function getConnectionOptions(): Promise<
async function getConnectionOptions(
envVarForTemporalNamespace: string
): Promise<
| {
address: string;
tls: ConnectionOptions["tls"];
Expand All @@ -35,8 +48,8 @@ async function getConnectionOptions(): Promise<
return {};
}

const { TEMPORAL_CERT_PATH, TEMPORAL_CERT_KEY_PATH, TEMPORAL_NAMESPACE } =
process.env;
const { TEMPORAL_CERT_PATH, TEMPORAL_CERT_KEY_PATH } = process.env;
const TEMPORAL_NAMESPACE = process.env[envVarForTemporalNamespace];
if (!TEMPORAL_CERT_PATH || !TEMPORAL_CERT_KEY_PATH || !TEMPORAL_NAMESPACE) {
throw new Error(
"TEMPORAL_CERT_PATH, TEMPORAL_CERT_KEY_PATH and TEMPORAL_NAMESPACE are required " +
Expand All @@ -58,11 +71,10 @@ async function getConnectionOptions(): Promise<
};
}

export async function getTemporalWorkerConnection(): Promise<{
connection: NativeConnection;
namespace: string | undefined;
}> {
const connectionOptions = await getConnectionOptions();
const connection = await NativeConnection.connect(connectionOptions);
return { connection, namespace: process.env.TEMPORAL_NAMESPACE };
export async function getTemporalClient() {
return getTemporalClientForNamespace("front");
}

export async function getTemporalConnectorsNamespaceConnection() {
return getTemporalClientForNamespace("connectors");
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Client, WorkflowHandle } from "@temporalio/client";
import { QueryTypes } from "sequelize";

import { getTemporalClient } from "@app/lib/temporal";
import { getTemporalConnectorsNamespaceConnection } from "@app/lib/temporal";
import { getConnectorReplicaDbConnection } from "@app/production_checks/lib/utils";
import type { CheckFunction } from "@app/production_checks/types/check";

Expand Down Expand Up @@ -58,15 +58,15 @@ export const checkNotionActiveWorkflows: CheckFunction = async (
) => {
const notionConnectors = await listAllNotionConnectors();

const client = await getTemporalClient();
const client = await getTemporalConnectorsNamespaceConnection();

logger.info(`Found ${notionConnectors.length} Notion connectors.`);

const missingActiveWorkflows: any[] = [];
for (const notionConnector of notionConnectors) {
heartbeat();

const isActive = isTemporalWorkflowRunning(client, notionConnector);
const isActive = await isTemporalWorkflowRunning(client, notionConnector);
if (!isActive) {
missingActiveWorkflows.push({
connectorId: notionConnector.id,
Expand Down

0 comments on commit 65f6f46

Please sign in to comment.