Section 15: Connecting to NATS in a Node JS World
Wow, this is a lot of boilerplate to publish/receive a message!
Let's try to refactor this to make it much easier to publish/receive
We'll write out an initial implementation in this test project, then move it to our common module
⬆ back to top
The Listener Abstract Class
abstract class Listener {
abstract subject : string ;
abstract queueGroupName : string ;
abstract onMessage ( data : any , msg : Message ) : void ;
private client : Stan ;
protected ackWait = 5 * 1000 ;
constructor ( client : Stan ) {
this . client = client ;
}
subscriptionOptions ( ) {
return this . client
. subscriptionOptions ( )
. setDeliverAllAvailable ( )
. setManualAckMode ( true )
. setAckWait ( this . ackWait )
. setDurableName ( this . queueGroupName ) ;
}
listen ( ) {
const subscription = this . client . subscribe (
this . subject ,
this . queueGroupName ,
this . subscriptionOptions ( )
)
subscription . on ( 'message' , ( msg : Message ) => {
console . log ( `Message received: ${ this . subject } / ${ this . queueGroupName } ` ) ;
const parsedData = this . parseMessage ( msg ) ;
this . onMessage ( parsedData , msg ) ;
} )
}
parseMessage ( msg : Message ) {
const data = msg . getData ( ) ;
return typeof data === 'string'
? JSON . parse ( data )
: JSON . parse ( data . toString ( 'utf8' ) )
}
}
⬆ back to top
class TicketCreatedListener extends Listener {
subject = 'ticket:created' ;
queueGroupName = 'payments-service' ;
onMessage ( data : any , msg : Message ) {
console . log ( 'Event data!' , data ) ;
msg . ack ( ) ;
}
}
⬆ back to top
// listener.ts
import nats from 'node-nats-streaming' ;
import { randomBytes } from 'crypto' ;
import { TicketCreatedListener } from './events/ticket-created-listener' ;
console . clear ( ) ;
const stan = nats . connect ( 'ticketing' , randomBytes ( 4 ) . toString ( 'hex' ) , {
url : 'http://localhost:4222' ,
} ) ;
stan . on ( 'connect' , ( ) => {
console . log ( 'Listener connected to NATS' ) ;
stan . on ( 'close' , ( ) => {
console . log ( 'NATS connection closed!' ) ;
process . exit ( ) ;
} ) ;
new TicketCreatedListener ( stan ) . listen ( ) ;
} ) ;
process . on ( 'SIGINT' , ( ) => stan . close ( ) ) ;
process . on ( 'SIGTERM' , ( ) => stan . close ( ) ) ;
⬆ back to top
Leveraging TypeScript for Listener Validation
⬆ back to top
export enum Subjects {
TicketCreated = 'ticket:created' ,
OrderUpdated = 'order:updated' ,
}
⬆ back to top
// ticket-created-event.ts
import { Subjects } from "./subjects" ;
export interface TicketCreatedEvent {
subject : Subjects . TicketCreated ;
data : {
id : string ;
title : string ;
price : number ;
} ;
}
⬆ back to top
Enforcing Listener Subjects
// base-listener.ts
import { Subjects } from './subjects' ;
interface Event {
subject : Subjects ;
data : any ;
}
export abstract class Listener < T extends Event > {
abstract subject : T [ 'subject' ] ;
abstract onMessage ( data : T [ 'data' ] , msg : Message ) : void ;
}
// ticket-created-listener.ts
import { TicketCreatedEvent } from './ticket-created-event'
import { Subjects } from './subjects' ;
export class TicketCreatedListener extends Listener < TicketCreatedEvent > {
subject : Subjects . TicketCreated = Subjects . TicketCreated ;
...
}
⬆ back to top
Quick Note: 'readonly' in Typescript
export class TicketCreatedListener extends Listener < TicketCreatedEvent > {
readonly subject = Subjects . TicketCreated ;
// ...everything else
}
⬆ back to top
// ticket-created-listener.ts
export class TicketCreatedListener extends Listener < TicketCreatedEvent > {
onMessage ( data : TicketCreatedEvent [ 'data' ] , msg : Message ) {
console . log ( 'Event data!' , data ) ;
console . log ( data . id ) ;
console . log ( data . title ) ;
console . log ( data . price ) ;
msg . ack ( ) ;
}
}
⬆ back to top
Where Does this Get Used?
⬆ back to top
// base-publisher.ts
import { Stan } from 'node-nats-streaming' ;
import { Subjects } from './subjects' ;
interface Event {
subject : Subjects ;
data : any ;
}
export abstract class Publisher < T extends Event > {
abstract subject : T [ 'subject' ] ;
private client : Stan ;
constructor ( client : Stan ) {
this . client = client ;
}
publish ( data : T [ 'data' ] ) {
this . client . publish ( this . subject , JSON . stringify ( data ) , ( ) => {
console . log ( 'Event published.' )
} )
}
}
// ticket-created-publisher.ts
import { Publisher } from './base-publisher' ;
import { TicketCreatedEvent } from './ticket-created-event'
import { Subjects } from './subjects' ;
export class TicketCreatedPublisher extends Publisher < TicketCreatedEvent > {
readonly subject = Subjects . TicketCreated ;
}
⬆ back to top
Using the Custom Publisher
// publisher.ts
stan . on ( 'connect' , ( ) => {
console . log ( 'Publisher connected to NATS' ) ;
const publisher = new TicketCreatedPublisher ( stan ) ;
publisher . publish ( {
id : '123' ,
title : 'concert' ,
price : 20
} ) ;
} ) ;
⬆ back to top
Awaiting Event Publication
// base-publisher.ts
publish ( data : T [ 'data' ] ) : Promise < void > {
return new Promise ( ( resolve , reject ) => {
this . client . publish ( this . subject , JSON . stringify ( data ) , ( err ) => {
if ( err ) {
return reject ( err ) ;
}
console . log ( 'Event published to subject' , this . subject ) ;
resolve ( ) ;
} ) ;
} ) ;
}
// publisher.ts
stan . on ( 'connect' , async ( ) => {
console . log ( 'Publisher connected to NATS' ) ;
const publisher = new TicketCreatedPublisher ( stan ) ;
try {
await publisher . publish ( {
id : '123' ,
title : 'concert' ,
price : 20
} ) ;
}
catch ( err ) {
console . error ( err ) ;
}
} ) ;
⬆ back to top
Common Event Definitions Summary
⬆ back to top
Updating the Common Module
base-listener.ts
base-publisher.ts
subjects.ts
ticket-created-event.ts
ticket-updated-event.ts
⬆ back to top
kubectl get pods
kubectl delete pod nats-depl-786b8cff8d-xd4tn
⬆ back to top