Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/nestjs connectrpc #328

Merged
merged 11 commits into from
Nov 5, 2024
317 changes: 316 additions & 1 deletion .pnp.cjs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/nestjs-connectrpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ConnectRpc migration @wolfcoded/nestjs-bufconnect
53 changes: 53 additions & 0 deletions packages/nestjs-connectrpc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"name": "@atls/nestjs-connectrpc",
"version": "0.0.0",
Nelfimov marked this conversation as resolved.
Show resolved Hide resolved
"license": "BSD-3-Clause",
"type": "module",
"exports": {
"./package.json": "./package.json",
".": "./src/index.ts"
},
"main": "src/index.ts",
"files": [
"dist"
],
"scripts": {
"build": "yarn library build",
"prepack": "yarn run build",
"postpack": "rm -rf dist"
},
"devDependencies": {
"@bufbuild/protobuf": "1.10.0",
"@connectrpc/connect": "1.6.1",
"@connectrpc/connect-node": "1.6.1",
"@nestjs/common": "10.0.5",
"@nestjs/core": "10.0.5",
"@nestjs/microservices": "10.2.4",
"@nestjs/platform-express": "10.2.4",
"reflect-metadata": "0.2.2",
"rxjs": "7.8.1"
},
"peerDependencies": {
"@bufbuild/protobuf": "^1",
"@connectrpc/connect": "^1",
"@connectrpc/connect-node": "^1",
"@nestjs/common": "^10",
"@nestjs/core": "^10",
"@nestjs/microservices": "^10",
"@nestjs/platform-express": "^10",
"reflect-metadata": "^0.2",
"rxjs": "^7"
},
"publishConfig": {
"exports": {
"./package.json": "./package.json",
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"main": "dist/index.js",
"typings": "dist/index.d.ts"
}
}
5 changes: 5 additions & 0 deletions packages/nestjs-connectrpc/src/connectrpc.constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const METHOD_DECORATOR_KEY = Symbol('METHOD_DECORATOR_KEY')

export const STREAM_METHOD_DECORATOR_KEY = Symbol('STREAM_METHOD_DECORATOR_KEY')

export const CONNECTRPC_TRANSPORT = Symbol('CONNECTRPC_TRANSPORT')
97 changes: 97 additions & 0 deletions packages/nestjs-connectrpc/src/connectrpc.decorators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import type { ServiceType } from '@bufbuild/protobuf'

import type { ConstructorWithPrototype } from './connectrpc.interfaces.js'
import type { FunctionPropertyDescriptor } from './connectrpc.interfaces.js'
import type { MethodKey } from './connectrpc.interfaces.js'
import type { MethodKeys } from './connectrpc.interfaces.js'

import { MessagePattern } from '@nestjs/microservices'

import { CONNECTRPC_TRANSPORT } from './connectrpc.constants.js'
import { METHOD_DECORATOR_KEY } from './connectrpc.constants.js'
import { STREAM_METHOD_DECORATOR_KEY } from './connectrpc.constants.js'
import { MethodType } from './connectrpc.interfaces.js'
import { CustomMetadataStore } from './custom-metadata.storage.js'
import { createConnectRpcMethodMetadata } from './utils/router.utils.js'

/**
* Type guard to check if a given descriptor is a function property descriptor.
* @param {PropertyDescriptor | undefined} descriptor - The descriptor to check.
* @returns {descriptor is FunctionPropertyDescriptor} - True if descriptor is for a function.
*/
function isFunctionPropertyDescriptor(
descriptor: PropertyDescriptor | undefined
): descriptor is FunctionPropertyDescriptor {
return descriptor !== undefined && typeof descriptor.value === 'function'
}

