Skip to content

Commit

Permalink
Merge pull request #225 from bcgov/sync
Browse files Browse the repository at this point in the history
Ensure duplicate versions are deleted during version sync process
  • Loading branch information
kyle1morel authored Nov 6, 2023
2 parents d12ee3f + d3b92d2 commit 8e4f854
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 50 deletions.
File renamed without changes.
13 changes: 12 additions & 1 deletion app/src/components/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const utils = {
*/
addDashesToUuid(str) {
if ((typeof str === 'string' || str instanceof String) && str.length === 32) {
return `${str.slice(0, 8)}-${str.slice(8, 12)}-${str.slice(12, 16)}-${str.slice(16, 20)}-${str.slice(20)}`.toLowerCase();
return `${str.slice(0, 8)}-${str.slice(8, 12)}-${str.slice(12, 16)}-${str.slice(16, 20)}-${str.slice(20)}`
.toLowerCase();
}
else return str;
},
Expand Down Expand Up @@ -247,6 +248,16 @@ const utils = {
return result;
},

/**
* @function getUniqueObjects
* @param {object[]} arr array of objects
* @param {string} key key of object property whose value we are comparing
* @returns array of unique objects based on value of a given property
*/
getUniqueObjects(array, key) {
return [...new Map(array.map(item => [item[key], item])).values()];
},

/**
* @function groupByObject
* Re-structure array of nested objects
Expand Down
16 changes: 12 additions & 4 deletions app/src/controllers/object.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ const controller = {
const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER));

// get source S3 VersionId
const sourceS3VersionId = await getS3VersionId(req.query.s3VersionId, addDashesToUuid(req.query.versionId), objId);
const sourceS3VersionId = await getS3VersionId(
req.query.s3VersionId,
addDashesToUuid(req.query.versionId),
objId
);

// get version from S3
const source = await storageService.headObject({
Expand All @@ -115,7 +119,11 @@ const controller = {
throw new Error('Cannot copy an object larger than 5GB');
}
// get existing tags on source object, eg: { 'animal': 'bear', colour': 'black' }
const sourceObject = await storageService.getObjectTagging({ filePath: objPath, s3VersionId: sourceS3VersionId, bucketId: bucketId });
const sourceObject = await storageService.getObjectTagging({
filePath: objPath,
s3VersionId: sourceS3VersionId,
bucketId: bucketId
});
const sourceTags = Object.assign({},
...(sourceObject.TagSet?.map(item => ({ [item.Key]: item.Value })) ?? [])
);
Expand Down Expand Up @@ -463,11 +471,11 @@ const controller = {
// if request is to delete a version
if (data.s3VersionId) {
// delete version in DB
await versionService.delete(objId, s3Response.VersionId, userId);
await versionService.delete(objId, s3Response.VersionId);
// prune tags amd metadata
await metadataService.pruneOrphanedMetadata();
await tagService.pruneOrphanedTags();
// if other versions in DB, delete object record
// if no other versions in DB, delete object record
const remainingVersions = await versionService.list(objId);
if (remainingVersions.length === 0) await objectService.delete(objId);
} else { // else deleting the object
Expand Down
36 changes: 25 additions & 11 deletions app/src/services/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const log = require('../components/log')(module.filename);
const utils = require('../db/models/utils');

const { ObjectModel, Version } = require('../db/models');
const { getKeyValue, toLowerKeys } = require('../components/utils');
const { getKeyValue, getUniqueObjects, toLowerKeys } = require('../components/utils');

const metadataService = require('./metadata');
const objectService = require('./object');
Expand Down Expand Up @@ -207,12 +207,27 @@ const service = {
.map(dm => ({ DeleteMarker: true, ...dm }))
.concat(s3VersionsRaw.Versions);

// Drop versions in COMS that are no longer in S3
await Promise.all(comsVersions.map(async cv => {
if (cv.s3VersionId && !s3Versions.some(s3v => (s3v.VersionId === cv.s3VersionId))) {
await versionService.delete(comsObject.id, (cv.s3VersionId ?? null), userId, trx);
}
}));
// delete versions from COMS that are not in S3
// get list of unique coms versions
const uniqueCVIds = getUniqueObjects(comsVersions, 's3VersionId').map(v => v.id);

// get COMS versions that are not in S3 (matching on s3VersionId) OR not
// in list of unique COMS versions (matching on id)
const cVsToDelete = comsVersions.filter(cv => {
const notInS3 = !s3Versions.some(s3v => (s3v.VersionId === String(cv.s3VersionId)));
const isDuplicate = !uniqueCVIds.includes(cv.id);
return notInS3 || isDuplicate;
});

if(cVsToDelete.length){
await Version.query(trx)
.delete()
.where('objectId', comsObject.id)
.whereNotNull('s3VersionId')
.whereIn('id', cVsToDelete.map(cv => cv.id));
}
// delete versions from comsVersions array for further comparisons
const comsVersionsToKeep = comsVersions.filter(cv => !cVsToDelete.some(v => cv.id === v.id));

// Add and Update versions in COMS
const response = await Promise.all(s3Versions.map(async s3Version => {
Expand Down Expand Up @@ -252,14 +267,13 @@ const service = {
// Version record not modified
else return { version: existingVersion };
}

// S3 Object is in versioned bucket (ie: if VersionId is not 'null')
else {
const comsVersion = comsVersions.find(cv => cv.s3VersionId === s3Version.VersionId);
const comsVersion = comsVersionsToKeep.find(cv => cv.s3VersionId === s3Version.VersionId);

if (comsVersion) { // Version is in COMS
if (s3Version.IsLatest) { // Patch isLatest flags if changed
const updated = await versionService.updateIsLatest(comsVersion.id, trx);
const updated = await versionService.updateIsLatest(comsObject.id, trx);
return { modified: true, version: updated };
} else { // Version record not modified
return { version: comsVersion };
Expand Down Expand Up @@ -312,7 +326,7 @@ const service = {
trx = etrx ? etrx : await Version.startTransaction();
let response = [];

// Fetch COMS Object record if necessary
// Fetch COMS version record if necessary
const comsVersion = typeof version === 'object' ? version : await versionService.get({ versionId: version }, trx);

// Short circuit if version is a delete marker
Expand Down
42 changes: 27 additions & 15 deletions app/src/services/version.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
const { v4: uuidv4, NIL: SYSTEM_USER } = require('uuid');
const { Version } = require('../db/models');

const objectService = require('./object');
const storageService = require('./storage');

/**
* The Version DB Service
*/
Expand Down Expand Up @@ -112,7 +115,7 @@ const service = {
* @returns {Promise<integer>} The number of remaining versions in db after the delete
* @throws The error encountered upon db transaction failure
*/
delete: async (objId, s3VersionId, userId = undefined, etrx = undefined) => {
delete: async (objId, s3VersionId, etrx = undefined) => {
let trx;
try {
trx = etrx ? etrx : await Version.startTransaction();
Expand All @@ -125,9 +128,7 @@ const service = {
.returning('*')
.throwIfNotFound();

// sync other versions with isLatest
const { syncVersions } = require('./sync');
await syncVersions(objId, userId, trx);
await service.updateIsLatest(objId, trx);

if (!etrx) await trx.commit();
return Promise.resolve(response);
Expand Down Expand Up @@ -277,30 +278,41 @@ const service = {

/**
* @function updateIsLatest
* Set specified version as latest in COMS db
* and ensures only one version has isLatest: true
* @param {string} versionId COMS version uuid
* Set version as latest in COMS db.
* Determines latest by checking S3 and ensures only one version has isLatest: true
* @param {string} objectId COMS object uuid
* @param {object} [etrx=undefined] An optional Objection Transaction object
* @returns {object} Version model of provided version in db
* @returns {object} Version model of latest version
*/
updateIsLatest: async (versionId, etrx = undefined) => {
updateIsLatest: async (objectId, etrx = undefined) => {
// TODO: consider having accepting a `userId` argument for version.updatedBy when a version becomes 'latest'
let trx;
try {
trx = etrx ? etrx : await Version.startTransaction();

// get VersionId of latest version in S3
const object = await objectService.read(objectId, trx);
const s3Versions = await storageService.listAllObjectVersions({
filePath: object.path,
bucketId: object.bucketId
});
const latestS3VersionId = s3Versions.DeleteMarkers
.concat(s3Versions.Versions)
.filter((v) => v.IsLatest)[0].VersionId;

// get same version from COMS db
const current = await Version.query(trx)
.findById(versionId)
.first()
.where({ objectId: objectId, s3VersionId: latestS3VersionId })
.throwIfNotFound();

let updated;
// if the version is not already marked as isLatest
// update as latest if not already and fetch
if (!current.isLatest) {
// update this version as latest and fetch
updated = await Version.query(trx)
.updateAndFetchById(versionId, { isLatest: true });
.updateAndFetchById(current.id, { isLatest: true });
}
await service.removeDuplicateLatest(versionId, current.objectId, trx);
// set other versions in COMS db to isLatest=false
await service.removeDuplicateLatest(current.id, current.objectId, trx);

if (!etrx) await trx.commit();
return Promise.resolve(updated ?? current);
Expand Down
2 changes: 1 addition & 1 deletion app/tests/unit/controllers/object.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ describe('deleteObject', () => {

await controller.deleteObject(req, res, next);
expect(versionDeleteSpy).toHaveBeenCalledTimes(1);
expect(versionDeleteSpy).toHaveBeenCalledWith('xyz-789', '123', '456');
expect(versionDeleteSpy).toHaveBeenCalledWith('xyz-789', '123');
});

it('should delete object if object has no other remaining versions', async () => {
Expand Down
43 changes: 25 additions & 18 deletions app/tests/unit/services/sync.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ jest.mock('../../../src/db/models/tables/objectModel', () => ({

const versionTrx = trxBuilder();
jest.mock('../../../src/db/models/tables/version', () => ({
delete: jest.fn(),
query: jest.fn(),
startTransaction: jest.fn(),
then: jest.fn()
then: jest.fn(),
where: jest.fn(),
whereIn: jest.fn(),
whereNotNull: jest.fn(),
}));

const {
Expand Down Expand Up @@ -430,7 +435,6 @@ describe('syncObject', () => {

describe('syncVersions', () => {
const createSpy = jest.spyOn(versionService, 'create');
const deleteSpy = jest.spyOn(versionService, 'delete');
const listSpy = jest.spyOn(versionService, 'list');
const listAllObjectVersionsSpy = jest.spyOn(storageService, 'listAllObjectVersions');
const readSpy = jest.spyOn(objectService, 'read');
Expand All @@ -445,7 +449,6 @@ describe('syncVersions', () => {

beforeEach(() => {
createSpy.mockReset();
deleteSpy.mockReset();
headObjectSpy.mockReset();
listSpy.mockReset();
listAllObjectVersionsSpy.mockReset();
Expand All @@ -456,7 +459,6 @@ describe('syncVersions', () => {

afterAll(() => {
createSpy.mockRestore();
deleteSpy.mockRestore();
listSpy.mockRestore();
listAllObjectVersionsSpy.mockRestore();
readSpy.mockRestore();
Expand Down Expand Up @@ -484,7 +486,7 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(2);
expect(deleteSpy).toHaveBeenCalledTimes(0);
expect(Version.delete).toHaveBeenCalledTimes(0);
expect(headObjectSpy).toHaveBeenCalledTimes(1);
expect(headObjectSpy).toHaveBeenCalledWith(expect.objectContaining({
filePath: comsObject.path,
Expand Down Expand Up @@ -522,7 +524,7 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(2);
expect(deleteSpy).toHaveBeenCalledTimes(0);
expect(Version.delete).toHaveBeenCalledTimes(0);
expect(headObjectSpy).toHaveBeenCalledTimes(1);
expect(headObjectSpy).toHaveBeenCalledWith(expect.objectContaining({
filePath: comsObject.path,
Expand Down Expand Up @@ -564,7 +566,7 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(1);
expect(deleteSpy).toHaveBeenCalledTimes(0);
expect(Version.delete).toHaveBeenCalledTimes(0);
expect(headObjectSpy).toHaveBeenCalledTimes(1);
expect(headObjectSpy).toHaveBeenCalledWith(expect.objectContaining({
filePath: comsObject.path,
Expand All @@ -585,7 +587,7 @@ describe('syncVersions', () => {

it('should update existing version if mimeType has changed', async () => {
headObjectSpy.mockResolvedValue({ ContentType: 'application/octet-stream' });
listSpy.mockResolvedValue([{ etag: 'etag', mimeType: 'text/plain' }]);
listSpy.mockResolvedValue([{ etag: 'etag', mimeType: 'text/plain', s3VersionId: null }]);
listAllObjectVersionsSpy.mockResolvedValue({
DeleteMarkers: [],
Versions: [{ ETag: 'etag', IsLatest: true, VersionId: 'null' }]
Expand All @@ -604,7 +606,7 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(0);
expect(deleteSpy).toHaveBeenCalledTimes(0);
expect(Version.delete).toHaveBeenCalledTimes(0);
expect(headObjectSpy).toHaveBeenCalledTimes(1);
expect(headObjectSpy).toHaveBeenCalledWith(expect.objectContaining({
filePath: comsObject.path,
Expand All @@ -625,7 +627,7 @@ describe('syncVersions', () => {

it('should update existing version if etag has changed', async () => {
headObjectSpy.mockResolvedValue({ ContentType: 'application/octet-stream' });
listSpy.mockResolvedValue([{ etag: 'old', mimeType: 'application/octet-stream' }]);
listSpy.mockResolvedValue([{ etag: 'old', mimeType: 'application/octet-stream', s3VersionId: null }]);
listAllObjectVersionsSpy.mockResolvedValue({
DeleteMarkers: [],
Versions: [{ ETag: 'new', IsLatest: true, VersionId: 'null' }]
Expand All @@ -644,7 +646,7 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(0);
expect(deleteSpy).toHaveBeenCalledTimes(0);
expect(Version.delete).toHaveBeenCalledTimes(0);
expect(headObjectSpy).toHaveBeenCalledTimes(1);
expect(headObjectSpy).toHaveBeenCalledWith(expect.objectContaining({
filePath: comsObject.path,
Expand All @@ -665,7 +667,7 @@ describe('syncVersions', () => {

it('should update nothing when version record not modified', async () => {
headObjectSpy.mockResolvedValue({ ContentType: 'application/octet-stream' });
listSpy.mockResolvedValue([{ etag: 'etag', mimeType: 'application/octet-stream' }]);
listSpy.mockResolvedValue([{ etag: 'etag', mimeType: 'application/octet-stream', s3VersionId: null }]);
listAllObjectVersionsSpy.mockResolvedValue({
DeleteMarkers: [],
Versions: [{ ETag: 'etag', IsLatest: true, VersionId: 'null' }]
Expand All @@ -683,7 +685,7 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(0);
expect(deleteSpy).toHaveBeenCalledTimes(0);
expect(Version.delete).toHaveBeenCalledTimes(0);
expect(headObjectSpy).toHaveBeenCalledTimes(1);
expect(headObjectSpy).toHaveBeenCalledWith(expect.objectContaining({
filePath: comsObject.path,
Expand All @@ -707,7 +709,12 @@ describe('syncVersions', () => {
it('should drop COMS versions that are not in S3', async () => {
createSpy.mockResolvedValue({});
headObjectSpy.mockResolvedValue({});
listSpy.mockResolvedValue([{ s3VersionId: validUuidv4 }]);
// extra versions in coms to delete
listSpy.mockResolvedValue([
{ s3VersionId: validUuidv4 },
{ s3VersionId: validUuidv4 },
{ s3VersionId: validUuidv4 }
]);
listAllObjectVersionsSpy.mockResolvedValue({ DeleteMarkers: [{}], Versions: [{}] });

const result = await service.syncVersions(comsObject);
Expand All @@ -722,8 +729,8 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(2);
expect(deleteSpy).toHaveBeenCalledTimes(1);
expect(deleteSpy).toHaveBeenCalledWith(comsObject.id, validUuidv4, expect.any(String), expect.any(Object));
expect(Version.delete).toHaveBeenCalledTimes(1);

expect(headObjectSpy).toHaveBeenCalledTimes(1);
expect(headObjectSpy).toHaveBeenCalledWith(expect.objectContaining({
filePath: comsObject.path,
Expand Down Expand Up @@ -764,7 +771,7 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(1);
expect(deleteSpy).toHaveBeenCalledTimes(0);
expect(Version.delete).toHaveBeenCalledTimes(0);
expect(headObjectSpy).toHaveBeenCalledTimes(0);
expect(listSpy).toHaveBeenCalledTimes(1);
expect(listSpy).toHaveBeenCalledWith(validUuidv4, expect.any(Object));
Expand Down Expand Up @@ -800,7 +807,7 @@ describe('syncVersions', () => {

expect(Version.startTransaction).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledTimes(1);
expect(deleteSpy).toHaveBeenCalledTimes(0);
expect(Version.delete).toHaveBeenCalledTimes(0);
expect(headObjectSpy).toHaveBeenCalledTimes(0);
expect(listSpy).toHaveBeenCalledTimes(1);
expect(listSpy).toHaveBeenCalledWith(validUuidv4, expect.any(Object));
Expand Down

0 comments on commit 8e4f854

Please sign in to comment.