Merge pull request #577 from pdowler/master
fix DataNodeSizeWorker iterator query so that it uses an index
pdowler committed Apr 10, 2024
2 parents 01b6866 + 2beb148 commit c95786f
Showing 9 changed files with 90 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cadc-inventory-db/build.gradle
@@ -17,7 +17,7 @@ sourceCompatibility = 1.8

group = 'org.opencadc'

version = '1.0.1'
version = '1.0.2'

description = 'OpenCADC Storage Inventory database library'
def git_url = 'https://github.com/opencadc/storage-inventory'
@@ -1106,7 +1106,7 @@ public void testArtifactIterator() {
log.info("count vs Namespace incremental from start...");
DateFormat df = DateUtil.getDateFormat(DateUtil.IVOA_DATE_FORMAT, DateUtil.UTC);
count = 0;
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, startDate, true)) {
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, startDate, true, null)) {
while (iter.hasNext()) {
Artifact actual = iter.next();
count++;
@@ -1118,7 +1118,7 @@ public void testArtifactIterator() {

log.info("count vs Namespace incremental from mid...");
count = 0;
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, midDate, true)) {
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, midDate, true, null)) {
while (iter.hasNext()) {
Artifact actual = iter.next();
count++;
@@ -1130,7 +1130,7 @@ public void testArtifactIterator() {

log.info("count vs Namespace incremental from end...");
count = 0;
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, endDate, true)) {
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, endDate, true, null)) {
while (iter.hasNext()) {
Artifact actual = iter.next();
count++;
@@ -86,6 +86,7 @@
import org.opencadc.inventory.Artifact;
import org.opencadc.vospace.db.DataNodeSizeWorker;
import org.opencadc.inventory.Namespace;
import org.opencadc.inventory.StorageLocation;
import org.opencadc.inventory.db.ArtifactDAO;
import org.opencadc.inventory.db.HarvestState;
import org.opencadc.inventory.db.HarvestStateDAO;
@@ -106,7 +107,7 @@ public class DataNodeSizeWorkerTest {

static {
Log4jInit.setLevel("org.opencadc.inventory", Level.INFO);
Log4jInit.setLevel("org.opencadc.inventory.db", Level.INFO);
Log4jInit.setLevel("org.opencadc.inventory.db", Level.DEBUG);
Log4jInit.setLevel("ca.nrc.cadc.db", Level.INFO);
Log4jInit.setLevel("org.opencadc.vospace", Level.INFO);
Log4jInit.setLevel("org.opencadc.vospace.db", Level.INFO);
@@ -180,7 +181,16 @@ public void init_cleanup() throws Exception {
}

@Test
public void testSyncArtifact() throws Exception {
public void testSyncArtifactSite() throws Exception {
testSyncArtifact(true);
}

@Test
public void testSyncArtifactGlobal() throws Exception {
testSyncArtifact(false);
}

private void testSyncArtifact(boolean isStorageSite) throws Exception {
UUID rootID = new UUID(0L, 0L);
ContainerNode root = new ContainerNode(rootID, "root");

@@ -206,11 +216,15 @@ public void testSyncArtifact() throws Exception {
URI.create("md5:d41d8cd98f00b204e9800998ecf8427e"),
new Date(),
666L);
if (isStorageSite) {
expected.storageLocation = new StorageLocation(URI.create("id:" + UUID.randomUUID().toString()));
expected.storageLocation.storageBucket = "X";
}
log.info("expected: " + expected);

artifactDAO.put(expected);
Artifact actualArtifact = artifactDAO.get(expected.getID());
Assert.assertNotNull(actual);
Assert.assertNotNull(actualArtifact);
Assert.assertEquals(expected.getContentLength(), actualArtifact.getContentLength());

String hsName = "ArtifactSize";
@@ -219,25 +233,36 @@ public void testSyncArtifact() throws Exception {
harvestStateDAO.put(hs);
hs = harvestStateDAO.get(hsName, resourceID);

DataNodeSizeWorker asWorker = new DataNodeSizeWorker(harvestStateDAO, hs, artifactDAO, siNamespace);
DataNodeSizeWorker asWorker = new DataNodeSizeWorker(harvestStateDAO, hs, artifactDAO, siNamespace, isStorageSite);
log.info("*** DataNodeSizeWorker START");
asWorker.run();
log.info("*** DataNodeSizeWorker DONE");

actual = (DataNode)nodeDAO.get(orig.getID());
Assert.assertNotNull(actual);
log.info("found: " + actual.getID() + " aka " + actual);
Assert.assertEquals(expected.getContentLength(), actual.bytesUsed);

Thread.sleep(100L);

// update the artifact only
artifactDAO.delete(actualArtifact.getID());
expected = new Artifact(expected.getURI(), expected.getMetaChecksum(), new Date(), 333L);
artifactDAO.put(expected);
Artifact modified = new Artifact(expected.getURI(), expected.getMetaChecksum(), new Date(), 333L);
if (isStorageSite) {
modified.storageLocation = new StorageLocation(URI.create("id:" + UUID.randomUUID().toString()));
modified.storageLocation.storageBucket = "X";
}
artifactDAO.put(modified);
actual = (DataNode)nodeDAO.get(orig.getID());
Assert.assertNotEquals(expected.getContentLength(), actual.bytesUsed);
Assert.assertNotEquals(modified.getContentLength(), actual.bytesUsed);

// run the update
log.info("*** DataNodeSizeWorker START");
asWorker.run();
log.info("*** DataNodeSizeWorker DONE");

actual = (DataNode)nodeDAO.get(orig.getID());
Assert.assertEquals(expected.getContentLength(), actual.bytesUsed);
Assert.assertEquals(modified.getContentLength(), actual.bytesUsed);

}

@@ -193,7 +193,7 @@ public ResourceIterator<Artifact> storedIterator(String storageBucketPrefix) {
SQLGenerator.ArtifactIteratorQuery iter = (SQLGenerator.ArtifactIteratorQuery) gen.getEntityIteratorQuery(Artifact.class);
iter.setStorageLocationRequired(true);
iter.setStorageBucket(storageBucketPrefix);
iter.setOrderedOutput(true);
iter.setOrderByStorageLocation(true);
return iter.query(dataSource);
} catch (BadSqlGrammarException ex) {
handleInternalFail(ex);
@@ -250,31 +250,33 @@ public ResourceIterator<Artifact> iterator(String uriBucketPrefix, boolean ordered) {
* conditions on fields of the Artifact using the field names for column references.
* Example: <code>uri like 'ad:bar/%'</code>. The result is currently not ordered.
*
* <p>Use case: local cleanup by arbitrary criteria
* <p>Use case: local cleanup by ringhold
*
* @param ns namespace for selecting artifacts
* @param uriBucketPrefix null, prefix, or complete Artifact.uriBucket string
* @param ordered order by Artifact.uri (true) or not ordered (false)
* @return iterator over artifacts matching criteria
*/
public ResourceIterator<Artifact> iterator(Namespace ns, String uriBucketPrefix, boolean ordered) {
return iterator(ns, uriBucketPrefix, null, ordered);
return iterator(ns, uriBucketPrefix, null, ordered, null);
}

/**
* Iterate over artifacts that match criteria. This method adds an optional Date argument to
* Iterate over artifacts in a specific namespace. This method adds an optional Date argument to
* support incremental processing. In this case, ordered will be in timestamp order rather than
* uri order.
*
* <p>Use case: process artifact events directly in the database
* <p>Use case: process artifact events directly in the database (DataNodeSizeWorker)
*
* @param ns namespace for selecting artifacts
* @param uriBucketPrefix null, prefix, or complete Artifact.uriBucket string
* @param minLastModified minimum Artifact.lastModified to consider (incremental mode)
* @param ordered order by Artifact.uri (true) or not ordered (false)
* @return iterator over artifacts matching criteria
* @param minLastModified minimum Artifact.lastModified to consider (incremental mode), null for all
* @param ordered order by Artifact.lastModified (true) or not ordered (false)
* @param isStored true to select stored artifacts, false for unstored, null for all
* @return iterator over artifacts
*/
public ResourceIterator<Artifact> iterator(Namespace ns, String uriBucketPrefix, Date minLastModified, boolean ordered) {
public ResourceIterator<Artifact> iterator(Namespace ns, String uriBucketPrefix,
Date minLastModified, boolean ordered, Boolean isStored) {
checkInit();
long t = System.currentTimeMillis();

@@ -284,6 +286,7 @@ public ResourceIterator<Artifact> iterator(Namespace ns, String uriBucketPrefix,
iter.setNamespace(ns);
iter.setMinLastModified(minLastModified);
iter.setOrderedOutput(ordered);
iter.setStorageLocationRequired(isStored);
return iter.query(dataSource);
} catch (BadSqlGrammarException ex) {
handleInternalFail(ex);
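For orientation, here is a minimal usage sketch of the new five-argument `iterator` described in the javadoc above. The DAO setup, namespace value, and import paths are assumptions for illustration, not part of this change:

```java
import java.util.Date;
import ca.nrc.cadc.io.ResourceIterator;
import org.opencadc.inventory.Artifact;
import org.opencadc.inventory.Namespace;
import org.opencadc.inventory.db.ArtifactDAO;

public class IteratorSketch {
    // Incrementally scan artifacts in a namespace in timestamp order,
    // assuming `dao` is an already-configured ArtifactDAO.
    static void scanStored(ArtifactDAO dao, Date since) throws Exception {
        Namespace ns = new Namespace("cadc:TEST/"); // illustrative namespace
        // isStored = Boolean.TRUE:  only artifacts with a StorageLocation (storage site)
        // isStored = Boolean.FALSE: only unstored artifacts (global inventory)
        // isStored = null:          no restriction on storage state
        try (ResourceIterator<Artifact> iter = dao.iterator(ns, null, since, true, Boolean.TRUE)) {
            while (iter.hasNext()) {
                Artifact a = iter.next();
                // process a, e.g. propagate a.getContentLength() to a matching node
            }
        }
    }
}
```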
@@ -685,6 +685,7 @@ class ArtifactIteratorQuery implements EntityIteratorQuery<Artifact> {
private Namespace namespace;
private Date minLastModified;
private boolean ordered;
private boolean orderByStorageLocation;

private final Calendar utc = Calendar.getInstance(DateUtil.UTC);

@@ -730,6 +731,10 @@ public void setOrderedOutput(boolean ordered) {
this.ordered = ordered;
}

public void setOrderByStorageLocation(boolean obsl) {
this.orderByStorageLocation = obsl;
}

public void setSiteID(UUID siteID) {
this.siteID = siteID;
}
@@ -752,7 +757,7 @@ public ResourceIterator<Artifact> query(DataSource ds) {
// null storageBucket to come after non-null
// postgresql: the default order is equivalent to explicitly specifying ASC NULLS LAST
// default behaviour may not be db-agnostic
if (ordered) {
if (orderByStorageLocation) {
sb.append(" ORDER BY storageLocation_storageBucket, storageLocation_storageID");
}
} else if (storageLocationRequired != null && !storageLocationRequired) {
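The point of keeping `ordered` (timestamp order) separate from the new `orderByStorageLocation` flag is that an ORDER BY matching the WHERE clause lets the database walk an index instead of sorting the result. A fragment sketching the two query shapes implied by the setters above; it assumes `gen` (the SQLGenerator), `ds` (a DataSource), `ns`, and `since` are in scope, and the SQL in the comments is inferred from the column references visible in this diff, not the generator's verbatim output:

```java
SQLGenerator.ArtifactIteratorQuery q =
        (SQLGenerator.ArtifactIteratorQuery) gen.getEntityIteratorQuery(Artifact.class);

// Incremental shape used by DataNodeSizeWorker; assumed generated SQL:
//   SELECT ... FROM inventory.Artifact
//    WHERE uri LIKE 'cadc:TEST/%' AND lastModified >= ?
//      AND storageLocation_storageID IS NOT NULL      -- isStored = true
//    ORDER BY lastModified
// An index on lastModified (or a partial index over stored artifacts) can satisfy
// both the predicate and the ordering, so no table-wide sort is needed.
q.setNamespace(ns);
q.setMinLastModified(since);
q.setOrderedOutput(true);              // ORDER BY lastModified
q.setStorageLocationRequired(true);    // storageLocation_storageID IS NOT NULL

// The stored-artifact iteration (storedIterator) instead sets
// q.setOrderByStorageLocation(true), giving
//   ORDER BY storageLocation_storageBucket, storageLocation_storageID

try (ResourceIterator<Artifact> it = q.query(ds)) {
    while (it.hasNext()) {
        Artifact a = it.next();
        // ...
    }
}
```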
@@ -99,6 +99,7 @@ public class DataNodeSizeWorker implements Runnable {
private final ArtifactDAO artifactDAO;
private final HarvestStateDAO harvestStateDAO;
private final Namespace storageNamespace;
private boolean isStorageSite;

private long numArtifactsProcessed;

@@ -111,12 +112,13 @@ public class DataNodeSizeWorker implements Runnable {
* @param namespace artifact namespace
*/
public DataNodeSizeWorker(HarvestStateDAO harvestStateDAO, HarvestState harvestState,
ArtifactDAO artifactDAO, Namespace namespace) {
ArtifactDAO artifactDAO, Namespace namespace, boolean isStorageSite) {
this.harvestState = harvestState;
this.harvestStateDAO = harvestStateDAO;
this.nodeDAO = new NodeDAO(harvestStateDAO);
this.artifactDAO = artifactDAO;
this.storageNamespace = namespace;
this.isStorageSite = isStorageSite;
}

public long getNumArtifactsProcessed() {
@@ -147,7 +149,7 @@ public void run() {
}

String uriBucket = null; // process all artifacts in a single thread
try (final ResourceIterator<Artifact> iter = artifactDAO.iterator(storageNamespace, uriBucket, startTime, true)) {
try (final ResourceIterator<Artifact> iter = artifactDAO.iterator(storageNamespace, uriBucket, startTime, true, isStorageSite)) {
TransactionManager tm = nodeDAO.getTransactionManager();
while (iter.hasNext()) {
Artifact artifact = iter.next();
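Putting the pieces together, a simplified sketch of one incremental pass of DataNodeSizeWorker: the DataNode lookup, transaction handling, and the lookback/buffering logic of the real run() are omitted, and the method itself is illustrative:

```java
import java.util.Date;
import ca.nrc.cadc.io.ResourceIterator;
import org.opencadc.inventory.Artifact;
import org.opencadc.inventory.Namespace;
import org.opencadc.inventory.db.ArtifactDAO;
import org.opencadc.inventory.db.HarvestState;
import org.opencadc.inventory.db.HarvestStateDAO;

public class WorkerSketch {
    // One incremental pass: resume from HarvestState, visit artifacts in the
    // namespace modified since then, and record progress as each is processed.
    static void pass(ArtifactDAO artifactDAO, HarvestStateDAO harvestStateDAO,
                     HarvestState state, Namespace ns, boolean isStorageSite) throws Exception {
        Date startTime = state.curLastModified; // null on first run: process everything
        // isStorageSite selects stored artifacts at a storage site and unstored
        // ones in a global inventory, so the WHERE clause can match an index
        try (ResourceIterator<Artifact> iter =
                artifactDAO.iterator(ns, null, startTime, true, isStorageSite)) {
            while (iter.hasNext()) {
                Artifact artifact = iter.next();
                // ... find the DataNode with the same URI, set bytesUsed = contentLength
                state.curLastModified = artifact.getLastModified();
                harvestStateDAO.put(state); // checkpoint (the real worker buffers these)
            }
        }
    }
}
```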
19 changes: 14 additions & 5 deletions vault/README.md
@@ -5,10 +5,11 @@ specification designed to co-exist with other storage-inventory components. It p
organization layer on top of the storage management of storage-inventory.

The simplest configuration would be to deploy `vault` with `minoc` with a single metadata database and single
back end storage system. Details: TBD.
back end storage system. This corresponds to _isStorageSite = true_ below.

The other option would be to deploy `vault` with `raven` and `luskan` in a global inventory database and make
use of one or more of the network of known storage sites to store files. Details: TBD.
use of one or more of the network of known storage sites to store files. This corresponds to _isStorageSite = false_ below.
Details: TBD.

## deployment

@@ -78,6 +79,9 @@ A vault.properties file in /config is required to run this service. The followi
# service identity
org.opencadc.vault.resourceID = ivo://{authority}/{name}
# associated inventory type
org.opencadc.vault.inventory.isStorageSite = true|false
# consistency settings
org.opencadc.vault.consistency.preventNotFound=true|false
@@ -97,11 +101,16 @@ org.opencadc.vault.storage.namespace = {a storage inventory namespace to use}
```
The vault _resourceID_ is the resourceID of _this_ vault service.

The _isStorageSite_ key tells `vault` whether the associated inventory database is a storage
site or a global database. This affects the behaviour and performance of the background thread that
syncs Artifact.contentLength values to the DataNode.bytesUsed field.

The _preventNotFound_ key can be used to configure `vault` to prevent artifact-not-found errors that might
result due to the eventual consistency nature of the storage system by directly checking for the artifact at
_all known_ sites. It only makes sense to enable this when `vault` is running in a global inventory (along with
`raven` and/or `fenwick` instances syncing artifact metadata. This feature introduces an overhead for the
genuine not-found cases: transfer negotiation to GET the file that was never PUT.
_all known_ sites. It makes sense to enable this when `vault` is running in a global inventory (along with
`raven` and/or `fenwick` instances syncing artifact metadata), but even in a single _storage site_ deployment
it also helps hide some temporary inconsistencies. This feature introduces an overhead for the
genuine not-found cases: trying to GET a file that was never successfully PUT even though the DataNode was created.

The _allocationParent_ is a path to a container node (directory) which contains space allocations. An allocation
is owned by a user (usually different from the _rootOwner_ admin user) who is responsible for the allocation
14 changes: 13 additions & 1 deletion vault/src/main/java/org/opencadc/vault/VaultInitAction.java
@@ -120,6 +120,7 @@ public class VaultInitAction extends InitAction {
private static final String VAULT_KEY = "org.opencadc.vault";
static final String RESOURCE_ID_KEY = VAULT_KEY + ".resourceID";
static final String PREVENT_NOT_FOUND_KEY = VAULT_KEY + ".consistency.preventNotFound";
static final String IS_STORAGE_SITE_KEY = VAULT_KEY + ".inventory.isStorageSite";
static final String INVENTORY_SCHEMA_KEY = VAULT_KEY + ".inventory.schema";
static final String VOSPACE_SCHEMA_KEY = VAULT_KEY + ".vospace.schema";
static final String SINGLE_POOL_KEY = VAULT_KEY + ".singlePool";
@@ -201,6 +202,15 @@ static MultiValuedProperties getConfig() {
sb.append("OK");
}

String iss = mvp.getFirstPropertyValue(IS_STORAGE_SITE_KEY);
sb.append("\n\t" + IS_STORAGE_SITE_KEY + ": ");
if (iss == null) {
sb.append("MISSING");
ok = false;
} else {
sb.append("OK");
}

String pnf = mvp.getFirstPropertyValue(PREVENT_NOT_FOUND_KEY);
sb.append("\n\t" + PREVENT_NOT_FOUND_KEY + ": ");
if (pnf == null) {
@@ -479,8 +489,10 @@ private void initBackgroundWorkers() {
offline = true;
}

boolean isStorageSite = Boolean.parseBoolean(props.getFirstPropertyValue(IS_STORAGE_SITE_KEY));

terminateBackgroundWorkers();
DataNodeSizeSync async = new DataNodeSizeSync(hsDAO, artifactDAO, storageNamespace);
DataNodeSizeSync async = new DataNodeSizeSync(hsDAO, artifactDAO, storageNamespace, isStorageSite);
async.setOffline(offline);
this.dataNodeSizeSyncThread = new Thread(async);
dataNodeSizeSyncThread.setDaemon(true);
@@ -98,13 +98,15 @@ public class DataNodeSizeSync implements Runnable {
private final HarvestStateDAO stateDAO;
private final ArtifactDAO artifactDAO;
private final Namespace artifactNamespace;
private final boolean isStorageSite;

private boolean offline = false;

public DataNodeSizeSync(HarvestStateDAO stateDAO, ArtifactDAO artifactDAO, Namespace artifactNamespace) {
public DataNodeSizeSync(HarvestStateDAO stateDAO, ArtifactDAO artifactDAO, Namespace artifactNamespace, boolean isStorageSite) {
this.stateDAO = stateDAO;
this.artifactDAO = artifactDAO;
this.artifactNamespace = artifactNamespace;
this.isStorageSite = isStorageSite;

// we need continuous timestamp updates to retain leader status, so only schedule maintenance
stateDAO.setUpdateBufferCount(0);
@@ -153,7 +155,7 @@ public void run() {
boolean fail = false;
log.info(logInfo.start());
try {
DataNodeSizeWorker worker = new DataNodeSizeWorker(stateDAO, state, artifactDAO, artifactNamespace);
DataNodeSizeWorker worker = new DataNodeSizeWorker(stateDAO, state, artifactDAO, artifactNamespace, isStorageSite);
worker.run();
logInfo.setLastModified(state.curLastModified);
logInfo.processed = worker.getNumArtifactsProcessed();