/**
* ConnectRpcService decorator to register RPC services and their handlers.
* @param {ServiceType} serviceName - The service type from protobuf.
* @returns {ClassDecorator} - Class decorator function.
*/
export const ConnectRpcService = (serviceName: ServiceType): ClassDecorator =>
(target: ConstructorWithPrototype): void => {
// Получаем все зарегистрированные методы и объединяем их
const unaryMethodKeys: MethodKeys = Reflect.getMetadata(METHOD_DECORATOR_KEY, target) || []
Nelfimov marked this conversation as resolved.
Show resolved Hide resolved
const streamMethodKeys: MethodKeys =
Reflect.getMetadata(STREAM_METHOD_DECORATOR_KEY, target) || []
const allMethodKeys = [...unaryMethodKeys, ...streamMethodKeys] as Array<MethodKey>

allMethodKeys.forEach((methodImpl) => {
const { key: functionName, methodType } = methodImpl
const descriptor = Object.getOwnPropertyDescriptor(target.prototype, functionName)

if (!descriptor || !isFunctionPropertyDescriptor(descriptor)) return

const metadata = createConnectRpcMethodMetadata(
descriptor.value,
functionName,
serviceName.typeName,
functionName,
methodType
)

CustomMetadataStore.getInstance().set(serviceName.typeName, serviceName)
MessagePattern(metadata, CONNECTRPC_TRANSPORT)(target.prototype, functionName, descriptor)
})
}

export const AddMethodMetadata = (
target: object,
key: string | symbol,
methodType: MethodType,
metadataKey: string | symbol
): void => {
const metadata: MethodKey = {
key: key.toString(),
methodType,
}

const existingMethods =
(Reflect.getMetadata(metadataKey, target.constructor) as Set<MethodKey>) || new Set()

if (existingMethods.has(metadata)) return

existingMethods.add(metadata)
Reflect.defineMetadata(metadataKey, existingMethods, target.constructor)
}

/**
* Decorator for unary RPC methods.
* Registers the method as a unary RPC with no streaming.
* @returns {MethodDecorator} - Method decorator function.
*/
export const ConnectRpcMethod = (): MethodDecorator => (target: object, key: string | symbol) => {
AddMethodMetadata(target, key, MethodType.NO_STREAMING, METHOD_DECORATOR_KEY)
}

/**
* Decorator for streaming RPC methods.
* Registers the method as a streaming RPC with RX_STREAMING type.
* @returns {MethodDecorator} - Method decorator function.
*/
export const ConnectRpcStreamMethod = (): MethodDecorator =>
(target: object, key: string | symbol) => {
AddMethodMetadata(target, key, MethodType.RX_STREAMING, STREAM_METHOD_DECORATOR_KEY)
}
74 changes: 74 additions & 0 deletions packages/nestjs-connectrpc/src/connectrpc.interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import type * as http from 'http'
import type * as http2 from 'http2'
import type * as https from 'https'
import type { ConnectRouterOptions } from '@connectrpc/connect'
import type { Observable } from 'rxjs'

export interface ConnectRpcPattern {
service: string
rpc: string
streaming: MethodType
}

export enum MethodType {
NO_STREAMING = 'no_stream',
RX_STREAMING = 'rx_stream',
}

export enum ServerProtocol {
HTTP = 'http',
HTTPS = 'https',
HTTP2 = 'http2',
HTTP2_INSECURE = 'http2_insecure',
}

export interface BaseServerOptions {
port: number
connectOptions?: ConnectRouterOptions
callback?: () => void
}

export interface HttpOptions extends BaseServerOptions {
protocol: ServerProtocol.HTTP
serverOptions?: http.ServerOptions
}

export interface HttpsOptions extends BaseServerOptions {
protocol: ServerProtocol.HTTPS
serverOptions: https.ServerOptions
}

export interface Http2Options extends BaseServerOptions {
protocol: ServerProtocol.HTTP2
serverOptions: http2.SecureServerOptions
}

export interface Http2InsecureOptions extends BaseServerOptions {
protocol: ServerProtocol.HTTP2_INSECURE
serverOptions?: http2.ServerOptions
}

