Skip to content

Commit

Permalink
Feat/nestjs connectrpc (#328)
Browse files Browse the repository at this point in the history
* feat(nestjs-connectrpc): init

* refactor(nestjs-connectrpc): utils

* refactor(nestjs-connectrpc): decorators

* style: fix lint

* refactor: reduce nesting to improve code readability

* refactor: add utility function AddMethodMetadata

* refactor: method startServer return promise

* perf: added a return to avoid calling the set twice

* style: remove async from startServer method

* chore: dependency versions are fixed

---------

Co-authored-by: OsirisAnubis <olegshulgakalendar@gmail.com>
  • Loading branch information
TorinAsakura and OsirisAnubiz authored Nov 5, 2024
1 parent 3f4cb0b commit ce39ecb
Show file tree
Hide file tree
Showing 13 changed files with 1,226 additions and 1 deletion.
317 changes: 316 additions & 1 deletion .pnp.cjs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/nestjs-connectrpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ConnectRpc migration @wolfcoded/nestjs-bufconnect
53 changes: 53 additions & 0 deletions packages/nestjs-connectrpc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"name": "@atls/nestjs-connectrpc",
"version": "0.0.0",
"license": "BSD-3-Clause",
"type": "module",
"exports": {
"./package.json": "./package.json",
".": "./src/index.ts"
},
"main": "src/index.ts",
"files": [
"dist"
],
"scripts": {
"build": "yarn library build",
"prepack": "yarn run build",
"postpack": "rm -rf dist"
},
"devDependencies": {
"@bufbuild/protobuf": "1.10.0",
"@connectrpc/connect": "1.6.1",
"@connectrpc/connect-node": "1.6.1",
"@nestjs/common": "10.0.5",
"@nestjs/core": "10.0.5",
"@nestjs/microservices": "10.2.4",
"@nestjs/platform-express": "10.2.4",
"reflect-metadata": "0.2.2",
"rxjs": "7.8.1"
},
"peerDependencies": {
"@bufbuild/protobuf": "^1",
"@connectrpc/connect": "^1",
"@connectrpc/connect-node": "^1",
"@nestjs/common": "^10",
"@nestjs/core": "^10",
"@nestjs/microservices": "^10",
"@nestjs/platform-express": "^10",
"reflect-metadata": "^0.2",
"rxjs": "^7"
},
"publishConfig": {
"exports": {
"./package.json": "./package.json",
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"main": "dist/index.js",
"typings": "dist/index.d.ts"
}
}
5 changes: 5 additions & 0 deletions packages/nestjs-connectrpc/src/connectrpc.constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const METHOD_DECORATOR_KEY = Symbol('METHOD_DECORATOR_KEY')

export const STREAM_METHOD_DECORATOR_KEY = Symbol('STREAM_METHOD_DECORATOR_KEY')

export const CONNECTRPC_TRANSPORT = Symbol('CONNECTRPC_TRANSPORT')
97 changes: 97 additions & 0 deletions packages/nestjs-connectrpc/src/connectrpc.decorators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import type { ServiceType } from '@bufbuild/protobuf'

import type { ConstructorWithPrototype } from './connectrpc.interfaces.js'
import type { FunctionPropertyDescriptor } from './connectrpc.interfaces.js'
import type { MethodKey } from './connectrpc.interfaces.js'
import type { MethodKeys } from './connectrpc.interfaces.js'

import { MessagePattern } from '@nestjs/microservices'

import { CONNECTRPC_TRANSPORT } from './connectrpc.constants.js'
import { METHOD_DECORATOR_KEY } from './connectrpc.constants.js'
import { STREAM_METHOD_DECORATOR_KEY } from './connectrpc.constants.js'
import { MethodType } from './connectrpc.interfaces.js'
import { CustomMetadataStore } from './custom-metadata.storage.js'
import { createConnectRpcMethodMetadata } from './utils/router.utils.js'

/**
* Type guard to check if a given descriptor is a function property descriptor.
* @param {PropertyDescriptor | undefined} descriptor - The descriptor to check.
* @returns {descriptor is FunctionPropertyDescriptor} - True if descriptor is for a function.
*/
function isFunctionPropertyDescriptor(
descriptor: PropertyDescriptor | undefined
): descriptor is FunctionPropertyDescriptor {
return descriptor !== undefined && typeof descriptor.value === 'function'
}

/**
* ConnectRpcService decorator to register RPC services and their handlers.
* @param {ServiceType} serviceName - The service type from protobuf.
* @returns {ClassDecorator} - Class decorator function.
*/
export const ConnectRpcService = (serviceName: ServiceType): ClassDecorator =>
(target: ConstructorWithPrototype): void => {
// Получаем все зарегистрированные методы и объединяем их
const unaryMethodKeys: MethodKeys = Reflect.getMetadata(METHOD_DECORATOR_KEY, target) || []
const streamMethodKeys: MethodKeys =
Reflect.getMetadata(STREAM_METHOD_DECORATOR_KEY, target) || []
const allMethodKeys = [...unaryMethodKeys, ...streamMethodKeys] as Array<MethodKey>

allMethodKeys.forEach((methodImpl) => {
const { key: functionName, methodType } = methodImpl
const descriptor = Object.getOwnPropertyDescriptor(target.prototype, functionName)

if (!descriptor || !isFunctionPropertyDescriptor(descriptor)) return

const metadata = createConnectRpcMethodMetadata(
descriptor.value,
functionName,
serviceName.typeName,
functionName,
methodType
)

CustomMetadataStore.getInstance().set(serviceName.typeName, serviceName)
MessagePattern(metadata, CONNECTRPC_TRANSPORT)(target.prototype, functionName, descriptor)
})
}

export const AddMethodMetadata = (
target: object,
key: string | symbol,
methodType: MethodType,
metadataKey: string | symbol
): void => {
const metadata: MethodKey = {
key: key.toString(),
methodType,
}

const existingMethods =
(Reflect.getMetadata(metadataKey, target.constructor) as Set<MethodKey>) || new Set()

if (existingMethods.has(metadata)) return

existingMethods.add(metadata)
Reflect.defineMetadata(metadataKey, existingMethods, target.constructor)
}

/**
* Decorator for unary RPC methods.
* Registers the method as a unary RPC with no streaming.
* @returns {MethodDecorator} - Method decorator function.
*/
export const ConnectRpcMethod = (): MethodDecorator => (target: object, key: string | symbol) => {
AddMethodMetadata(target, key, MethodType.NO_STREAMING, METHOD_DECORATOR_KEY)
}

/**
* Decorator for streaming RPC methods.
* Registers the method as a streaming RPC with RX_STREAMING type.
* @returns {MethodDecorator} - Method decorator function.
*/
export const ConnectRpcStreamMethod = (): MethodDecorator =>
(target: object, key: string | symbol) => {
AddMethodMetadata(target, key, MethodType.RX_STREAMING, STREAM_METHOD_DECORATOR_KEY)
}
74 changes: 74 additions & 0 deletions packages/nestjs-connectrpc/src/connectrpc.interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import type * as http from 'http'
import type * as http2 from 'http2'
import type * as https from 'https'
import type { ConnectRouterOptions } from '@connectrpc/connect'
import type { Observable } from 'rxjs'

export interface ConnectRpcPattern {
service: string
rpc: string
streaming: MethodType
}

export enum MethodType {
NO_STREAMING = 'no_stream',
RX_STREAMING = 'rx_stream',
}

export enum ServerProtocol {
HTTP = 'http',
HTTPS = 'https',
HTTP2 = 'http2',
HTTP2_INSECURE = 'http2_insecure',
}

export interface BaseServerOptions {
port: number
connectOptions?: ConnectRouterOptions
callback?: () => void
}

export interface HttpOptions extends BaseServerOptions {
protocol: ServerProtocol.HTTP
serverOptions?: http.ServerOptions
}

export interface HttpsOptions extends BaseServerOptions {
protocol: ServerProtocol.HTTPS
serverOptions: https.ServerOptions
}

export interface Http2Options extends BaseServerOptions {
protocol: ServerProtocol.HTTP2
serverOptions: http2.SecureServerOptions
}

export interface Http2InsecureOptions extends BaseServerOptions {
protocol: ServerProtocol.HTTP2_INSECURE
serverOptions?: http2.ServerOptions
}

export type ServerTypeOptions = Http2InsecureOptions | Http2Options | HttpOptions | HttpsOptions

export type ServerInstance = http.Server | http2.Http2Server | https.Server | null

export interface ConstructorWithPrototype {
prototype: Record<string, PropertyDescriptor>
}

export interface MethodKey {
key: string
methodType: MethodType
}

export type MethodKeys = Array<MethodKey>

export interface FunctionPropertyDescriptor extends PropertyDescriptor {
value: (...arguments_: Array<never>) => never
}

export type ResultOrDeferred<T> =
| Observable<T>
| T
| { subscribe: () => void }
| { toPromise: () => Promise<T> }
122 changes: 122 additions & 0 deletions packages/nestjs-connectrpc/src/connectrpc.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import type { ConnectRouter } from '@connectrpc/connect'

import type { Http2InsecureOptions } from './connectrpc.interfaces.js'
import type { Http2Options } from './connectrpc.interfaces.js'
import type { ServerTypeOptions } from './connectrpc.interfaces.js'
import type { HttpsOptions } from './connectrpc.interfaces.js'
import type { HttpOptions } from './connectrpc.interfaces.js'
import type { ServerInstance } from './connectrpc.interfaces.js'

import { connectNodeAdapter } from '@connectrpc/connect-node'
import * as http from 'http'
import * as http2 from 'http2'
import * as https from 'https'

import { ServerProtocol } from './connectrpc.interfaces.js'

export class HTTPServer {
private serverPrivate: ServerInstance = null

constructor(
private readonly options: ServerTypeOptions,
private readonly router: (router: ConnectRouter) => void
) {}

set server(value: http.Server | http2.Http2Server | https.Server | null) {
this.serverPrivate = value
}

get server(): http.Server | http2.Http2Server | https.Server | null {
return this.serverPrivate
}

async listen(): Promise<void> {
await this.startServer()
}

createHttpServer(): http.Server {
const { serverOptions = {}, connectOptions = {} } = this.options as HttpOptions

return http.createServer(
serverOptions,
connectNodeAdapter({
...connectOptions,
routes: this.router,
})
)
}

createHttpsServer(): https.Server {
const { serverOptions = {}, connectOptions = {} } = this.options as HttpsOptions

return https.createServer(
serverOptions,
connectNodeAdapter({ ...connectOptions, routes: this.router })
)
}

createHttp2Server(): http2.Http2Server {
const { serverOptions = {}, connectOptions = {} } = this.options as Http2Options

return http2.createSecureServer(
serverOptions,
connectNodeAdapter({ ...connectOptions, routes: this.router })
)
}

createHttp2InsecureServer(): http2.Http2Server {
const { serverOptions = {}, connectOptions = {} } = this.options as Http2InsecureOptions

return http2.createServer(
serverOptions,
connectNodeAdapter({ ...connectOptions, routes: this.router })
)
}

// eslint-disable-next-line @typescript-eslint/promise-function-async
startServer(): Promise<void> {
return new Promise((resolve, reject) => {
switch (this.options.protocol) {
case ServerProtocol.HTTP: {
this.server = this.createHttpServer()
break
}
case ServerProtocol.HTTPS: {
this.server = this.createHttpsServer()
break
}
case ServerProtocol.HTTP2: {
this.server = this.createHttp2Server()
break
}
case ServerProtocol.HTTP2_INSECURE: {
this.server = this.createHttp2InsecureServer()
break
}
default: {
reject(new Error('Invalid protocol option'))
return
}
}

this.server.listen(this.options.port, () => {
if (this.options.callback) this.options.callback()
resolve()
})
})
}

async close(callback?: () => void): Promise<void> {
return new Promise((resolve, reject) => {
if (this.server === null) {
reject(new Error('Server is not running'))
} else {
this.server.close(() => {
this.server = null
if (callback) callback()
resolve()
})
}
})
}
}
Loading

0 comments on commit ce39ecb

Please sign in to comment.