Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

setup tiles mixin #108

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"@types/node": "^20.12.12",
"@types/qs": "^6.9.15",
"@types/showdown": "^2.0.6",
"@types/supercluster": "^7.1.3",
"@types/transform-coordinates": "^1.0.2",
"@types/unzipper": "^0.10.9",
"@typescript-eslint/eslint-plugin": "^5.11.0",
Expand All @@ -45,6 +44,7 @@
"ts-node": "^10.9.2"
},
"dependencies": {
"@aplinkosministerija/moleculer-accounts": "^1.3.0",
"@moleculer/database": "github:ambrazasp/moleculerjs-database",
"@r2d2bzh/moleculer-cron": "^0.1.4",
"@sentry/node": "^7.114.0",
Expand Down Expand Up @@ -72,11 +72,9 @@
"pg-hstore": "^2.3.4",
"postmark": "^4.0.2",
"showdown": "^2.1.0",
"supercluster": "7.1.5",
"transform-coordinates": "^1.0.0",
"typescript": "^5.4.5",
"unzipper": "^0.11.6",
"vt-pbf": "^3.1.3"
"unzipper": "^0.11.6"
},
"engines": {
"node": ">=20.0.0 <21.0.0"
Expand Down
279 changes: 19 additions & 260 deletions services/tiles.events.service.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
'use strict';

import moleculer, { Context, GenericObject } from 'moleculer';
import { Action, Event, Method, Service } from 'moleculer-decorators';
import PostgisMixin from 'moleculer-postgis';
import DbConnection from '../mixins/database.mixin';
import moleculer, { Context } from 'moleculer';
import { Event, Method, Service } from 'moleculer-decorators';
import {
COMMON_DEFAULT_SCOPES,
COMMON_SCOPES,
CommonFields,
CommonPopulates,
EndpointType,
Table,
throwNotFoundError,
} from '../types';
import Supercluster from 'supercluster';
// @ts-ignore
import vtpbf from 'vt-pbf';
import _ from 'lodash';
import { LKS_SRID, parseToJsonIfNeeded } from '../utils';
import { applyEventsQueryBySubscriptions } from './events.service';
import { Subscription } from './subscriptions.service';
import { Knex } from 'knex';

import config from '../knexfile';
import { TilesMixin } from '@aplinkosministerija/moleculer-accounts';
interface Fields extends CommonFields {
name: string;
body: string;
Expand All @@ -41,40 +35,20 @@ export type TilesEvent<
F extends keyof (Fields & Populates) = keyof Fields,
> = Table<Fields, Populates, P, F>;

const superclusterOpts = {
radius: 64,
extent: 512,
generateId: true,
reduce: (acc: any, props: any) => acc,
};

const isLocalDevelopment = process.env.NODE_ENV === 'local';
const WGS_SRID = 4326;

function getSuperclusterHash(query: any = {}) {
if (typeof query !== 'string') {
query = JSON.stringify(query);
}
return query || 'default';
}

@Service({
name: 'tiles.events',
mixins: [
DbConnection({
collection: 'events',
createActions: {
create: false,
update: false,
createMany: false,
remove: false,
},
}),
PostgisMixin({
srid: WGS_SRID,
geojson: {
maxDecimalDigits: 5,
TilesMixin({
config,
opts: {
collection: 'events',
},
srid: LKS_SRID,
layerName: 'events',
maxClusteringZoomLevel: 12,
preloadClustersOnStart: !isLocalDevelopment,
}),
],
settings: {
Expand Down Expand Up @@ -127,6 +101,12 @@ function getSuperclusterHash(query: any = {}) {
get: {
auth: EndpointType.PUBLIC,
},
getTileItems: {
auth: EndpointType.PUBLIC,
},
getTile: {
auth: EndpointType.PUBLIC,
},
find: {
rest: null,
},
Expand All @@ -141,208 +121,11 @@ function getSuperclusterHash(query: any = {}) {
get: ['applyFilters'],
resolve: ['applyFilters'],
getEventsFeatureCollection: ['applyFilters'],
getTile: ['applyFilters'],
},
},
})
export default class TilesEventsService extends moleculer.Service {
@Action({
rest: 'GET /:z/:x/:y',
params: {
x: 'number|convert|min:0|integer',
z: 'number|convert|min:0|integer',
y: 'number|convert|min:0|integer',
query: ['object|optional', 'string|optional'],
},
auth: EndpointType.PUBLIC,

timeout: 0,
})
async getTile(
ctx: Context<
{ x: number; y: number; z: number; query: string | GenericObject },
{ $responseHeaders: any; $responseType: string }
>,
) {
const { x, y, z } = ctx.params;

ctx.params.query = parseToJsonIfNeeded(ctx.params.query);
ctx.meta.$responseType = 'application/x-protobuf';

// make clusters
if (z <= 12) {
const supercluster: Supercluster = await this.getSupercluster(ctx);

const tileEvents = supercluster.getTile(z, x, y);

const layers: any = {};

if (tileEvents) {
layers.events = tileEvents;
}

return Buffer.from(vtpbf.fromGeojsonVt(layers, { extent: superclusterOpts.extent }));
}

// show real geometries
const tileData = await this.getMVTTiles(ctx);
return tileData.tile;
}

@Action({
rest: 'GET /cluster/:cluster/items',
params: {
cluster: 'number|convert|positive|integer',
page: 'number|convert|positive|integer|optional',
pageSize: 'number|convert|positive|integer|optional',
},

auth: EndpointType.PUBLIC,
})
async getTileItems(
ctx: Context<
{
cluster: number;
query: string | GenericObject;
page?: number;
pageSize?: number;
populate?: string | string[];
sort?: string | string[];
},
{ $responseHeaders: any; $responseType: string }
>,
) {
const { cluster } = ctx.params;
const page = ctx.params.page || 1;
const pageSize = ctx.params.pageSize || 10;
const { sort, populate } = ctx.params;
const supercluster: Supercluster = await this.getSupercluster(ctx);

if (!supercluster) throwNotFoundError('No items!');

const ids = supercluster.getLeaves(cluster, Infinity).map((i) => i.properties.id);

if (!ids?.length) {
return {
rows: [],
total: 0,
page,
pageSize,
totalPages: 0,
};
}

return ctx.call('tiles.events.list', {
query: {
// knex support for `$in` is limited to 30K or smth
$raw: `id IN ('${ids.join("', '")}')`,
},
populate,
page,
pageSize,
sort,
});
}

@Method
async getMVTTiles(ctx: Context<{ query: any; x: number; y: number; z: number }>) {
ctx = await this.applyFilters(ctx);
const adapter = await this.getAdapter(ctx);
const table = adapter.getTable();
const knex: Knex = adapter.client;

const query = await this.getComputedQuery(ctx);

const fields = ['id'];
const { x, y, z } = ctx.params;

const WM_SRID = 3857;
const envelopeQuery = `ST_TileEnvelope(${z}, ${x}, ${y})`;
const transformedEnvelopeQuery = `ST_Transform(${envelopeQuery}, ${LKS_SRID})`;
const transformedGeomQuery = `ST_Transform(ST_CurveToLine("geom"), ${WM_SRID})`;

const asMvtGeomQuery = adapter
.computeQuery(table, query)
.whereRaw(`ST_Intersects(events.geom, ${transformedEnvelopeQuery})`)
.select(
...fields,
knex.raw(`ST_AsMVTGeom(${transformedGeomQuery}, ${envelopeQuery}, 4096, 64, true) AS geom`),
);

const tileQuery = knex
.select(knex.raw(`ST_AsMVT(tile, 'events', 4096, 'geom') as tile`))
.from(asMvtGeomQuery.as('tile'))
.whereNotNull('geom');

return tileQuery.first();
}

@Action({
timeout: 0,
})
async getEventsFeatureCollection(ctx: Context<{ query: any }>) {
const adapter = await this.getAdapter(ctx);
const table = adapter.getTable();
const knex = adapter.client;

const query = await this.getComputedQuery(ctx);
const fields = ['id'];

const eventsQuery = adapter
.computeQuery(table, query)
.select(...fields, knex.raw(`ST_Transform(ST_PointOnSurface(geom), ${WGS_SRID}) as geom`));

const res = await knex
.select(knex.raw(`ST_AsGeoJSON(e)::json as feature`))
.from(eventsQuery.as('e'));

return {
type: 'FeatureCollection',
features: res.map((i: any) => i.feature),
};
}

@Method
async getComputedQuery(ctx: Context<{ query: any }>) {
let { params } = ctx;
params = this.sanitizeParams(params);
params = await this._applyScopes(params, ctx);
params = this.paramsFieldNameConversion(params);

return parseToJsonIfNeeded(params.query) || {};
}

@Method
async getSupercluster(ctx: Context<{ query: any }>) {
const hash = getSuperclusterHash(ctx.params.query);

if (!this.superclusters?.[hash]) {
await this.renewSuperclusterIndex(ctx.params.query);
}

return this.superclusters[hash];
}

@Method
async renewSuperclusterIndex(query: any = {}) {
// TODO: apply to all superclusters (if exists)
const hash = getSuperclusterHash(query);

const supercluster = new Supercluster(superclusterOpts);

// Singleton!
if (this.superclustersPromises[hash]) {
return this.superclustersPromises[hash];
}

this.superclustersPromises[hash] = this.actions.getEventsFeatureCollection({ query });
const featureCollection: any = await this.superclustersPromises[hash];

supercluster.load(featureCollection.features || []);
this.superclusters[hash] = supercluster;

delete this.superclustersPromises[hash];
}

@Method
async applyFilters(ctx: Context<any>) {
ctx.params.query = parseToJsonIfNeeded(ctx.params.query) || {};
Expand All @@ -359,33 +142,9 @@ export default class TilesEventsService extends moleculer.Service {
return ctx;
}

@Event()
async '$broker.started'() {
this.superclusters = {};
this.superclustersPromises = {};
// This takes time
if (!isLocalDevelopment) {
try {
await this.renewSuperclusterIndex();
} catch (err) {
console.error('Cannot create super clusters', err);
}
}
}

@Event()
async 'cache.clean.tiles.events'() {
await this.broker.cacher?.clean(`${this.fullName}.**`);
}

@Event()
async 'integrations.sync.finished'() {
this.superclustersPromises = {};
await this.renewSuperclusterIndex();
}

started() {
this.superclusters = {};
this.superclustersPromises = {};
}
}
Loading