export type ServerTypeOptions = Http2InsecureOptions | Http2Options | HttpOptions | HttpsOptions

export type ServerInstance = http.Server | http2.Http2Server | https.Server | null

export interface ConstructorWithPrototype {
prototype: Record<string, PropertyDescriptor>
}

export interface MethodKey {
key: string
methodType: MethodType
}

export type MethodKeys = Array<MethodKey>

export interface FunctionPropertyDescriptor extends PropertyDescriptor {
value: (...arguments_: Array<never>) => never
}

export type ResultOrDeferred<T> =
| Observable<T>
| T
| { subscribe: () => void }
| { toPromise: () => Promise<T> }
122 changes: 122 additions & 0 deletions packages/nestjs-connectrpc/src/connectrpc.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import type { ConnectRouter } from '@connectrpc/connect'

import type { Http2InsecureOptions } from './connectrpc.interfaces.js'
import type { Http2Options } from './connectrpc.interfaces.js'
import type { ServerTypeOptions } from './connectrpc.interfaces.js'
import type { HttpsOptions } from './connectrpc.interfaces.js'
import type { HttpOptions } from './connectrpc.interfaces.js'
import type { ServerInstance } from './connectrpc.interfaces.js'

import { connectNodeAdapter } from '@connectrpc/connect-node'
import * as http from 'http'
import * as http2 from 'http2'
import * as https from 'https'

import { ServerProtocol } from './connectrpc.interfaces.js'

export class HTTPServer {
private serverPrivate: ServerInstance = null

constructor(
private readonly options: ServerTypeOptions,
private readonly router: (router: ConnectRouter) => void
) {}

set server(value: http.Server | http2.Http2Server | https.Server | null) {
this.serverPrivate = value
}

get server(): http.Server | http2.Http2Server | https.Server | null {
return this.serverPrivate
}

async listen(): Promise<void> {
await this.startServer()
}

createHttpServer(): http.Server {
const { serverOptions = {}, connectOptions = {} } = this.options as HttpOptions

return http.createServer(
serverOptions,
connectNodeAdapter({
...connectOptions,
routes: this.router,
})
)
}

createHttpsServer(): https.Server {
const { serverOptions = {}, connectOptions = {} } = this.options as HttpsOptions

return https.createServer(
serverOptions,
connectNodeAdapter({ ...connectOptions, routes: this.router })
)
}

createHttp2Server(): http2.Http2Server {
const { serverOptions = {}, connectOptions = {} } = this.options as Http2Options

return http2.createSecureServer(
serverOptions,
connectNodeAdapter({ ...connectOptions, routes: this.router })
)
}

createHttp2InsecureServer(): http2.Http2Server {
const { serverOptions = {}, connectOptions = {} } = this.options as Http2InsecureOptions

return http2.createServer(
serverOptions,
connectNodeAdapter({ ...connectOptions, routes: this.router })
)
}

// eslint-disable-next-line @typescript-eslint/promise-function-async
startServer(): Promise<void> {
return new Promise((resolve, reject) => {
switch (this.options.protocol) {
case ServerProtocol.HTTP: {
this.server = this.createHttpServer()
break
}
case ServerProtocol.HTTPS: {
this.server = this.createHttpsServer()
break
}
case ServerProtocol.HTTP2: {
this.server = this.createHttp2Server()
break
}
case ServerProtocol.HTTP2_INSECURE: {
this.server = this.createHttp2InsecureServer()
break
}
default: {
reject(new Error('Invalid protocol option'))
return
}
}

this.server.listen(this.options.port, () => {
if (this.options.callback) this.options.callback()
resolve()
})
})
}

async close(callback?: () => void): Promise<void> {
Nelfimov marked this conversation as resolved.
Show resolved Hide resolved
return new Promise((resolve, reject) => {
if (this.server === null) {
reject(new Error('Server is not running'))
} else {
this.server.close(() => {
this.server = null
if (callback) callback()
resolve()
})
}
})
}
}
Loading