NATS streaming server strategy and client for nestjs microservice based on the stan.js package

nestjs-ex/stan-strategy

Description

NATS streaming server strategy and client module for Nest based on the stan.js package.

Installation

$ npm i --save @nestjs-ex/stan-strategy

Usage

Server

To use the STAN transporter, pass the following options object to the createMicroservice() method:

import { NestFactory } from '@nestjs/core';
import { StanStrategy } from '@nestjs-ex/stan-strategy';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(
    AppModule,
    {
      strategy: new StanStrategy({
        url: 'nats://localhost:4222',
        group: 'example-group', // [DEPRECATED] Please use decorator @StanMessagePattern or @StanEventPattern
        clusterId: 'example',
        clientId: 'example-server',
        name: 'example-server',
        subscribe: { // [DEPRECATED][optional] Please use `defaultSubscriptionOptions`
          durableName: 'durable', // [optional] the real name is <durableName>-<subject>
          deliverAllAvailable: false, // [optional]
          maxInFlight: 100, // [optional]
          ackWait: 60 * 1000, // [optional] in millis
          startPosition: 0, // [optional] (0 means new only)
          startSequence: 22, // [optional]
          startTime: new Date(2016, 7, 8), // [optional]
          manualAcks: false // [optional]
        },
        defaultSubscriptionOptions: { // [optional] the same as subscribe
          durableName: 'durable', // [optional] the real name is <durableName>-<subject>
          deliverAllAvailable: false, // [optional]
          maxInFlight: 100, // [optional]
          ackWait: 60 * 1000, // [optional] in millis
          startPosition: 0, // [optional] (0 means new only)
          startSequence: 22, // [optional]
          startTime: new Date(2016, 7, 8), // [optional]
          manualAcks: false // [optional]
        },
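        serializer: Serializer, // placeholder: supply your own serializer implementation
        deserializer: Deserializer // placeholder: supply your own deserializer implementation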
      })
    },
  );
  // In NestJS 8, listen() returns a promise and no longer takes a callback.
  await app.listen();
  console.log('Microservice is listening');
}
bootstrap();
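
The comment on durableName above notes that the effective durable name is <durableName>-<subject>. A minimal sketch of that naming rule follows. The helper effectiveDurableName is a hypothetical name written for illustration, not a function exported by the package, and treating a missing durableName as "not durable" is an assumption.

```typescript
// Hypothetical helper, not part of @nestjs-ex/stan-strategy.
// Illustrates the "<durableName>-<subject>" rule from the options comment.
function effectiveDurableName(
  durableName: string | undefined,
  subject: string,
): string | undefined {
  // Assumption: without a configured durableName the subscription is not durable.
  if (!durableName) {
    return undefined;
  }
  return `${durableName}-${subject}`;
}

console.log(effectiveDurableName('durable', 'math.sum')); // prints "durable-math.sum"
```

Under this rule, two handlers that share the same durableName but listen on different subjects would still end up with distinct durable subscriptions.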

Request-response

import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { StanMessagePattern } from '@nestjs-ex/stan-strategy';

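// The four handlers below show alternative ways to declare a handler for 'math.sum'.
// In a real controller, pick one form per subject.
@Controller()
export class MathController {
  // 1. Plain subject string
  @MessagePattern('math.sum')
  sum(data: number[]): number {
    // The initial value 0 keeps reduce() from throwing on an empty array.
    return (data || []).reduce((a, b) => a + b, 0);
  }

  // 2. Pattern object with a queue group and subscription options
  @MessagePattern({
    subject: 'math.sum',
    qGroup: 'math-group',
    opts: { // See `defaultSubscriptionOptions`
      durableName: 'durable',
    }
  })
  sumWithPatternObject(data: number[]): number {
    return (data || []).reduce((a, b) => a + b, 0);
  }

  // 3. StanMessagePattern with subscription options
  @StanMessagePattern('math.sum', { durableName: 'durable' })
  sumDurable(data: number[]): number {
    return (data || []).reduce((a, b) => a + b, 0);
  }

  // 4. StanMessagePattern with a queue group and subscription options
  @StanMessagePattern('math.sum', 'math-group', { durableName: 'durable' })
  sumDurableQueued(data: number[]): number {
    return (data || []).reduce((a, b) => a + b, 0);
  }
}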
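
The handlers above accept either a plain subject string or an object with subject, qGroup, and opts. The sketch below shows one way to normalize both shapes into a single structure. The StanPattern type and the normalizePattern function are hypothetical names used for illustration; how the package resolves patterns internally is not described in this README.

```typescript
// Hypothetical types for illustration; the field names mirror the examples above.
interface SubscriptionOpts {
  durableName?: string;
  manualAcks?: boolean;
}

interface StanPattern {
  subject: string;
  qGroup?: string;
  opts?: SubscriptionOpts;
}

// Accepts either pattern shape and always returns the object form.
function normalizePattern(pattern: string | StanPattern): StanPattern {
  return typeof pattern === 'string' ? { subject: pattern } : { ...pattern };
}

const fromString = normalizePattern('math.sum');
const fromObject = normalizePattern({
  subject: 'math.sum',
  qGroup: 'math-group',
  opts: { durableName: 'durable' },
});

console.log(fromString.subject, fromObject.qGroup); // prints "math.sum math-group"
```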

Event-based

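import { Controller } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';
import { StanEventPattern } from '@nestjs-ex/stan-strategy';

// The three handlers below show alternative ways to declare a handler for 'user.user_created'.
// In a real controller, pick one form per subject.
@Controller()
export class UserController {
  // 1. Plain subject string
  @EventPattern('user.user_created')
  async handleUserCreated(data: Record<string, unknown>) {
    // business logic
  }

  // 2. Pattern object with a queue group and subscription options
  @EventPattern({
    subject: 'user.user_created',
    qGroup: 'user-group',
    opts: {
      manualAcks: false
    }
  })
  async handleUserCreatedWithPatternObject(data: Record<string, unknown>) {
    // business logic
  }

  // 3. StanEventPattern with a queue group and subscription options
  @StanEventPattern('user.user_created', 'user-group', { manualAcks: false })
  async handleUserCreatedQueued(data: Record<string, unknown>) {
    // business logic
  }
}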

Client

To create a client instance with the StanClientModule, import it and use the register() method to pass an options object with the same properties shown above in the createMicroservice() method.

@Module({
  imports: [
    StanClientModule.register({
      url: 'nats://localhost:4222',
      clusterId: 'example',
      clientId: 'example-client',
      name: 'example-client'
    }),
  ]
  ...
})

Once the module has been imported, you can inject the StanClient instance configured above:

constructor(
  private client: StanClient
) {}

Often you will want to supply the module options asynchronously rather than hard-coding them. In that case, use the registerAsync() method, which offers several ways to provide the options.

1. Use factory

StanClientModule.registerAsync({
  useFactory: () => ({
    url: 'nats://localhost:4222',
    clusterId: 'example',
    clientId: 'example-client',
    name: 'example-client'
  })
});

Like any other factory provider, this factory can be async and can receive dependencies listed in inject.

StanClientModule.registerAsync({
  imports: [ConfigModule],
  useFactory: async (configService: ConfigService) => ({
    url: configService.getString('STAN_URL'),
    clusterId: configService.getString('STAN_CLUSTER_ID'),
    clientId: configService.getString('STAN_CLIENT_ID'),
    name: configService.getString('STAN_NAME')
  }),
  inject: [ConfigService],
}),

2. Use class

StanClientModule.registerAsync({
  useClass: StanClientConfigService
});

The construction above instantiates StanClientConfigService inside StanClientModule and uses it to create the options object.

import { Injectable } from '@nestjs/common';

@Injectable()
class StanClientConfigService implements StanClientOptionsFactory {
  createStanClientOptions(): StanClientModuleOptions {
    return {
      url: 'nats://localhost:4222',
      clusterId: 'example',
      clientId: 'example-client',
      name: 'example-client'
    };
  }
}

3. Use existing

StanClientModule.registerAsync({
  imports: [ConfigModule],
  useExisting: ConfigService,
}),

This works the same as useClass, with one critical difference: StanClientModule looks up the imported modules and reuses the already created ConfigService instead of instantiating it on its own.
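
For the factory approach, it can help to fail fast when a required setting is missing. The sketch below builds the four connection options from a plain key/value map. The StanConnectionOptions type, the helper names, and the setting names (STAN_URL, STAN_CLUSTER_ID, STAN_CLIENT_ID, STAN_NAME) are assumptions made for illustration; adapt them to whatever configuration service you actually inject.

```typescript
// Hypothetical options shape, limited to the four fields used in the examples above.
interface StanConnectionOptions {
  url: string;
  clusterId: string;
  clientId: string;
  name: string;
}

// Reads one required setting and throws when it is missing or empty.
function requireSetting(
  env: Record<string, string | undefined>,
  key: string,
): string {
  const value = env[key];
  if (!value) {
    throw new Error(`Missing required setting: ${key}`);
  }
  return value;
}

// Could serve as the body of a useFactory callback (setting names are assumed).
function buildStanOptions(
  env: Record<string, string | undefined>,
): StanConnectionOptions {
  return {
    url: requireSetting(env, 'STAN_URL'),
    clusterId: requireSetting(env, 'STAN_CLUSTER_ID'),
    clientId: requireSetting(env, 'STAN_CLIENT_ID'),
    name: requireSetting(env, 'STAN_NAME'),
  };
}

const options = buildStanOptions({
  STAN_URL: 'nats://localhost:4222',
  STAN_CLUSTER_ID: 'example',
  STAN_CLIENT_ID: 'example-client',
  STAN_NAME: 'example-client',
});
console.log(options.clientId); // prints "example-client"
```

Throwing inside the factory surfaces a misconfiguration at application startup rather than at the first attempt to publish a message.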

Sending messages

accumulate(): Observable<number> {
  const payload = [1, 2, 3];
  return this.client.send<number>('math.sum', payload);
}

or

accumulate(): Observable<number> {
  const pattern = { subject: 'math.sum', qGroup: 'math-group' };
  const payload = [1, 2, 3];
  return this.client.send<number>(pattern, payload);
}

Notes

The package's major version follows NestJS: package version 8.x.y corresponds to NestJS 8.a.b.
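
The versioning rule can be expressed as a small check that compares major versions. This is only a sketch: majorOf and hasMatchingMajor are hypothetical helpers, and the version strings are made-up examples rather than actual releases.

```typescript
// Extracts the major component from a "major.minor.patch" version string.
function majorOf(version: string): number {
  const major = parseInt(version.split('.')[0], 10);
  if (Number.isNaN(major)) {
    throw new Error(`Invalid version: ${version}`);
  }
  return major;
}

// True when the package and NestJS share the same major version.
function hasMatchingMajor(packageVersion: string, nestVersion: string): boolean {
  return majorOf(packageVersion) === majorOf(nestVersion);
}

console.log(hasMatchingMajor('8.0.2', '8.4.7')); // prints true
console.log(hasMatchingMajor('8.0.2', '9.0.0')); // prints false
```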

License

Nest is MIT licensed.