Skip to content

Commit

Permalink
fix(idea-backend): genesis request between services (#1513)
Browse files Browse the repository at this point in the history
  • Loading branch information
osipov-mit authored Mar 19, 2024
1 parent 3493886 commit 7e335b5
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 38 deletions.
13 changes: 6 additions & 7 deletions idea/api-gateway/src/rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class RMQService {
}

this.mainChannel = await this.connection.createChannel();
await this.mainChannel.assertExchange(RMQExchange.TOPIC_EX, 'topic', { durable: true });
await this.mainChannel.assertExchange(RMQExchange.GENESISES, 'fanout', { durable: true });
await this.mainChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct', { durable: true });
await this.mainChannel.assertQueue(RMQQueue.REPLIES, {
durable: true,
Expand All @@ -40,14 +40,14 @@ export class RMQService {

await this.mainChannel.bindQueue(RMQQueue.REPLIES, RMQExchange.DIRECT_EX, RMQQueue.REPLIES);

await this.mainChannel.assertQueue(RMQQueue.GENESISES, {
await this.mainChannel.assertQueue(RMQQueue.GENESIS, {
durable: true,
exclusive: false,
autoDelete: false,
messageTtl: 30_000,
});

await this.mainChannel.bindQueue(RMQQueue.GENESISES, RMQExchange.DIRECT_EX, RMQQueue.GENESISES);
await this.mainChannel.bindQueue(RMQQueue.GENESIS, RMQExchange.DIRECT_EX, RMQQueue.GENESIS);

this.metaChannel = await this.connection.createChannel();
this.metaChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct', { durable: true });
Expand Down Expand Up @@ -78,14 +78,14 @@ export class RMQService {

private async subscribeToGenesises() {
await this.mainChannel.consume(
RMQQueue.GENESISES,
RMQQueue.GENESIS,
async (message) => {
if (!message) {
return;
}

const { genesis, service, action } = JSON.parse(message.content.toString());
logger.info(RMQQueue.GENESISES, { genesis, service, action });
logger.info(RMQQueue.GENESIS, { genesis, service, action });

switch (service) {
case RMQServices.INDEXER: {
Expand Down Expand Up @@ -177,8 +177,7 @@ export class RMQService {
}

public requestActiveGenesises() {
this.mainChannel.publish(RMQExchange.TOPIC_EX, `${RMQServices.INDEXER}.genesises`, Buffer.from(''));
this.mainChannel.publish(RMQExchange.TOPIC_EX, `${RMQServices.TEST_BALANCE}.genesises`, Buffer.from(''));
this.mainChannel.publish(RMQExchange.GENESISES, '', Buffer.from(''));
logger.info(`Genesises request sent`);
}

Expand Down
5 changes: 3 additions & 2 deletions idea/common/src/enums/rmq.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
export enum RMQQueue {
GENESISES = 'genesises',
GENESISES_REQUEST = 'genesises.request',
GENESIS = 'genesis',
REPLIES = 'replies',
}

export enum RMQExchange {
DIRECT_EX = 'direct.ex',
TOPIC_EX = 'topic.ex',
INDXR_META = 'indxr.meta.ex',
GENESISES = 'genesises',
}

export enum RMQServices {
Expand Down
17 changes: 8 additions & 9 deletions idea/indexer/src/rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class RMQService {
this.mainChannel = await this.connection.createChannel();

await this.mainChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct');
await this.mainChannel.assertExchange(RMQExchange.TOPIC_EX, 'topic');
await this.mainChannel.assertExchange(RMQExchange.GENESISES, 'fanout', { durable: true });
await this.mainChannel.assertExchange(RMQExchange.INDXR_META, 'fanout', { autoDelete: true });

await this.mainChannel.assertQueue('', {
Expand Down Expand Up @@ -162,19 +162,18 @@ export class RMQService {
}

private async genesisesQSetup(): Promise<void> {
const qName = `${RMQServices.INDEXER}.${RMQQueue.GENESISES}`;
const qName = RMQQueue.GENESISES_REQUEST;

await this.mainChannel.assertQueue(qName, {
durable: false,
exclusive: false,
await this.mainChannel.assertQueue('', {
exclusive: true,
autoDelete: true,
});

await this.mainChannel.bindQueue(qName, RMQExchange.TOPIC_EX, qName);
await this.mainChannel.bindQueue('', RMQExchange.GENESISES, '');

try {
await this.mainChannel.consume(
qName,
'',
async (message) => {
if (!message) {
return;
Expand All @@ -200,7 +199,7 @@ export class RMQService {
genesis: this.genesis,
});
logger.info('Send genesis', { genesis: this.genesis, correlationId });
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff), {
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESIS, Buffer.from(messageBuff), {
headers: { correlationId },
});
}
Expand All @@ -212,7 +211,7 @@ export class RMQService {
action: RMQServiceAction.DELETE,
genesis: this.genesis,
});
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff));
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESIS, Buffer.from(messageBuff));
}

@FormResponse
Expand Down
39 changes: 19 additions & 20 deletions idea/test-balance/src/services/rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { GearService } from './gear';
export class RMQService {
private connection: Connection;
private mainChannel: Channel;
private topicChannel: Channel;

constructor(private transferService: TransferService, private gearService: GearService) {}

Expand All @@ -27,34 +26,23 @@ export class RMQService {
});

this.mainChannel = await this.connection.createChannel();
this.topicChannel = await this.connection.createChannel();

const genesis = this.gearService.genesisHash;

const routingKey = `${RMQServices.TEST_BALANCE}.${genesis}`;

await this.mainChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct');
await this.mainChannel.assertExchange(RMQExchange.TOPIC_EX, 'topic');
await this.mainChannel.assertExchange(RMQExchange.GENESISES, 'fanout');

const assertTopicQueue = await this.topicChannel.assertQueue(`${RMQServices.TEST_BALANCE}t.${genesis}`, {
durable: false,
autoDelete: true,
exclusive: false,
});
await this.mainChannel.assertQueue(routingKey, {
durable: false,
autoDelete: false,
exclusive: false,
});
await this.mainChannel.bindQueue(routingKey, RMQExchange.DIRECT_EX, routingKey);
await this.mainChannel.bindQueue(
assertTopicQueue.queue,
RMQExchange.TOPIC_EX,
`${RMQServices.TEST_BALANCE}.genesises`,
);

await this.directMessageConsumer(routingKey);
await this.topicMessageConsumer(assertTopicQueue.queue);
await this.genesisesQSetup();

this.sendGenesis(this.gearService.genesisHash);
}
Expand All @@ -80,37 +68,48 @@ export class RMQService {
}
}

async topicMessageConsumer(queue: string): Promise<void> {
private async genesisesQSetup(): Promise<void> {
const qName = RMQQueue.GENESISES_REQUEST;

await this.mainChannel.assertQueue('', {
exclusive: true,
autoDelete: true,
});

await this.mainChannel.bindQueue('', RMQExchange.GENESISES, '');

try {
await this.topicChannel.consume(
queue,
await this.mainChannel.consume(
'',
async (message) => {
if (!message) {
return;
}

logger.info('Genesis request');
if (this.gearService.genesisHash !== null) {
this.sendGenesis(this.gearService.genesisHash);
}
},
{ noAck: true },
);
} catch (error) {
logger.error(`Topic exchange consumer error`, { error: error.message });
logger.error('Topic exchange consumer error.', { error });
}
}

sendGenesis(genesis: string): void {
const correlationId = randomUUID();
const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.ADD, genesis });
logger.info('Send genesis', { genesis, correlationId });
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff), {
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESIS, Buffer.from(messageBuff), {
headers: { correlationId },
});
}

sendDeleteGenesis(genesis: string): void {
const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.DELETE, genesis });
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff));
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESIS, Buffer.from(messageBuff));
}

sendReply(correlationId: string, params: any): void {
Expand Down

0 comments on commit 7e335b5

Please sign in to comment.