Merge pull request #15 from PRX/project-updates
Project updates
farski authored Jan 10, 2024
2 parents a16e2fb + 6151f15 commit 51632a7
Showing 22 changed files with 1,484 additions and 1,191 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
@@ -1,4 +1,7 @@
root = true

[*]
charset = utf-8
indent_style = space
indent_size = 2
insert_final_newline = true
3 changes: 2 additions & 1 deletion .eslintrc
@@ -10,6 +10,7 @@
},
"rules": {
"no-console": ["off"],
"no-else-return": ["off"]
"no-else-return": ["off"],
"import/prefer-default-export": ["off"]
}
}
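The added `import/prefer-default-export` override is tied to the ES-module conversion later in this change set: eslint-plugin-import's rule flags modules whose only export is a single named export (as src/index.js now does with `handler`), so the project turns it off. A minimal, hypothetical illustration of the pattern the rule would otherwise reject:

```js
// greet.js — hypothetical module; its only export is a single named export,
// which import/prefer-default-export would flag unless the rule is disabled.
export function greet(name) {
  return `Hello, ${name}`;
}

// caller.js
import { greet } from "./greet.js";
console.log(greet("Dovetail"));
```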
6 changes: 3 additions & 3 deletions .github/workflows/check-project-std.yml
@@ -6,10 +6,10 @@ jobs:
check-javascript:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 18
node-version: 20
cache: yarn
- run: yarn install
- run: npm exec eslint -- "**/*.js"
2 changes: 1 addition & 1 deletion .tool-versions
@@ -1 +1 @@
nodejs 18.15.0
nodejs 20.9.0
7 changes: 4 additions & 3 deletions package.json
@@ -2,8 +2,9 @@
"name": "dovetail-metrics-export",
"version": "1.0.0",
"engines": {
"node": ">= 18.0.0"
"node": ">= 20.0.0"
},
"type": "module",
"description": "Export Dovetail metrics data",
"repository": "git@github.com:PRX/dovetail-metrics-export.git",
"keywords": [
@@ -17,8 +18,8 @@
},
"homepage": "https://github.com/PRX/dovetail-metrics-export.org#readme",
"dependencies": {
"@google-cloud/bigquery": "^5.12.0",
"@aws-sdk/client-sns": "*"
"@aws-sdk/client-sns": "*",
"@google-cloud/bigquery": "^7.3.0"
},
"devDependencies": {
"@types/aws-lambda": "*",
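Adding `"type": "module"` switches every `.js` file in the package to Node's ES-module loader, which is what permits the require-to-import rewrite in the source files below. A before/after sketch under that setting, using a hypothetical helper module (under Node's native ESM resolver, relative specifiers also need explicit file extensions):

```js
// Before ("type" unset): CommonJS
// const { BigQuery } = require("@google-cloud/bigquery");
// module.exports = { makeClient };

// After ("type": "module"): ES modules
import { BigQuery } from "@google-cloud/bigquery";

// Hypothetical helper, shown only to illustrate a named export under ESM
export function makeClient(projectId) {
  return new BigQuery({ projectId });
}
```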
231 changes: 115 additions & 116 deletions src/extraction.js
@@ -1,17 +1,17 @@
const MakeCopies = require("./make_copies");

const queryForDownloads = require("./query-jobs/downloads");
const queryForEpisodeMetadata = require("./query-jobs/episode_metadata");
const queryForGeoMetadata = require("./query-jobs/geo_metadata");
const queryForImpressions = require("./query-jobs/impressions");
const queryForBoostrImpressions = require("./query-jobs/boostr_impressions");
const queryForPodcastMetadata = require("./query-jobs/podcast_metadata");
const queryForUserAgentMetadata = require("./query-jobs/user_agent_metadata");
const queryForAdvertiserMetadata = require("./query-jobs/advertiser_metadata");
const queryForCampaignMetadata = require("./query-jobs/campaign_metadata");
// const queryForCreativeMetadata = require('./query-jobs/creative_metadata');
const queryForFlightMetadata = require("./query-jobs/flight_metadata");
const queryForPlacementMetadata = require("./query-jobs/placement_metadata");
import MakeCopies from "./make_copies";

import queryForDownloads from "./query-jobs/downloads";
import queryForEpisodeMetadata from "./query-jobs/episode_metadata";
import queryForGeoMetadata from "./query-jobs/geo_metadata";
import queryForImpressions from "./query-jobs/impressions";
import queryForBoostrImpressions from "./query-jobs/boostr_impressions";
import queryForPodcastMetadata from "./query-jobs/podcast_metadata";
import queryForUserAgentMetadata from "./query-jobs/user_agent_metadata";
import queryForAdvertiserMetadata from "./query-jobs/advertiser_metadata";
import queryForCampaignMetadata from "./query-jobs/campaign_metadata";
// import queryForCreativeMetadata from './query-jobs/creative_metadata';
import queryForFlightMetadata from "./query-jobs/flight_metadata";
import queryForPlacementMetadata from "./query-jobs/placement_metadata";

// The values should never include an "*", or output files could get real weird
const JOB_TYPES = {
@@ -85,109 +85,108 @@ async function queryForExtractionType(extractionType, config) {
}
}

module.exports = {
types: Object.values(JOB_TYPES),
/**
* Runs an extraction job of a specific type for a given configuration
* @param {string} extractionType
* @param {ExportConfig} config
* @returns {Promise<void>}
*/
run: async function main(extractionType, config) {
console.log(extractionType);
if (!config.extractions.includes(extractionType)) {
return;
}

const queryJob = await queryForExtractionType(extractionType, config);

if (!queryJob) {
return;
}

console.log(
JSON.stringify({
QueryJob: { Type: extractionType, Metadata: queryJob.metadata },
})
);
export const types = Object.values(JOB_TYPES);

/**
* Runs an extraction job of a specific type for a given configuration
* @param {string} extractionType
* @param {ExportConfig} config
* @returns {Promise<void>}
*/
export async function run(extractionType, config) {
console.log(extractionType);
if (!config.extractions.includes(extractionType)) {
return;
}

const queryJob = await queryForExtractionType(extractionType, config);

const queryMetadata = await new Promise((resolve, reject) => {
queryJob.on("complete", resolve);
queryJob.on("error", reject);
});

const bucketName = process.env.GCP_EXPORT_BUCKET;

// All extract jobs use multi-file wildcard output
const formatExt = { NEWLINE_DELIMITED_JSON: ".ndjson", CSV: ".csv" }[
config.destinationFormat
];
const compressionExt = { NONE: "", GZIP: ".gz" }[config.compression];
const fileExtension = `${formatExt}${compressionExt}`;
const filename = `${extractionType}-*${fileExtension}`;
const objectName = [gcsObjectPrefix(config), filename].join("");

const [extractJob] = await config.bigQueryClient.createJob({
configuration: {
extract: {
// https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationextract
sourceTable: queryMetadata.configuration.query.destinationTable,
// Ensure that the filename after the prefix does not collide with any
// other files created by this function, or they may overwrite each
// other
destinationUri: `gs://${bucketName}/${objectName}`,
destinationFormat: config.destinationFormat,
printHeader: true,
compression: config.compression,
},
if (!queryJob) {
return;
}

console.log(
JSON.stringify({
QueryJob: { Type: extractionType, Metadata: queryJob.metadata },
}),
);

const queryMetadata = await new Promise((resolve, reject) => {
queryJob.on("complete", resolve);
queryJob.on("error", reject);
});

const bucketName = process.env.GCP_EXPORT_BUCKET;

// All extract jobs use multi-file wildcard output
const formatExt = { NEWLINE_DELIMITED_JSON: ".ndjson", CSV: ".csv" }[
config.destinationFormat
];
const compressionExt = { NONE: "", GZIP: ".gz" }[config.compression];
const fileExtension = `${formatExt}${compressionExt}`;
const filename = `${extractionType}-*${fileExtension}`;
const objectName = [gcsObjectPrefix(config), filename].join("");

const [extractJob] = await config.bigQueryClient.createJob({
configuration: {
extract: {
// https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationextract
sourceTable: queryMetadata.configuration.query.destinationTable,
// Ensure that the filename after the prefix does not collide with any
// other files created by this function, or they may overwrite each
// other
destinationUri: `gs://${bucketName}/${objectName}`,
destinationFormat: config.destinationFormat,
printHeader: true,
compression: config.compression,
},
});

await new Promise((resolve, reject) => {
extractJob.on("complete", resolve);
extractJob.on("error", reject);
});

// destinationUriFileCounts is an array where each value corresponds to
// a URL provided in destinationUri in the job. We only ever give one URL,
// so we only get one value.
const outputFileCount =
extractJob.metadata.statistics.extract.destinationUriFileCounts[0];

console.log(
JSON.stringify({
ExtractJob: { Type: extractionType, Metadata: extractJob.metadata },
})
},
});

await new Promise((resolve, reject) => {
extractJob.on("complete", resolve);
extractJob.on("error", reject);
});

// destinationUriFileCounts is an array where each value corresponds to
// a URL provided in destinationUri in the job. We only ever give one URL,
// so we only get one value.
const outputFileCount =
extractJob.metadata.statistics.extract.destinationUriFileCounts[0];

console.log(
JSON.stringify({
ExtractJob: { Type: extractionType, Metadata: extractJob.metadata },
}),
);

const copyMachines = [];

// Because the extraction could have generated multiple files, we need to
// copy however many were created.
for (let i = 0; i < outputFileCount; i += 1) {
// BigQuery makes files with 12-digit numbers, starting at 000000000000
const fileSequenceId = `${i}`.padStart(12, "0");

// Replace the wildcard in the filename we gave to BigQuery in the job
// configuration with a 12-digit number, so it matches one of the files
// that was created in GCS.
const numberedObjectName = objectName.replace(
`-*${fileExtension}`,
`-${fileSequenceId}${fileExtension}`,
);

const copyMachines = [];

// Because the extraction could have generated multiple files, we need to
// copy however many were created.
for (let i = 0; i < outputFileCount; i += 1) {
// BigQuery makes files with 12-digit numbers, starting at 000000000000
const fileSequenceId = `${i}`.padStart(12, "0");

// Replace the wildcard in the filename we gave to BigQuery in the job
// configuration with a 12-digit number, so it matches one of the files
// that was created in GCS.
const numberedObjectName = objectName.replace(
`-*${fileExtension}`,
`-${fileSequenceId}${fileExtension}`
);

// Copy that specific numbered file
copyMachines.push(
MakeCopies(
extractionType,
config,
bucketName,
numberedObjectName,
fileSequenceId
)
);
}

await Promise.all(copyMachines);
},
};
// Copy that specific numbered file
copyMachines.push(
MakeCopies(
extractionType,
config,
bucketName,
numberedObjectName,
fileSequenceId,
),
);
}

await Promise.all(copyMachines);
}
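For orientation, the extract job above writes objects under a wildcarded name (for example `downloads-*.ndjson.gz` after the configured prefix), BigQuery expands the `*` into zero-padded 12-digit sequence numbers, and the copy loop rebuilds each concrete object name from `destinationUriFileCounts`. A standalone sketch of that mapping, with a hypothetical prefix and file count:

```js
// Hypothetical values; in extraction.js these come from the export config
// and from extractJob.metadata.statistics.extract.destinationUriFileCounts[0].
const fileExtension = ".ndjson.gz";
const objectName = `2024/01/09/downloads-*${fileExtension}`;
const outputFileCount = 3;

const numberedObjectNames = [];
for (let i = 0; i < outputFileCount; i += 1) {
  // BigQuery numbers wildcard output files with 12 digits, starting at 000000000000
  const fileSequenceId = `${i}`.padStart(12, "0");
  numberedObjectNames.push(
    objectName.replace(`-*${fileExtension}`, `-${fileSequenceId}${fileExtension}`),
  );
}

console.log(numberedObjectNames);
// [
//   '2024/01/09/downloads-000000000000.ndjson.gz',
//   '2024/01/09/downloads-000000000001.ndjson.gz',
//   '2024/01/09/downloads-000000000002.ndjson.gz'
// ]
```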
13 changes: 6 additions & 7 deletions src/index.js
@@ -1,6 +1,5 @@
const { BigQuery } = require("@google-cloud/bigquery");

const extraction = require("./extraction");
import { BigQuery } from "@google-cloud/bigquery";
import { types, run } from "./extraction";

/**
* @typedef {object} ExportConfig
@@ -35,7 +34,7 @@ function defaultRangeStart() {
return new Date(d.setDate(d.getDate() - 1));
}

exports.handler = async (event, context) => {
export const handler = async (event, context) => {
console.log(JSON.stringify({ Event: event }));

const gcpConfig = JSON.parse(process.env.BIGQUERY_CLIENT_CONFIG);
@@ -106,7 +105,7 @@ exports.handler = async (event, context) => {
// Include all extraction types by default
const extractions =
!event.Extractions || !Array.isArray(event.Extractions)
? extraction.types.filter((t) => !["boostr_impressions"].includes(t))
? types.filter((t) => !["boostr_impressions"].includes(t))
: event.Extractions;

const inclusiveRangeStart = event.Range?.[0]
@@ -120,7 +119,7 @@
JSON.stringify({
Extractions: extractions,
Range: [inclusiveRangeStart, exclusiveRangeEnd],
})
}),
);

// A prefix defined on the input, which should include a trailing slash if
@@ -147,6 +146,6 @@
const doExport = true;

if (doExport) {
await Promise.all(extraction.types.map((t) => extraction.run(t, config)));
await Promise.all(types.map((t) => run(t, config)));
}
};
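Because `handler` is now a named ESM export rather than a property assigned to `exports`, it can be imported and exercised outside of Lambda. A hypothetical smoke-test driver, assuming BIGQUERY_CLIENT_CONFIG and GCP_EXPORT_BUCKET are set in the environment and using only event fields that appear in this diff:

```js
// smoke-test.mjs — hypothetical local driver, not part of this pull request
import { handler } from "./src/index.js";

const event = {
  // Limit the run to two extraction types; omitting Extractions defaults to
  // every type except boostr_impressions (see src/index.js above)
  Extractions: ["downloads", "impressions"],
  // [inclusive start, exclusive end]; ISO strings are an assumption here —
  // when Range is absent the handler falls back to defaultRangeStart()
  Range: ["2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z"],
};

// Second argument is the Lambda context; an empty object suffices for a sketch
await handler(event, {});
```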