Skip to content

Commit

Permalink
Switch on TS strict mode for server
Browse files Browse the repository at this point in the history
Gives proper type inference for Elysia.

Signed-off-by: Brian Evans <ebrian101@gmail.com>
  • Loading branch information
mrbrianevans committed Oct 26, 2024
1 parent d19d918 commit 6557557
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 31 deletions.
5 changes: 3 additions & 2 deletions server/src/api/companyNumber.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { getCompanyNumber } from "./saveCompanyNumber"
import { expect, describe, it } from "bun:test"
import { AnyEvent } from "../types/eventTypes"

describe("get company number from events", () => {
it("should get company number from filing event", () => {
Expand All @@ -16,7 +17,7 @@ describe("get company number from events", () => {
})
})

const sampleEvents = {
const sampleEvents: Record<string, AnyEvent> = {
filings: {
"resource_kind": "filing-history",
"resource_uri": "/company/10620794/filing-history/MzM5ODkyOTYzNmFkaXF6a2N4",
Expand Down Expand Up @@ -133,4 +134,4 @@ const sampleEvents = {
"type": "changed"
}
}
}
} as const
5 changes: 3 additions & 2 deletions server/src/api/maintainSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ Listen on the Redis stream for events, and generate JSON schemas for each event
Keeps updating the schema based on each event, so they stay up to date.
*/

import { redisClient } from "../utils/getRedisClient.js"
import { type RedisClient, redisClient } from "../utils/getRedisClient.js"
import { listenRedisStream } from "./listenRedisStream.js"
import { streamPaths } from "../streams/streamPaths.js"
import { extendSchema, createSchema } from "genson-js"
import { streamFromRedisLogger } from "../utils/loggers.js"
import { AnyEvent } from "../types/eventTypes"

/** When an event arrives, merge it with the existing schema in redis (IF EXISTS) and save new schema. */
export async function updateSchemaForEvent(event, commandClient) {
export async function updateSchemaForEvent(event: AnyEvent, commandClient: RedisClient) {
// schemas are specific to each resource kind
const { resource_kind, data } = event
// schemas are stored as stringified JSON in the `schemas` hash in redis
Expand Down
5 changes: 3 additions & 2 deletions server/src/api/routes/healthCheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import { streamPaths } from "../../streams/streamPaths"

export const healthCheckRouter = (app: Elysia) => app.get("/health", async () => {
const health = { currentWsConnections: 0, connections: app.server?.pendingWebSockets }
const streamsHealth: Record<string, boolean> = {}
for (const streamPath of streamPaths) {
const lastHeartbeat = await redisClient.hGet("heartbeats", streamPath).then(t => new Date(parseInt(t || "0")))
health[streamPath] = Date.now() - lastHeartbeat.getTime() < 60_000 // more than 60 seconds indicates stream offline
streamsHealth[streamPath] = Date.now() - lastHeartbeat.getTime() < 60_000 // more than 60 seconds indicates stream offline
}
health.currentWsConnections = await redisClient.get("currentWsConnections").then(value => value ? parseInt(value) : 0)
return health
return { ...health, ...streamsHealth }
})
22 changes: 12 additions & 10 deletions server/src/api/saveCompanyNumber.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { RedisClient } from "../utils/getRedisClient.js"
import { AnyEvent, CompanyProfileEvent, FilingEvent } from "../types/eventTypes"

// number of random company numbers kept in redis
const MAX_RANDOM_SIZE = 5_000

/** returns true if a company number was added, false if it already existed or couldn't be parsed */
export async function saveCompanyNumber(redis: RedisClient, event, streamPath) {
export async function saveCompanyNumber(redis: RedisClient, event: AnyEvent, streamPath: string) {
const companyNumber = getCompanyNumber(event, streamPath)
if (companyNumber) {
const existed = await redis.sAdd("companyNumbers", companyNumber).then(res => res === 1)
Expand All @@ -14,7 +15,7 @@ export async function saveCompanyNumber(redis: RedisClient, event, streamPath) {
const size = await redis.sCard("companyNumbers")
if (size > MAX_RANDOM_SIZE) {
const remove = await redis.sRandMember("companyNumbers")
await redis.sRem("companyNumbers", remove)
await redis.sRem("companyNumbers", remove ?? "")
}
}
return existed
Expand All @@ -24,36 +25,37 @@ export async function saveCompanyNumber(redis: RedisClient, event, streamPath) {


/** Gets the company number from an event, based on which stream the event was sent on */
export function getCompanyNumber(event, streamPath: string) {
export function getCompanyNumber(event: AnyEvent, streamPath: string) {
switch (streamPath) {
case "companies":
return event.data.company_number
const companyEvent = event as CompanyProfileEvent.CompanyProfileEvent
return companyEvent.data.company_number
case "filings": {
const [, companyNumber] = event.resource_uri.match(/^\/company\/([A-Z0-9]{8})\/filing-history/)
const [, companyNumber] = event.resource_uri.match(/^\/company\/([A-Z0-9]{8})\/filing-history/) ?? []
return companyNumber
}
case "officers": {
const [, companyNumber] = event.resource_uri.match(/^\/company\/([A-Z0-9]{8})\/appointments/)
const [, companyNumber] = event.resource_uri.match(/^\/company\/([A-Z0-9]{8})\/appointments/) ?? []
return companyNumber
}
case "persons-with-significant-control": {
const [, companyNumber] = event.resource_uri.match(
/^\/company\/([A-Z0-9]{6,8})\/persons-with-significant-control/
)
) ?? []
return companyNumber
}
case "charges": {
const [, companyNumber] = event.resource_uri.match(/^\/company\/([A-Z0-9]{8})\/charges/)
const [, companyNumber] = event.resource_uri.match(/^\/company\/([A-Z0-9]{8})\/charges/) ?? []
return companyNumber
}
case "insolvency-cases":
return event.resource_id
case "persons-with-significant-control-statements": {
const [, companyNumber] = event.resource_uri.match(/^\/company\/([A-Z0-9]{8})\//)
const [, companyNumber] = event.resource_uri.match(/^\/company\/([A-Z0-9]{8})\//) ?? []
return companyNumber
}
case "company-exemptions": {
const [, companyNumber] = event.resource_uri.match(/company\/([A-Z0-9]{8})\/exemptions/)
const [, companyNumber] = event.resource_uri.match(/company\/([A-Z0-9]{8})\/exemptions/) ?? []
return companyNumber
}
default:
Expand Down
2 changes: 1 addition & 1 deletion server/src/api/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { redisClient } from "../utils/getRedisClient"
import { setTimeout } from "node:timers/promises"

const app = new Elysia()
.on('error', (err)=>console.error('Error', err.request.url, err.error.message))
.on("error", (err) => console.error("Error", err.request.url, err.error.message))
.use(healthCheckRouter)
.use(miscRouter)
.use(eventHistoryRouter)
Expand Down
18 changes: 10 additions & 8 deletions server/src/chStreamToRedis/streamToRedis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { setTimeout } from "node:timers/promises"
import pino from "pino"
import { streamPaths } from "../streams/streamPaths.js"
import { Transform } from "stream"
import { AnyEvent } from "../types/eventTypes"
import { Readable } from "node:stream"
/*
This file listens to the Companies House long polling streaming API, and when events are received, they are posted
Expand All @@ -18,19 +20,19 @@ streamKeyHolder.addKey(process.env.STREAM_KEY2)

const logger = pino()

const sendEvent = streamPath => event => redisClient.xAdd("events:" + streamPath, event.event.timepoint + "-*", { "event": JSON.stringify(event) }, {
const sendEvent = (streamPath: string) => (event: AnyEvent) => redisClient.xAdd("events:" + streamPath, event.event.timepoint + "-*", { "event": JSON.stringify(event) }, {
TRIM: {
strategy: "MAXLEN",
threshold: 10000,
strategyModifier: "~"
}
})
const incrEventCount = streamPath => event => redisClient.hIncrBy(`counts:${streamPath}:daily`, new Date().toISOString().split("T")[0], 1)
const incrResourceKindCount = streamPath => event => redisClient.hIncrBy(`resourceKinds:${streamPath}`, event.resource_kind, 1)
const updateTimepoint = streamPath => event => redisClient.set("timepoints:"+ streamPath, JSON.stringify(event.event),{EX:86400*7})
const heartbeat = streamPath => () => redisClient.hSet("heartbeats", streamPath, Date.now()) // keeps track of which are alive
const getMostRecentTimepoint = streamPath => redisClient.get("timepoints:"+ streamPath).then(r => r ? JSON.parse(r)?.timepoint : undefined)
const startStream = streamPath => getMostRecentTimepoint(streamPath)
const incrEventCount = (streamPath: string) => (event: AnyEvent) => redisClient.hIncrBy(`counts:${streamPath}:daily`, new Date().toISOString().split("T")[0], 1)
const incrResourceKindCount = (streamPath: string) => (event: AnyEvent) => redisClient.hIncrBy(`resourceKinds:${streamPath}`, event.resource_kind, 1)
const updateTimepoint = (streamPath: string) => (event: AnyEvent) => redisClient.set("timepoints:" + streamPath, JSON.stringify(event.event), { EX: 86400 * 7 })
const heartbeat = (streamPath: string) => () => redisClient.hSet("heartbeats", streamPath, Date.now()) // keeps track of which are alive
const getMostRecentTimepoint = (streamPath: string) => redisClient.get("timepoints:" + streamPath).then(r => r ? JSON.parse(r)?.timepoint : undefined)
const startStream = (streamPath: string): Promise<Readable> => getMostRecentTimepoint(streamPath)
.then((timepoint) => stream(streamPath, timepoint)
.on("data", sendEvent(streamPath))
.on("data", updateTimepoint(streamPath))
Expand All @@ -44,7 +46,7 @@ const startStream = streamPath => getMostRecentTimepoint(streamPath)
.then(() => logger.info({ streamPath }, "Restarting stream, after waiting 60 seconds since disconnected."))
.then(() => startStream(streamPath))))// restart on end

const streams = new Set<Transform>()
const streams = new Set<Readable>()
for (const streamPath of streamPaths) {
streams.add(await startStream(streamPath))
await setTimeout(5000) // space them out 5 seconds
Expand Down
8 changes: 4 additions & 4 deletions server/src/streams/jsonParseStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Transform } from "stream"
import { Transform, TransformCallback, TransformOptions } from "stream"
import { performance } from "perf_hooks"

/**
Expand All @@ -8,7 +8,7 @@ export class CustomJsonParse extends Transform {
private data: string
private readonly addTimestamp: boolean

constructor(options, addTimestamp: boolean = false) {
constructor(options: TransformOptions, addTimestamp: boolean = false) {
super({
...options,
decodeStrings: false, // stops the strings being converted to buffers
Expand All @@ -18,7 +18,7 @@ export class CustomJsonParse extends Transform {
this.addTimestamp = addTimestamp
}

_transform(chunk: Buffer, encoding, callback) {
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback) {
this.emit("heartbeat") // this will emit even if the chunk is just a newline (heartbeat on the stream).
const received = performance.timeOrigin + performance.now() // collect timestamp that event was received
this.data += chunk.toString("utf8")
Expand All @@ -41,7 +41,7 @@ export class CustomJsonParse extends Transform {
callback()
}

_flush(callback) {
_flush(callback: TransformCallback) {
// console.debug("Flush called. This.data=", this.data)
try {
// there may still be unparsed data remaining in this.data
Expand Down
3 changes: 2 additions & 1 deletion server/src/streams/listenOnStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { request } from "https"
import { streamKeyHolder } from "../utils/KeyHolder.js"
import pino from "pino"
import { CustomJsonParse } from "./jsonParseStream.js"
import { Readable } from "node:stream"

export type StreamPath =
| "insolvency-cases"
Expand All @@ -17,7 +18,7 @@ export type StreamPath =
/**
* Returns a readable stream of events. The recommended way of listening to a stream in this application.
*/
export function stream<EventType>(streamPath: StreamPath, startFromTimepoint?: number) {
export function stream<EventType>(streamPath: StreamPath, startFromTimepoint?: number): Readable {
const logger = pino({ base: { streamPath } })
const streamKey = streamKeyHolder.useKey()
const timepointQueryString = typeof startFromTimepoint === "number" ? `?timepoint=${startFromTimepoint}` : ""
Expand Down
25 changes: 25 additions & 0 deletions server/src/types/eventTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,12 +546,37 @@ declare module OfficerEvent {
}
}

declare module ExemptionsEvent {
//TODO: fill out this interface with an auto-generated one based on recent events on the stream
interface ExemptionsEvent {
data: IExemptionsData
event: IExemptionsEvent
resource_id: string
resource_kind: "company-exemptions"
resource_uri: string
}

interface IExemptionsEvent {
timepoint: number
published_at: string
type: string
}

interface IExemptionsData {
kind: "exemptions"

[key: string]: unknown
}
}

type AnyEvent =
CompanyProfileEvent.CompanyProfileEvent
| FilingEvent.FilingEvent
| PscEvent.PscEvent
| InsolvencyEvent.InsolvencyEvent
| ChargesEvent.ChargesEvent
| OfficerEvent.OfficerEvent
| ExemptionsEvent.ExemptionsEvent;
//TODO: add interfaces for other streams that have since been added. eg PSC statements.
export type { CompanyProfileEvent, FilingEvent, PscEvent, InsolvencyEvent, ChargesEvent, OfficerEvent, AnyEvent }

2 changes: 1 addition & 1 deletion server/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"allowJs": true,
"esModuleInterop": true,
// best practices
"strict": false,
"strict": true,
// TODO: put to true, fix errors
"forceConsistentCasingInFileNames": true,
"skipLibCheck": true,
Expand Down

0 comments on commit 6557557

Please sign in to comment.