Skip to content

Commit

Permalink
fix: upload stream monitoring (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Oct 3, 2024
1 parent 6bf02c7 commit d4a0cd8
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 172 deletions.
4 changes: 4 additions & 0 deletions src/http/plugins/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ export const storage = fastifyPlugin(
request.backend = storageBackend
request.storage = new Storage(storageBackend, database)
})

fastify.addHook('onClose', async () => {
storageBackend.close()
})
},
{ name: 'storage-init' }
)
25 changes: 19 additions & 6 deletions src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import {
import { TenantConnection, PubSub } from '@internal/database'
import { S3Store } from '@tus/s3-store'
import { NodeHttpHandler } from '@smithy/node-http-handler'
import { createAgent } from '@storage/backend'
import { ROUTE_OPERATIONS } from '../operations'
import * as https from 'node:https'
import { createAgent } from '@internal/http'

const {
storageS3MaxSockets,
storageS3Bucket,
storageS3Endpoint,
storageS3ForcePathStyle,
Expand All @@ -57,9 +59,8 @@ type MultiPartRequest = http.IncomingMessage & {
}
}

function createTusStore() {
function createTusStore(agent: { httpsAgent: https.Agent; httpAgent: http.Agent }) {
if (storageBackendType === 's3') {
const agent = createAgent('s3_tus')
return new S3Store({
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
Expand All @@ -84,8 +85,11 @@ function createTusStore() {
})
}

function createTusServer(lockNotifier: LockNotifier) {
const datastore = createTusStore()
function createTusServer(
lockNotifier: LockNotifier,
agent: { httpsAgent: https.Agent; httpAgent: http.Agent }
) {
const datastore = createTusStore(agent)
const serverOptions: ServerOptions & {
datastore: DataStore
} = {
Expand Down Expand Up @@ -139,7 +143,16 @@ export default async function routes(fastify: FastifyInstance) {
const lockNotifier = new LockNotifier(PubSub)
await lockNotifier.subscribe()

const tusServer = createTusServer(lockNotifier)
const agent = createAgent('s3_tus', {
maxSockets: storageS3MaxSockets,
})
agent.monitor()

fastify.addHook('onClose', () => {
agent.close()
})

const tusServer = createTusServer(lockNotifier, agent)

// authenticated routes
fastify.register(async (fastify) => {
Expand Down
23 changes: 1 addition & 22 deletions src/internal/concurrency/stream.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,11 @@
import { Transform, TransformCallback } from 'stream'

interface ByteCounterStreamOptions {
maxHistory?: number
onMaxHistory?: (history: Date[]) => void
rewriteHistoryOnMax?: boolean
}

export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
const { maxHistory = 100 } = options

export const createByteCounterStream = () => {
let bytes = 0
let history: Date[] = []

const transformStream = new Transform({
transform(chunk: Buffer, encoding: string, callback: TransformCallback) {
bytes += chunk.length
history.push(new Date())

if (history.length === maxHistory) {
if (options.rewriteHistoryOnMax) {
options.onMaxHistory?.(history)
history = []
}
}

callback(null, chunk)
},
})
Expand All @@ -33,8 +15,5 @@ export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
get bytes() {
return bytes
},
get history() {
return history
},
}
}
132 changes: 132 additions & 0 deletions src/internal/http/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import Agent, { HttpsAgent } from 'agentkeepalive'
import {
HttpPoolErrorGauge,
HttpPoolFreeSocketsGauge,
HttpPoolPendingRequestsGauge,
HttpPoolSocketsGauge,
} from '@internal/monitoring/metrics'
import { getConfig } from '../../config'

const { region } = getConfig()

export interface InstrumentedAgent {
httpAgent: Agent
httpsAgent: HttpsAgent
monitor: () => NodeJS.Timeout | undefined
close: () => void
}

export interface AgentStats {
busySocketCount: number
freeSocketCount: number
pendingRequestCount: number
errorSocketCount: number
timeoutSocketCount: number
createSocketErrorCount: number
}

/**
* Creates an instrumented agent
* Adding prometheus metrics to the agent
*/
export function createAgent(name: string, options: { maxSockets: number }): InstrumentedAgent {
const agentOptions = {
maxSockets: options.maxSockets,
keepAlive: true,
keepAliveMsecs: 1000,
freeSocketTimeout: 1000 * 15,
}

const httpAgent = new Agent(agentOptions)
const httpsAgent = new HttpsAgent(agentOptions)
let watcher: NodeJS.Timeout | undefined = undefined

return {
httpAgent,
httpsAgent,
monitor: () => {
const agent = watchAgent(name, 'https', httpsAgent)
watcher = agent
return agent
},
close: () => {
if (watcher) {
clearInterval(watcher)
}
},
}
}

/**
* Metrics
*
* HttpPoolSockets
* HttpPoolFreeSockets
* HttpPoolPendingRequests
* HttpPoolError
*
* @param name
* @param protocol
* @param stats
*/
function updateHttpAgentMetrics(name: string, protocol: string, stats: AgentStats) {
// Update the metrics with calculated values
HttpPoolSocketsGauge.set({ name, region, protocol }, stats.busySocketCount)
HttpPoolFreeSocketsGauge.set({ name, region, protocol }, stats.freeSocketCount)
HttpPoolPendingRequestsGauge.set({ name, region }, stats.pendingRequestCount)
HttpPoolErrorGauge.set({ name, region, type: 'socket_error', protocol }, stats.errorSocketCount)
HttpPoolErrorGauge.set(
{ name, region, type: 'timeout_socket_error', protocol },
stats.timeoutSocketCount
)
HttpPoolErrorGauge.set(
{ name, region, type: 'create_socket_error', protocol },
stats.createSocketErrorCount
)
}

export function watchAgent(name: string, protocol: 'http' | 'https', agent: Agent | HttpsAgent) {
return setInterval(() => {
const httpStatus = agent.getCurrentStatus()

const httpStats = gatherHttpAgentStats(httpStatus)

updateHttpAgentMetrics(name, protocol, httpStats)
}, 5000)
}

// Function to update Prometheus metrics based on the current status of the agent
export function gatherHttpAgentStats(status: Agent.AgentStatus) {
// Calculate the number of busy sockets by iterating over the `sockets` object
let busySocketCount = 0
for (const host in status.sockets) {
if (status.sockets.hasOwnProperty(host)) {
busySocketCount += status.sockets[host]
}
}

// Calculate the number of free sockets by iterating over the `freeSockets` object
let freeSocketCount = 0
for (const host in status.freeSockets) {
if (status.freeSockets.hasOwnProperty(host)) {
freeSocketCount += status.freeSockets[host]
}
}

// Calculate the number of pending requests by iterating over the `requests` object
let pendingRequestCount = 0
for (const host in status.requests) {
if (status.requests.hasOwnProperty(host)) {
pendingRequestCount += status.requests[host]
}
}

return {
busySocketCount,
freeSocketCount,
pendingRequestCount,
errorSocketCount: status.errorSocketCount,
timeoutSocketCount: status.timeoutSocketCount,
createSocketErrorCount: status.createSocketErrorCount,
}
}
1 change: 1 addition & 0 deletions src/internal/http/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './agent'
8 changes: 8 additions & 0 deletions src/internal/queue/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ export class Event<T extends Omit<BasePayload, '$version'>> {
return this.queueName + '-slow'
}

static onClose() {
// no-op
}

static onStart() {
// no-op
}

static batchSend<T extends Event<any>[]>(messages: T) {
return Queue.getInstance().insert(
messages.map((message) => {
Expand Down
23 changes: 21 additions & 2 deletions src/internal/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export abstract class Queue {
opts.registerWorkers()
}

await Queue.callStart()
await Queue.startWorkers(opts.onMessage)

if (opts.signal) {
Expand All @@ -96,7 +97,8 @@ export abstract class Queue {
type: 'queue',
})
return Queue.stop()
.then(() => {
.then(async () => {
await Queue.callClose()
logSchema.info(logger, '[Queue] Exited', {
type: 'queue',
})
Expand Down Expand Up @@ -142,7 +144,8 @@ export abstract class Queue {
})

await new Promise((resolve) => {
boss.once('stopped', () => {
boss.once('stopped', async () => {
await this.callClose()
resolve(null)
})
})
Expand All @@ -166,6 +169,22 @@ export abstract class Queue {
return Promise.all(workers)
}

protected static callStart() {
const events = Queue.events.map((event) => {
return event.onStart()
})

return Promise.all(events)
}

protected static callClose() {
const events = Queue.events.map((event) => {
return event.onClose()
})

return Promise.all(events)
}

protected static registerTask(
queueName: string,
event: SubclassOfBaseClass,
Expand Down
9 changes: 6 additions & 3 deletions src/scripts/export-docs.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { promises as fs } from 'fs'
import app from '../app'
;(async () => {
const response = await app({
const storageApp = app({
exposeDocs: true,
}).inject({
})

const response = await storageApp.inject({
method: 'GET',
url: '/documentation/json',
})

await fs.writeFile('static/api.json', response.body)
process.exit(0)

await storageApp.close()
})()
4 changes: 4 additions & 0 deletions src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ export abstract class StorageBackendAdapter {
): Promise<{ eTag?: string; lastModified?: Date }> {
throw new Error('not implemented')
}

close(): void {
// do nothing
}
}

const { tusUseFileVersionSeparator } = getConfig()
Expand Down
4 changes: 4 additions & 0 deletions src/storage/backend/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ export class FileBackend implements StorageBackendAdapter {
])
}

close() {
// no-op
}

protected async getFileMetadata(file: string) {
const platform = process.platform == 'darwin' ? 'darwin' : 'linux'
const [cacheControl, contentType] = await Promise.all([
Expand Down
Loading

0 comments on commit d4a0cd8

Please sign in to comment.