
Commit

Merge pull request #150 from linkedconnections/development
v2.3.0
julianrojas87 authored Jan 3, 2025
2 parents 0f5b267 + 9eafb4c commit d357226
Showing 7 changed files with 1,422 additions and 1,154 deletions.
.github/workflows/build-test.yml (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [16.x, 18.x]
node-version: [18.x, 20.x]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/
steps:
- name: Checkout
bin/gtfs2lc.js (69 changes: 33 additions & 36 deletions)
@@ -1,9 +1,9 @@
#!/usr/bin/env node

var program = require('commander'),
gtfs2lc = require('../lib/gtfs2lc.js'),
fs = require('fs'),
del = require('del');
const program = require('commander');
const gtfs2lc = require('../lib/gtfs2lc.js');
const fs = require('fs');
const del = require('del');

console.error("GTFS to linked connections converter use --help to discover more functions");

@@ -30,7 +30,7 @@ if (program.path.endsWith('/')) {
program.path = program.path.slice(0, -1);
}

var output = program.output || program.path;
const output = program.output || program.path;
if (output.endsWith('/')) {
output = output.slice(0, -1);
}
@@ -40,41 +40,38 @@ if (program.baseUris) {
baseUris = JSON.parse(fs.readFileSync(program.baseUris, 'utf-8'));
}

var mapper = new gtfs2lc.Connections({
store: !program.store || program.store === 'undefined' ? 'MemStore' : program.store,
format: !program.format || program.format === 'undefined' ? 'json' : program.format,
compressed: program.compressed,
fresh: program.fresh,
baseUris: baseUris
});

var resultStream = null;
mapper.resultStream(program.path, output, function (path) {
if (program.stream) {
fs.createReadStream(path).pipe(process.stdout);
} else {
console.error('Linked Connections successfully created at ' + path + '!');
}
});

process.on('SIGINT', function () {
console.error("\nSIGINT Received – Cleaning up");
if (resultStream) {
resultStream.end();
} else {
del([
process.on('SIGINT', async () => {
console.error("\nSIGINT Received, cleaning up...");
await del(
[
output + '/.stops',
output + '/.routes',
output + '/.trips',
output + '/.services',
output + '/raw_*'
],
{ force: true })
.then(function () {
process.exit(0);
}, function (err) {
console.error(err);
process.exit(1);
});
}
{ force: true }
);
console.error("Cleaned up!");
});

async function run() {
console.error(`Converting GTFS to Linked Connections...`);
const mapper = new gtfs2lc.Connections({
store: !program.store || program.store === 'undefined' ? 'MemStore' : program.store,
format: !program.format || program.format === 'undefined' ? 'json' : program.format,
compressed: program.compressed,
fresh: program.fresh,
baseUris: baseUris
});

const connectionsFile = await mapper.convert(program.path, output);

if (program.stream) {
fs.createReadStream(connectionsFile).pipe(process.stdout);
} else {
console.error(`Linked Connections successfully created at ${connectionsFile}`);
}
}

run();
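
The refactored CLI above drives the conversion through the new promise-based convert() API instead of the old resultStream() callback. A minimal sketch of how a library consumer might call the same API, assuming only what this diff shows (the feed and output directories are hypothetical):

const gtfs2lc = require('gtfs2lc');

async function main() {
    // Options mirror the defaults handled in bin/gtfs2lc.js above
    const mapper = new gtfs2lc.Connections({
        store: 'MemStore',
        format: 'jsonld'
    });
    // convert() resolves with the path of the merged Linked Connections file
    const file = await mapper.convert('./gtfs-feed', './output');
    console.error(`Linked Connections available at ${file}`);
}

main().catch(console.error);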
lib/gtfs2connections.js (236 changes: 121 additions & 115 deletions)
@@ -1,4 +1,5 @@
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const os = require('os');
const fs = require('fs');
const zlib = require('zlib');
const path = require('path');
@@ -20,111 +21,121 @@ const Connections2Triples = require('./Connections2Triples');
const readdir = util.promisify(fs.readdir);
const exec = util.promisify(ChildProcess.exec);

const numCPUs = require('os').cpus().length;


const Mapper = function (options) {
this._options = options;
if (!this._options.store) {
this._options.store = 'MemStore';
class GTFSMapper {
constructor(options) {
this._options = options;
if (!this.options.store) {
this.options.store = 'MemStore';
}
}
};

/**
* Returns a resultStream for connections
* Step 1: Clean up and sort source files by calling bin/gtfs2lc-sort.sh
* Step 2: Create index of stops.txt, routes.txt, trips.txt and,
* convert calendar_dates.txt and calendar.txt to service ids mapped to a long list of dates.
* Step 3: Produce (diff) connection rules based on available CPU cores
* Step 4: Use Node.js worker threads to process the connection rules in parallel.
* Step 5: Merge the files created in parallel and return the file path.
*/
Mapper.prototype.resultStream = async function (path, output, done) {
const t0 = new Date();
// Step 1: Clean up and sort source files by calling bin/gtfs2lc-sort.sh
console.error('Cleaning up and sorting source files');
await cleanUpSources(path);

// Step 2: Read all the required GTFS files and create reusable indexes
console.error('Creating index stores...');
const stores = await StoreManager(path, output, this._options.store);

// Step 3: Produce (diff) connection rules based on available CPU cores
console.error('Creating Connection rules...');
await StopTimes2Cxs(path, output, stores, this._options.fresh);

// Step 4: Materialize connections in parallel using worker threads
let w = 0;
const raws = [];
// Create as many worker threads as there are available CPUs
for (let i = 0; i < numCPUs; i++) {
const worker = new Worker(__filename, {
workerData: {
instance: i,
output,
options: this._options
}
});

console.error(`Materializing Connections in worker thread (PID ${worker.threadId})`);

worker.on('message', async () => {
raws.push(`raw_${w}`);
w++;
if (w === numCPUs) {
// Step 5: Merge all the created files into one
const format = this._options.format;
let ext = null;
let gz = '';
let mergeCommand = 'zcat';

if (!format || ['json', 'mongo', 'jsonld', 'mongold'].indexOf(format) >= 0) {
await appendLineBreaks(output);
ext = 'json';
} else if (format === 'csv') {
ext = 'csv';
} else if (format === 'turtle') {
await removePrefixes(output);
ext = 'ttl';
} else if (format === 'ntriples') {
ext = 'nt';
}

try {
console.error('Merging final Linked Connections file...');
if (this._options.compressed) {
mergeCommand = 'cat';
gz = '.gz';
/**
* Converts a GTFS feed into Linked Connections and resolves with the merged file path
* Step 1: Clean up and sort source files by calling bin/gtfs2lc-sort.sh
* Step 2: Create index of stops.txt, routes.txt, trips.txt and,
* convert calendar_dates.txt and calendar.txt to service ids mapped to a long list of dates.
* Step 3: Produce (diff) connection rules based on available CPU cores
* Step 4: Use Node.js worker threads to process the connection rules in parallel.
* Step 5: Merge the files created in parallel and return the file path.
*/
convert(path, output) {
const numCPUs = os.cpus().length;

return new Promise(async (resolve, reject) => {
const t0 = new Date();
// Step 1: Clean up and sort source files by calling bin/gtfs2lc-sort.sh
console.error('Cleaning up and sorting source files');
await cleanUpSources(path);

// Step 2: Read all the required GTFS files and create reusable indexes
console.error('Creating index stores...');
const stores = await StoreManager(path, output, this.options.store);

// Step 3: Produce (diff) connection rules based on available CPU cores
console.error('Creating Connection rules...');
await StopTimes2Cxs(path, output, stores, this.options.fresh);

// Step 4: Materialize connections in parallel using worker threads
let w = 0;
const raws = [];
// Create as many worker threads as there are available CPUs
for (let i = 0; i < numCPUs; i++) {
const worker = new Worker(__filename, {
workerData: {
instance: i,
output,
options: this.options
}

// Join all resulting files into one
await exec(`for i in ${raws.map(r => { return `${r}.${ext}.gz` }).join(" ")} ; do ${mergeCommand} "$i" >> linkedConnections.${ext}${gz} && rm "$i" || break ; done`, { cwd: output });
let t1 = new Date();
console.error('linkedConnections.' + ext + ' File created in ' + (t1.getTime() - t0.getTime()) + ' ms');
await del(
[
output + '/connections_*',
output + '/stops.db',
output + '/routes.db',
output + '/trips.db',
output + '/services.db'
],
{ force: true }
);
done(`${output}/linkedConnections.${ext}`);
} catch (err) {
throw err;
}
}
}).on('error', err => {
console.error(err);
}).on('exit', (code) => {
if (code !== 0) {
console.error(new Error(`Worker stopped with exit code ${code}`));
});

console.error(`Materializing Connections in worker thread (PID ${worker.threadId})`);

worker.on('message', async () => {
raws.push(`raw_${w}`);
w++;
if (w === numCPUs) {
// Step 5: Merge all the created files into one
const format = this.options.format;
let ext = null;
let gz = '';
let mergeCommand = 'zcat';

if (!format || ['json', 'mongo', 'jsonld', 'mongold'].indexOf(format) >= 0) {
await appendLineBreaks(output);
ext = 'json';
} else if (format === 'csv') {
ext = 'csv';
} else if (format === 'turtle') {
await removePrefixes(output);
ext = 'ttl';
} else if (format === 'ntriples') {
ext = 'nt';
}

try {
console.error('Merging final Linked Connections file...');
if (this.options.compressed) {
mergeCommand = 'cat';
gz = '.gz';
}

// Join all resulting files into one
const raws_joined = raws.map(r => { return `${r}.${ext}.gz` }).join(" ");
await exec(`${mergeCommand} ${raws_joined} > linkedConnections.${ext}${gz}`, { cwd: output });
let t1 = new Date();
console.error('linkedConnections.' + ext + ' File created in ' + (t1.getTime() - t0.getTime()) + ' ms');
await del(
[
output + '/connections_*',
output + '/stops.db',
output + '/routes.db',
output + '/trips.db',
output + '/services.db'
],
{ force: true }
);
resolve(`${output}/linkedConnections.${ext}`);
} catch (err) {
throw err;
}
}
}).on('error', err => {
console.error(err);
reject(err);
}).on('exit', (code) => {
    if (code !== 0) {
        const err = new Error(`Worker stopped with exit code ${code}`);
        console.error(err);
        reject(err);
    }
});
}
});
}
};

get options() {
return this._options;
}
}

async function cleanUpSources(sources) {
try {
@@ -137,27 +148,23 @@ async function cleanUpSources(sources) {

async function appendLineBreaks(output) {
const files = (await readdir(output)).filter(raw => raw.startsWith('raw_'));
const promises = [];

for (const [i, f] of files.entries()) {
if (i < files.length - 1) {
promises.push(exec(`echo "" | gzip >> ${f}`, { cwd: output }));
// Make sure the file ends with a newline. For some reason sometimes it fails to append.
while (!(await exec(`zcat ${f} | tail -1`, { cwd: output }))["stdout"].endsWith("\n")) {
await exec(`echo "" | gzip >> ${f}`, { cwd: output });
}
}

await Promise.all(promises);
}

async function removePrefixes(output) {
const files = (await readdir(output)).filter(raw => raw.startsWith('raw_'));
const promises = [];
const files = (await readdir(output)).filter(raw => raw.startsWith('raw_') && raw.endsWith('.ttl.gz'));
for (const [i, f] of files.entries()) {
if (i > 0) {
// TODO: find a not hard-coded way to remove prefixes
promises.push(exec(`zcat ${f} | tail -n +4 | gzip > ${f}.temp && mv ${f}.temp ${f}`, { cwd: output }))
await exec(`zcat ${f} | tail -n +4 | gzip > ${f}.temp && mv ${f}.temp ${f}`, { cwd: output });
}
}

await Promise.all(promises);
}

// Code executed only on a Worker Thread
@@ -231,13 +238,12 @@ if (!isMainThread) {
.pipe(new N3.StreamWriter({ format: 'N-Triples' }));
}

connectionStream.on('finish', () => {
parentPort.postMessage('done');
});

connectionStream.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(`${workerData['output']}/raw_${workerData['instance']}.${fmt}.gz`));
.pipe(fs.createWriteStream(`${workerData['output']}/raw_${workerData['instance']}.${fmt}.gz`))
.on('finish', () => {
parentPort.postMessage('done');
});

}

module.exports = Mapper;
module.exports = GTFSMapper;
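
The convert() method above spawns one worker per CPU core from __filename, each worker gzips its own raw_<i> shard, and the main thread merges the shards with a plain cat/zcat. That merge works because concatenated gzip members decompress as a single stream. A self-contained sketch of the same fan-out-and-merge pattern (shard names and contents are illustrative, not the library's actual output):

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const fs = require('fs');
const os = require('os');
const zlib = require('zlib');

if (isMainThread) {
    const numCPUs = os.cpus().length;
    let done = 0;
    for (let i = 0; i < numCPUs; i++) {
        const worker = new Worker(__filename, { workerData: { instance: i } });
        worker.on('message', () => {
            if (++done === numCPUs) {
                // Concatenated gzip members decompress as one stream,
                // which is what the zcat/cat merge step relies on
                const merged = Buffer.concat(
                    [...Array(numCPUs).keys()].map(n => fs.readFileSync(`raw_${n}.gz`))
                );
                console.log(zlib.gunzipSync(merged).toString());
            }
        });
    }
} else {
    // Each worker materializes its shard and signals the main thread
    fs.writeFileSync(`raw_${workerData.instance}.gz`,
        zlib.gzipSync(`shard ${workerData.instance}\n`));
    parentPort.postMessage('done');
}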
