Skip to content

Commit

Permalink
Merge pull request #660 from energywebfoundation/queue_reaches_mem_limit
Browse files Browse the repository at this point in the history
Exceeded DID queue memory limit
  • Loading branch information
JGiter authored Aug 9, 2023
2 parents cf4cc6d + 4ff6281 commit dca0e34
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 6 deletions.
17 changes: 17 additions & 0 deletions docs/api/classes/modules_did_did_processor.DIDProcessor.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

### Methods

- [OnQueueWaiting](modules_did_did_processor.DIDProcessor.md#onqueuewaiting)
- [onActive](modules_did_did_processor.DIDProcessor.md#onactive)
- [onError](modules_did_did_processor.DIDProcessor.md#onerror)
- [onFailed](modules_did_did_processor.DIDProcessor.md#onfailed)
Expand All @@ -34,6 +35,22 @@

## Methods

### OnQueueWaiting

**OnQueueWaiting**(`job`): `Promise`<`void`\>

#### Parameters

| Name | Type |
| :------ | :------ |
| `job` | `Job`<`any`\> |

#### Returns

`Promise`<`void`\>

___

### onActive

**onActive**(`job`): `void`
Expand Down
17 changes: 17 additions & 0 deletions docs/api/classes/modules_ipfs_pin_processor.PinProcessor.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

### Methods

- [OnQueueWaiting](modules_ipfs_pin_processor.PinProcessor.md#onqueuewaiting)
- [onError](modules_ipfs_pin_processor.PinProcessor.md#onerror)
- [onFailed](modules_ipfs_pin_processor.PinProcessor.md#onfailed)
- [onStalled](modules_ipfs_pin_processor.PinProcessor.md#onstalled)
Expand All @@ -31,6 +32,22 @@

## Methods

### OnQueueWaiting

**OnQueueWaiting**(`job`): `Promise`<`void`\>

#### Parameters

| Name | Type |
| :------ | :------ |
| `job` | `Job`<`any`\> |

#### Returns

`Promise`<`void`\>

___

### onError

**onError**(`error`): `void`
Expand Down
6 changes: 6 additions & 0 deletions src/modules/did/did.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
OnQueueError,
OnQueueFailed,
OnQueueStalled,
OnQueueWaiting,
Process,
Processor,
} from '@nestjs/bull';
Expand Down Expand Up @@ -51,6 +52,11 @@ export class DIDProcessor {
this.logger.debug(`Failed ${job.name} document ${job.data}`);
}

@OnQueueWaiting()
async OnQueueWaiting(job: Job) {
this.logger.debug(`Waiting ${job.name} document ${job.data}`);
}

@Process(ADD_DID_DOC_JOB_NAME)
public async processDIDDocumentAddition(job: Job<string>) {
const doc = await this.didService.addCachedDocument(job.data);
Expand Down
17 changes: 15 additions & 2 deletions src/modules/did/did.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { Provider } from '../../common/provider';
import { SentryTracingService } from '../sentry/sentry-tracing.service';
import { isVerifiableCredential } from '@ew-did-registry/credentials-interface';
import { IPFSService } from '../ipfs/ipfs.service';
import { inspect } from 'util';

@Injectable()
export class DIDService implements OnModuleInit, OnModuleDestroy {
Expand Down Expand Up @@ -350,7 +351,7 @@ export class DIDService implements OnModuleInit, OnModuleDestroy {
// Only refreshing a DID that is already cached.
// Otherwise, cache could grow too large with DID Docs that aren't relevant to Switchboard
if (didDocEntity) {
await this.didQueue.add(UPDATE_DID_DOC_JOB_NAME, did);
await this.pinDocument(did);
}
});
}
Expand All @@ -359,7 +360,7 @@ export class DIDService implements OnModuleInit, OnModuleDestroy {
this.logger.debug(`Beginning sync of DID Documents`);
const cachedDIDs = await this.didRepository.find({ select: ['id'] });
cachedDIDs.forEach(async (did) => {
await this.didQueue.add(UPDATE_DID_DOC_JOB_NAME, did.id);
await this.pinDocument(did.id);
});
}

Expand Down Expand Up @@ -482,4 +483,16 @@ export class DIDService implements OnModuleInit, OnModuleDestroy {
})
);
}

private async pinDocument(did: string): Promise<void> {
try {
await this.didQueue.add(UPDATE_DID_DOC_JOB_NAME, did);
} catch (e) {
this.logger.warn(
`Error to add DID synchronization job for document ${did}: ${e}`
);
const jobsCounts = await this.didQueue.getJobCounts();
this.logger.debug(inspect(jobsCounts, { depth: 2, colors: true }));
}
}
}
15 changes: 11 additions & 4 deletions src/modules/ipfs/ipfs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { PIN_CLAIM_QUEUE_NAME, PIN_CLAIM_JOB_NAME } from './ipfs.types';
import { Logger } from '../logger/logger.service';
import { inspect } from 'util';

@Injectable()
export class IPFSService {
Expand Down Expand Up @@ -62,10 +63,16 @@ export class IPFSService {
throw new HttpException(`Claim ${cid} not found`, HttpStatus.NOT_FOUND);
}

await this.pinsQueue.add(
PIN_CLAIM_JOB_NAME,
JSON.stringify({ cid, claim })
);
try {
await this.pinsQueue.add(
PIN_CLAIM_JOB_NAME,
JSON.stringify({ cid, claim })
);
} catch (e) {
this.logger.debug(`Error to add pin job for cid ${cid}: ${e}`);
const jobsCounts = await this.pinsQueue.getJobCounts();
this.logger.debug(inspect(jobsCounts, { depth: 2, colors: true }));
}
return claim;
}

Expand Down
6 changes: 6 additions & 0 deletions src/modules/ipfs/pin.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
OnQueueError,
OnQueueFailed,
OnQueueStalled,
OnQueueWaiting,
Process,
Processor,
} from '@nestjs/bull';
Expand Down Expand Up @@ -31,6 +32,11 @@ export class PinProcessor {
this.logger.warn(`Stalled ${job.name} claim ${JSON.parse(job.data).cid}`);
}

@OnQueueWaiting()
async OnQueueWaiting(job: Job) {
this.logger.debug(`Waiting ${job.name} claim ${job.data}`);
}

@OnQueueFailed()
onFailed(job: Job, err: Error) {
this.logger.error(
Expand Down

0 comments on commit dca0e34

Please sign in to comment.