Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventsCount caching #98

Merged
merged 6 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions database/migrations/20240610104532_cacheSubscriptionsEvents.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = function (knex) {
return knex.schema.alterTable('subscriptions', (table) => {
table.jsonb('eventsCount');
});
};

/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = function (knex) {
return knex.schema.alterTable('subscriptions', (table) => {
table.dropColumn('eventsCount');
});
};
2 changes: 1 addition & 1 deletion services/datagov.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export default class DatagovService extends moleculer.Service {
}
} while (response?._data?.length);

this.broker.emit('tiles.events.renew');
this.broker.emit('integrations.sync.finished');
return stats;
}

Expand Down
2 changes: 1 addition & 1 deletion services/integrations.fishStockings.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export default class IntegrationsFishStockingsService extends moleculer.Service
}
}

this.broker.emit('tiles.events.renew');
this.broker.emit('integrations.sync.finished');
return stats;
}
}
2 changes: 1 addition & 1 deletion services/integrations.lumbering.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export default class IntegrationsLumberingService extends moleculer.Service {
}
}

this.broker.emit('tiles.events.renew');
this.broker.emit('integrations.sync.finished');
return stats;
}
}
120 changes: 108 additions & 12 deletions services/subscriptions.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { FeatureCollection, parse } from 'geojsonjs';
import _ from 'lodash';
import moleculer, { Context, GenericObject } from 'moleculer';
import { Action, Method, Service } from 'moleculer-decorators';
import { Action, Event, Method, Service } from 'moleculer-decorators';
import PostgisMixin, { asGeoJsonQuery, intersectsQuery } from 'moleculer-postgis';
import { PopulateHandlerFn } from 'moleculer-postgis/src/mixin';
import DbConnection from '../mixins/database.mixin';
Expand All @@ -19,6 +19,7 @@ import {
Table,
throwNoRightsError,
UserAuthMeta,
EntityChangedParams,
} from '../types';
import { LKS_SRID } from '../utils';
import { App } from './apps.service';
Expand All @@ -32,6 +33,10 @@ interface Fields extends CommonFields {
frequency: Frequency;
active: boolean;
geomWithBuffer?: FeatureCollection;
eventsCount?: {
allTime: number;
new: number;
};
}

interface Populates extends CommonPopulates {
Expand All @@ -46,7 +51,13 @@ export type Subscription<

@Service({
name: 'subscriptions',
mixins: [DbConnection({ collection: 'subscriptions' }), PostgisMixin({ srid: LKS_SRID })],
mixins: [
DbConnection({
collection: 'subscriptions',
entityChangedOldEntity: true,
}),
PostgisMixin({ srid: LKS_SRID }),
],
settings: {
fields: {
id: {
Expand All @@ -67,9 +78,15 @@ export type Subscription<
populate: 'users.resolve',
onCreate: async ({ ctx }: FieldHookCallback) => ctx.meta.user?.id,
onUpdate: async ({ ctx, entity }: FieldHookCallback) => {
if (entity.userId !== ctx.meta.user?.id) {
// Allow service updates
if (!ctx.meta?.user?.id) {
return entity.userId;
}

if (entity.userId !== ctx.meta.user.id) {
return throwNoRightsError('Unauthorized');
}

return entity.userId;
},
},
Expand Down Expand Up @@ -124,14 +141,13 @@ export type Subscription<
},

eventsCount: {
virtual: true,
populate: {
keyField: 'id',
handler: (ctx: Context, values: any[]) => {
if (!values?.length) return;
return ctx.call('subscriptions.getEventsCount', { id: values, mapping: true });
},
type: 'object',
readonly: true,
properties: {
allTime: 'number',
new: 'number',
},
get: ({ value }: FieldHookCallback) => value || { allTime: 0, new: 0 },
},

frequency: {
Expand All @@ -144,8 +160,8 @@ export type Subscription<
},
scopes: {
user(query: any, ctx: Context<null, UserAuthMeta>, params: any) {
if (!ctx?.meta?.user?.id) return query;
const { user } = ctx.meta;
if (!user?.id) return query;
query.user = user.id;
return query;
},
Expand Down Expand Up @@ -260,7 +276,7 @@ export default class SubscriptionsService extends moleculer.Service {
const { id, mapping } = ctx.params;
const ids = Array.isArray(id) ? id : [id];

await ctx.call('subscriptions.resolve', { id: ids, throwIfNotExist: true });
await this.resolveEntities(ctx, { id: ids, throwIfNotExist: true });

const adapter = await this.getAdapter(ctx);

Expand Down Expand Up @@ -357,6 +373,63 @@ export default class SubscriptionsService extends moleculer.Service {
return countBySubscriptions;
}

@Method
cacheEventsCount(ctx: Context, id: Subscription['id'], eventsCount: Subscription['eventsCount']) {
return this.updateEntity(
ctx,
{
id,
$set: {
eventsCount,
},
},
{
// will set only eventsCount, without modifying updatedBy and other fields
raw: true,
// eventsCount - readonly field and modified only there
permissive: true,
},
);
}

@Event()
async 'subscriptions.*'(ctx: Context<EntityChangedParams<Subscription>>) {
const type = ctx.params.type;
const subscription = ctx.params.data;
const subscriptionOld = ctx.params.oldData;
const id = subscription.id;

if (!id) return;
if (subscription.geom === subscriptionOld?.geom) return;

switch (type) {
case 'create':
case 'update':
case 'replace':
const eventsCounts = await this.actions.getEventsCount({
id,
mapping: true,
});

await this.cacheEventsCount(ctx, id, eventsCounts[id]);
break;
}
}

@Event()
async 'integrations.sync.finished'(ctx: Context) {
const allSubscriptions: Array<Subscription<null, 'id'>> = await this.findEntities(ctx, {
fields: 'id',
});
arunas-smala marked this conversation as resolved.
Show resolved Hide resolved

const allIds = allSubscriptions.map((s) => s.id);
const eventsCounts = await this.actions.getEventsCount({ id: allIds, mapping: true });

for (const id in eventsCounts) {
await this.cacheEventsCount(ctx, Number(id), eventsCounts[id]);
}
}

@Method
async validateApps({ ctx, value, entity }: FieldHookCallback) {
const apps: App[] = await ctx.call('apps.find', {
Expand All @@ -379,4 +452,27 @@ export default class SubscriptionsService extends moleculer.Service {
throwNoRightsError();
}
}

async started() {
const subscriptionsWithoutCache: Array<Subscription<null, 'id'>> = await this.findEntities(
null,
{
query: {
eventsCount: {
$exists: false,
},
},
fields: ['id'],
},
);

if (!subscriptionsWithoutCache.length) return;

const allIds = subscriptionsWithoutCache.map((s) => s.id);
const eventsCounts = await this.actions.getEventsCount({ id: allIds, mapping: true });

for (const id in eventsCounts) {
await this.cacheEventsCount(null, Number(id), eventsCounts[id]);
}
}
}
2 changes: 1 addition & 1 deletion services/tiles.events.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ export default class TilesEventsService extends moleculer.Service {
}

@Event()
async 'tiles.events.renew'() {
async 'integrations.sync.finished'() {
this.superclustersPromises = {};
await this.renewSuperclusterIndex();
}
Expand Down
Loading