Commit
feat: mostly just duplicate audit logs and other clickhouse stuff (#2134)

* feat: mostly just duplicate audit logs and other clickhouse stuff

* chore: audit logs for trpc
chronark authored and harshsbhat committed Sep 28, 2024
1 parent a1528d1 commit 99cd54d
Showing 124 changed files with 3,818 additions and 3,852 deletions.
4 changes: 0 additions & 4 deletions apps/agent/Taskfile.yml
@@ -25,10 +25,6 @@ tasks:
     cmds:
       - golangci-lint run
 
-  migrate:
-    cmds:
-      - goose -dir=./pkg/clickhouse/schema clickhouse "tcp://127.0.0.1:9000" up
-
   generate:
     cmds:
       - go get github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen
10 changes: 5 additions & 5 deletions apps/agent/pkg/clickhouse/schema/000_README.md
@@ -36,8 +36,8 @@ Examples:
 ### Aggregation Suffixes
 
 For aggregated or summary tables, use suffixes like:
-- `_daily`
-- `_monthly`
+- `_per_day`
+- `_per_month`
 - `_summary`
 
 ## Materialized View Naming Convention
@@ -54,19 +54,19 @@ Format: `mv_[description]_[aggregation]`
    `raw_sales_transactions_v1`
 
 2. Materialized View:
-   `mv_active_users_daily_v2`
+   `mv_active_users_per_day_v2`
 
 3. Temporary Table:
    `tmp_andreas_user_analysis_v1`
 
 4. Aggregated Table:
-   `mv_sales_summary_daily_v1`
+   `mv_sales_summary_per_hour_v1`
 
 ## Consistency Across Related Objects
 
 Maintain consistent naming across related tables, views, and other objects:
 
 - `raw_user_activity_v1`
-- `mv_user_activity_daily_v1`
+- `mv_user_activity_per_day_v1`
 
 By following these conventions, we ensure a clear, consistent, and scalable naming structure for our ClickHouse setup.
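
To make the "Consistency Across Related Objects" rule concrete, a minimal illustrative pair following the convention might look like the sketch below; the columns and engine choices are invented for the example and are not part of this commit:

-- Illustrative sketch only (hypothetical columns): a raw table and the
-- per-day materialized view that aggregates it, named per the convention.
CREATE TABLE default.raw_user_activity_v1 (
  user_id String,
  -- unix milli, like the other raw tables in this commit
  time Int64
)
ENGINE = MergeTree()
ORDER BY (user_id, time);

CREATE MATERIALIZED VIEW default.mv_user_activity_per_day_v1
ENGINE = AggregatingMergeTree()
ORDER BY (user_id, time) AS
SELECT
  user_id,
  toStartOfDay(fromUnixTimestamp64Milli(time)) AS time,
  countState() AS events
FROM default.raw_user_activity_v1
GROUP BY user_id, time;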
@@ -0,0 +1,19 @@
+-- +goose up
+CREATE TABLE default.raw_telemetry_sdks_v1(
+    -- the api request id, so we can correlate the telemetry with traces and logs
+    request_id String,
+
+    -- unix milli
+    time Int64,
+
+    -- ie: node@20
+    runtime String,
+    -- ie: vercel
+    platform String,
+
+    -- ie: [ "@unkey/api@1.2.3", "@unkey/ratelimit@4.5.6" ]
+    versions Array(String)
+)
+ENGINE = MergeTree()
+ORDER BY (request_id, time)
+;
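
Since `versions` is an `Array(String)`, aggregating per SDK means exploding the array first; a hypothetical read-side sketch (not part of this commit) could look like:

-- Hypothetical query: ARRAY JOIN yields one row per SDK package/version,
-- counting requests over the last day. `time` is unix milliseconds.
SELECT
  sdk,
  count() AS requests
FROM default.raw_telemetry_sdks_v1
ARRAY JOIN versions AS sdk
WHERE time >= toUnixTimestamp64Milli(now64() - INTERVAL 1 DAY)
GROUP BY sdk
ORDER BY requests DESC;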
@@ -0,0 +1,14 @@
+-- +goose up
+CREATE TABLE default.key_verifications_per_day_v1
+(
+    time DateTime,
+    workspace_id String,
+    key_space_id String,
+    identity_id String,
+    key_id String,
+    outcome LowCardinality(String),
+    count AggregateFunction(count, UInt64)
+)
+ENGINE = AggregatingMergeTree()
+ORDER BY (workspace_id, key_space_id, time, identity_id, key_id)
+;
@@ -0,0 +1,19 @@
+-- +goose up
+CREATE MATERIALIZED VIEW default.mv_key_verifications_per_day_v1 TO default.key_verifications_per_day_v1 AS
+SELECT
+    workspace_id,
+    key_space_id,
+    identity_id,
+    key_id,
+    outcome,
+    countState() as count,
+    toStartOfDay(fromUnixTimestamp64Milli(time)) AS time
+FROM default.raw_key_verifications_v1
+GROUP BY
+    workspace_id,
+    key_space_id,
+    identity_id,
+    key_id,
+    outcome,
+    time
+;
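
A note for readers of these two schema files: `AggregatingMergeTree` stores partial aggregate states rather than finished numbers, so the `countState()` written by the materialized view has to be finalized with `countMerge()` at query time. A hypothetical read-side sketch (the workspace id is a placeholder):

-- Hypothetical query: countMerge() finalizes the partial states written
-- by countState() in mv_key_verifications_per_day_v1; reading the raw
-- `count` column directly would yield opaque state blobs, not numbers.
SELECT
  time,
  outcome,
  countMerge(count) AS verifications
FROM default.key_verifications_per_day_v1
WHERE workspace_id = 'ws_example'  -- placeholder id
  AND time >= toStartOfDay(now() - INTERVAL 30 DAY)
GROUP BY time, outcome
ORDER BY time;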
23 changes: 19 additions & 4 deletions apps/api/src/pkg/analytics.ts
@@ -41,6 +41,19 @@ export class Analytics {
     this.clickhouse = opts.clickhouse ? new ch.Client({ url: opts.clickhouse.url }) : new ch.Noop();
   }
 
+  public get insertSdkTelemetry() {
+    return this.clickhouse.insert({
+      table: "default.raw_telemetry_sdks_v1",
+      schema: z.object({
+        request_id: z.string(),
+        time: z.number().int(),
+        runtime: z.string(),
+        platform: z.string(),
+        versions: z.array(z.string()),
+      }),
+    });
+  }
+  //tinybird, to be removed
   public get ingestSdkTelemetry() {
     return this.writeClient.buildIngestEndpoint({
       datasource: "sdk_telemetry__v1",
@@ -54,7 +67,8 @@
     });
   }
 
-  public ingestUnkeyAuditLogs(logs: MaybeArray<UnkeyAuditLog>) {
+  //tinybird
+  public ingestUnkeyAuditLogsTinybird(logs: MaybeArray<UnkeyAuditLog>) {
     return this.writeClient.buildIngestEndpoint({
       datasource: "audit_logs__v2",
       event: auditLogSchemaV1
@@ -78,7 +92,8 @@
     })(logs);
   }
 
-  public get ingestGenericAuditLogs() {
+  //tinybird
+  public get ingestGenericAuditLogsTinybird() {
     return this.writeClient.buildIngestEndpoint({
       datasource: "audit_logs__v2",
       event: auditLogSchemaV1.transform((l) => ({
@@ -92,7 +107,7 @@
       })),
     });
   }
-
+  //tinybird
   public get ingestRatelimit() {
     return this.writeClient.buildIngestEndpoint({
       datasource: "ratelimits__v2",
@@ -146,7 +161,7 @@
       }),
     });
   }
-
+  // replaced by insertKeyVerification
   public get ingestKeyVerification() {
     return this.writeClient.buildIngestEndpoint({
       datasource: "key_verifications__v2",
106 changes: 106 additions & 0 deletions apps/api/src/pkg/audit.ts
@@ -0,0 +1,106 @@
+import type { Context } from "@/pkg/hono/app";
+import { auditLogSchemaV1, unkeyAuditLogEvents } from "@unkey/schema/src/auditlog";
+import { type Transaction, schema } from "./db";
+
+import { newId } from "@unkey/id";
+import { z } from "zod";
+import type { UnkeyAuditLog } from "./analytics";
+
+export async function insertUnkeyAuditLog(
+  c: Context,
+  tx: Transaction | undefined,
+  auditLogs: UnkeyAuditLog | Array<UnkeyAuditLog>,
+): Promise<void> {
+  const schema = auditLogSchemaV1.merge(
+    z.object({
+      event: unkeyAuditLogEvents,
+      auditLogId: z.string().default(newId("auditLog")),
+      bucket: z.string().default("unkey_mutations"),
+      time: z.number().default(Date.now()),
+    }),
+  );
+
+  const arr = Array.isArray(auditLogs) ? auditLogs : [auditLogs];
+  return insertGenericAuditLogs(
+    c,
+    tx,
+    arr.map((l) => schema.parse(l)),
+  );
+}
+
+export async function insertGenericAuditLogs(
+  c: Context,
+  tx: Transaction | undefined,
+  auditLogs: z.infer<typeof auditLogSchemaV1> | z.infer<typeof auditLogSchemaV1>[],
+): Promise<void> {
+  const arr = Array.isArray(auditLogs) ? auditLogs : [auditLogs];
+
+  if (arr.length === 0) {
+    return;
+  }
+
+  const { cache, logger, db } = c.get("services");
+
+  for (const log of arr) {
+    const { val: bucket, err } = await cache.auditLogBucketByWorkspaceIdAndName.swr(
+      [log.workspaceId, log.bucket].join(":"),
+      async () => {
+        const bucket = await (tx ?? db.primary).query.auditLogBucket.findFirst({
+          where: (table, { eq, and }) =>
+            and(eq(table.workspaceId, log.workspaceId), eq(table.name, log.bucket)),
+        });
+        if (!bucket) {
+          return undefined;
+        }
+        return {
+          id: bucket.id,
+        };
+      },
+    );
+    if (err) {
+      logger.error("Could not find audit log bucket for workspace", {
+        workspaceId: log.workspaceId,
+        error: err.message,
+      });
+      continue;
+    }
+
+    if (!bucket) {
+      logger.error("Could not find audit log bucket for workspace", {
+        workspaceId: log.workspaceId,
+      });
+      continue;
+    }
+
+    const auditLogId = newId("auditLog");
+    await (tx ?? db.primary).insert(schema.auditLog).values({
+      id: auditLogId,
+      workspaceId: log.workspaceId,
+      bucketId: bucket.id,
+      event: log.event,
+      time: log.time,
+
+      display: log.description ?? "",
+
+      remoteIp: log.context?.location,
+
+      userAgent: log.context?.userAgent,
+      actorType: log.actor.type,
+      actorId: log.actor.id,
+      actorName: log.actor.name,
+      actorMeta: log.actor.meta,
+    });
+    await (tx ?? db.primary).insert(schema.auditLogTarget).values(
+      log.resources.map((r) => ({
+        workspaceId: log.workspaceId,
+        bucketId: bucket.id,
+        auditLogId,
+        displayName: r.name ?? "",
+        type: r.type,
+        id: r.id,
+        name: r.name,
+        meta: r.meta,
+      })),
+    );
+  }
+}
3 changes: 3 additions & 0 deletions apps/api/src/pkg/cache/index.ts
@@ -71,6 +71,9 @@ export function initCache(c: Context<HonoEnv>, metrics: Metrics): C<CacheNamespa
       c.executionCtx,
       defaultOpts,
     ),
+    auditLogBucketByWorkspaceIdAndName: new Namespace<
+      CacheNamespaces["auditLogBucketByWorkspaceIdAndName"]
+    >(c.executionCtx, defaultOpts),
   });
 }
 
4 changes: 4 additions & 0 deletions apps/api/src/pkg/cache/namespaces.ts
@@ -64,6 +64,10 @@ export type CacheNamespaces = {
     total: number;
   };
   identityByExternalId: Identity | null;
+  // uses a compound key of [workspaceId, name]
+  auditLogBucketByWorkspaceIdAndName: {
+    id: string;
+  };
 };
 
 export type CacheNamespace = keyof CacheNamespaces;
3 changes: 1 addition & 2 deletions apps/api/src/pkg/db.ts
@@ -1,8 +1,7 @@
 import { Client } from "@planetscale/database";
-import { type PlanetScaleDatabase, drizzle, schema } from "@unkey/db";
+import { type Database, drizzle, schema } from "@unkey/db";
 import type { Logger } from "@unkey/worker-logging";
 import { instrumentedFetch } from "./util/instrument-fetch";
-export type Database = PlanetScaleDatabase<typeof schema>;
 
 type ConnectionOptions = {
   host: string;
6 changes: 3 additions & 3 deletions apps/api/src/pkg/key_migration/handler.ts
@@ -104,7 +104,7 @@
       environment: message.environment,
     });
 
-    await analytics.ingestUnkeyAuditLogs({
+    await analytics.ingestUnkeyAuditLogsTinybird({
       workspaceId: message.workspaceId,
      event: "key.create",
       actor: {
@@ -149,7 +149,7 @@
 
     await tx.insert(schema.keysRoles).values(roleConnections);
 
-    await analytics.ingestUnkeyAuditLogs(
+    await analytics.ingestUnkeyAuditLogsTinybird(
       roleConnections.map((rc) => ({
         workspaceId: message.workspaceId,
         actor: { type: "key", id: message.rootKeyId },
@@ -184,7 +184,7 @@
 
     await tx.insert(schema.keysPermissions).values(permissionConnections);
 
-    await analytics.ingestUnkeyAuditLogs(
+    await analytics.ingestUnkeyAuditLogsTinybird(
       permissionConnections.map((pc) => ({
         workspaceId: message.workspaceId,
         actor: { type: "key", id: message.rootKeyId },
18 changes: 17 additions & 1 deletion apps/api/src/pkg/middleware/metrics.ts
@@ -53,7 +53,7 @@
 
       c.executionCtx.waitUntil(
         analytics.ingestSdkTelemetry(event).catch((err) => {
-          logger.error("Error ingesting SDK telemetry", {
+          logger.error("Error ingesting SDK telemetry into tinybird", {
             method: c.req.method,
             path: c.req.path,
             error: err.message,
@@ -62,6 +62,22 @@
           });
         }),
       );
+      c.executionCtx.waitUntil(
+        analytics
+          .insertSdkTelemetry({
+            ...event,
+            request_id: event.requestId,
+          })
+          .catch((err) => {
+            logger.error("Error inserting SDK telemetry", {
+              method: c.req.method,
+              path: c.req.path,
+              error: err.message,
+              telemetry,
+              event,
+            });
+          }),
+      );
     }
 
     await next();
24 changes: 23 additions & 1 deletion apps/api/src/routes/legacy_keys_createKey.ts
@@ -1,6 +1,7 @@
 import type { App } from "@/pkg/hono/app";
 import { createRoute, z } from "@hono/zod-openapi";
 
+import { insertUnkeyAuditLog } from "@/pkg/audit";
 import { rootKeyAuth } from "@/pkg/auth/root_key";
 import { UnkeyApiError, openApiErrorResponses } from "@/pkg/errors";
 import { schema } from "@unkey/db";
@@ -227,7 +228,28 @@ export const registerLegacyKeysCreate = (app: App) =>
         deletedAt: null,
       });
 
-      await analytics.ingestUnkeyAuditLogs({
+      await analytics.ingestUnkeyAuditLogsTinybird({
+        workspaceId: authorizedWorkspaceId,
+        actor: { type: "key", id: rootKeyId },
+        event: "key.create",
+        description: `Created ${keyId}`,
+        resources: [
+          {
+            type: "key",
+            id: keyId,
+          },
+          {
+            type: "keyAuth",
+            id: api.keyAuthId!,
+          },
+          { type: "api", id: api.id },
+        ],
+        context: {
+          location: c.get("location"),
+          userAgent: c.get("userAgent"),
+        },
+      });
+      await insertUnkeyAuditLog(c, tx, {
         workspaceId: authorizedWorkspaceId,
         actor: { type: "key", id: rootKeyId },
         event: "key.create",
(Remaining changed files not shown.)