Skip to content

Commit

Permalink
[2.6.x] Concurrent artifact creation test & fixes (#5169)
Browse files Browse the repository at this point in the history
* Created a basic test for creating artifacts concurrently

* Remove unused imports

* Changes to artifact creation to better support concurrent creates

* Fixed some unused imports and added another test

* Fix KafkaSQL for concurrent creates
  • Loading branch information
EricWittmann committed Sep 17, 2024
1 parent 47c7e7a commit 017fef7
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 227 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void importContent(ContentEntity entity) {

// When we do not want to preserve contentId, the best solution to import content is create new one with the contentBytes
// It makes sure there won't be any conflicts
long newContentId = getRegistryStorage().createOrUpdateContent(getHandle(), ContentHandle.create(entity.contentBytes), entity.contentHash, entity.canonicalHash, references, entity.serializedReferences);
getRegistryStorage().ensureContent(ContentHandle.create(entity.contentBytes), entity.contentHash, entity.canonicalHash, references, entity.serializedReferences);
long newContentId = getRegistryStorage().getContentIdByHash(entity.contentHash);

getContentIdMapping().put(entity.contentId, newContentId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class H2SqlStatements extends CommonSqlStatements {

/**
* Constructor.
* @param config
*/
public H2SqlStatements() {
}
Expand Down Expand Up @@ -55,21 +54,13 @@ public boolean isForeignKeyViolation(Exception error) {
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements.core.storage.jdbc.ISqlStatements#isDatabaseInitialized()
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#isDatabaseInitialized()
*/
@Override
public String isDatabaseInitialized() {
return "SELECT COUNT(*) AS count FROM information_schema.tables WHERE table_name = 'APICURIO'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return "INSERT INTO content (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences) VALUES (?, ?, ?, ?, ?, ?)";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand All @@ -83,38 +74,43 @@ public String upsertLogConfiguration() {
*/
@Override
public String getNextSequenceValue() {
return "UPDATE sequences sa SET seq_value = (SELECT sb.seq_value + 1 FROM sequences sb WHERE sb.tenantId = sa.tenantId AND sb.name = sa.name) WHERE sa.tenantId = ? AND sa.name = ?";
throw new RuntimeException("Not supported for H2: getNextSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#selectCurrentSequenceValue()
*/
@Override
public String selectCurrentSequenceValue() {
return "SELECT seq_value FROM sequences WHERE name = ? AND tenantId = ? ";
throw new RuntimeException("Not supported for H2: selectCurrentSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#resetSequenceValue()
*/
@Override
public String resetSequenceValue() {
return "MERGE INTO sequences (tenantId, name, seq_value) KEY (tenantId, name) VALUES(?, ?, ?)";
throw new RuntimeException("Not supported for H2: resetSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#insertSequenceValue()
*/
@Override
public String insertSequenceValue() {
return "INSERT INTO sequences (tenantId, name, seq_value) VALUES (?, ?, ?)";
throw new RuntimeException("Not supported for H2: insertSequenceValue");
}

/**
* @see SqlStatements#upsertReference()
*/
@Override
public String upsertReference() {
return "INSERT INTO artifactreferences (tenantId, contentId, groupId, artifactId, version, name) VALUES (?, ?, ?, ?, ?, ?)";
return "MERGE INTO artifactreferences AS target" +
" USING (VALUES (?, ?, ?, ?, ?, ?)) AS source (tenantId, contentId, groupId, artifactId, version, name)" +
" ON (target.tenantId = source.tenantId AND target.contentId = source.contentId AND target.name = source.name)" +
" WHEN NOT MATCHED THEN" +
" INSERT (tenantId, contentId, groupId, artifactId, version, name)" +
" VALUES (source.tenantId, source.contentId, source.groupId, source.artifactId, source.version, source.name)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,6 @@ public String isDatabaseInitialized() {
);
}

/**
* @see SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return String.join(" ",
"INSERT IGNORE INTO content",
"(tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"VALUES (?, ?, ?, ?, ?, ?);"
);
}

/**
* @see SqlStatements#upsertReference()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class PostgreSQLSqlStatements extends CommonSqlStatements {

/**
* Constructor.
* @param config
*/
public PostgreSQLSqlStatements() {
}
Expand Down Expand Up @@ -55,21 +54,13 @@ public boolean isForeignKeyViolation(Exception error) {
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements.core.storage.jdbc.ISqlStatements#isDatabaseInitialized()
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#isDatabaseInitialized()
*/
@Override
public String isDatabaseInitialized() {
return "SELECT count(*) AS count FROM information_schema.tables WHERE table_name = 'artifacts'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return "INSERT INTO content (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT (tenantId, contentHash) DO NOTHING";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class SQLServerSqlStatements extends CommonSqlStatements {

/**
* Constructor.
* @param config
*/
public SQLServerSqlStatements() {
}
Expand Down Expand Up @@ -55,27 +54,13 @@ public boolean isForeignKeyViolation(Exception error) {
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements.core.storage.jdbc.ISqlStatements#isDatabaseInitialized()
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#isDatabaseInitialized()
*/
@Override
public String isDatabaseInitialized() {
return "SELECT count(*) AS count FROM information_schema.tables WHERE table_name = 'artifacts'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return String.join(" ",
"MERGE INTO content AS target",
"USING (VALUES (?, ?, ?, ?, ?, ?)) AS source (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"ON (target.tenantId = source.tenantId AND target.contentHash = source.contentHash)",
"WHEN NOT MATCHED THEN",
"INSERT (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"VALUES (source.tenantId, source.contentId, source.canonicalHash, source.contentHash, source.content, source.artifactreferences);");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,11 @@ public interface SqlStatements {

/**
* Returns true if the given exception represents a primary key violation.
*
* @param error
*/
public boolean isPrimaryKeyViolation(Exception error);

/**
* Returns true if the given exception represents a foreign key violation.
*
* @param error
*/
public boolean isForeignKeyViolation(Exception error);

Expand All @@ -57,9 +53,6 @@ public interface SqlStatements {

/**
* A sequence of statements needed to upgrade the DB from one version to another.
*
* @param fromVersion
* @param toVersion
*/
public List<String> databaseUpgrade(int fromVersion, int toVersion);

Expand Down Expand Up @@ -189,11 +182,6 @@ public interface SqlStatements {
*/
public String selectArtifactContentIds();

/**
* A statement to "upsert" a row in the "content" table.
*/
public String upsertContent();

/**
* A statement to update canonicalHash value in a row in the "content" table
*/
Expand Down Expand Up @@ -652,5 +640,4 @@ public interface SqlStatements {
* A statement used to select all version #s for a given artifactId.
*/
public String selectArtifactVersionsSkipDisabled();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package io.apicurio.registry.noprofile.rest.v2;

import io.apicurio.registry.AbstractResourceTestBase;
import io.apicurio.registry.rest.v2.beans.ArtifactMetaData;
import io.apicurio.registry.rest.v2.beans.ArtifactSearchResults;
import io.apicurio.registry.rest.v2.beans.SortBy;
import io.apicurio.registry.rest.v2.beans.SortOrder;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.utils.tests.TestUtils;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

@QuarkusTest
public class ConcurrentCreateTest extends AbstractResourceTestBase {

@Test
public void testMultipleArtifacts() throws Exception {
String oaiArtifactContent = resourceToString("openapi-empty.json");
String groupId = TestUtils.generateGroupId();

Set<String> created = new HashSet<>();
Set<String> failed = new HashSet<>();
CountDownLatch latch = new CountDownLatch(5);

// Create artifacts
for (int i = 0; i < 5; i++) {
final int forkId = i;
TestUtils.fork(() -> {
String artifactId = "artifact-" + forkId;
System.out.println("[Fork-" + forkId + "] Starting");
System.out.println("[Fork-" + forkId + "] Artifact ID: " + artifactId);
try {
InputStream data = new ByteArrayInputStream(oaiArtifactContent.getBytes());

// Create the artifact
ArtifactMetaData amd = clientV2.createArtifact(groupId, artifactId, ArtifactType.OPENAPI, data);
System.out.println("[Fork-" + forkId + "] Artifact created.");
Assertions.assertNotNull(amd);
Assertions.assertEquals(groupId, amd.getGroupId());
Assertions.assertEquals(artifactId, amd.getId());

// Fetch the artifact and make sure it really got created.
amd = clientV2.getArtifactMetaData(groupId, artifactId);
Assertions.assertNotNull(amd);
Assertions.assertEquals(groupId, amd.getGroupId());
Assertions.assertEquals(artifactId, amd.getId());

System.out.println("[Fork-" + forkId + "] Completed successfully.");
created.add(artifactId);
} catch (Exception e) {
System.out.println("[Fork-" + forkId + "] FAILED: " + e.getMessage());
failed.add(artifactId);
}
latch.countDown();
});
}

latch.await();

Assertions.assertEquals(0, failed.size());
Assertions.assertEquals(5, created.size());

ArtifactSearchResults results = clientV2.searchArtifacts(groupId, null, null, null, null, SortBy.createdOn, SortOrder.asc, 0, 100);
Assertions.assertNotNull(results);
Assertions.assertEquals(5, results.getCount());

results = clientV2.listArtifactsInGroup(groupId);
Assertions.assertNotNull(results);
Assertions.assertEquals(5, results.getCount());

}

@Test
public void testSameArtifact() throws Exception {
String oaiArtifactContent = resourceToString("openapi-empty.json");
String groupId = TestUtils.generateGroupId();

Set<String> created = new HashSet<>();
Set<String> failed = new HashSet<>();
CountDownLatch latch = new CountDownLatch(5);

// Try to create the SAME artifact 5 times.
for (int i = 0; i < 5; i++) {
final int forkId = i;
TestUtils.fork(() -> {
String artifactId = "test-artifact";
System.out.println("[Fork-" + forkId + "] Starting");
try {
InputStream data = new ByteArrayInputStream(oaiArtifactContent.getBytes());

// Create the artifact
ArtifactMetaData amd = clientV2.createArtifact(groupId, artifactId, ArtifactType.OPENAPI, data);
System.out.println("[Fork-" + forkId + "] Artifact created.");
Assertions.assertNotNull(amd);
Assertions.assertEquals(groupId, amd.getGroupId());
Assertions.assertEquals(artifactId, amd.getId());

// Fetch the artifact and make sure it really got created.
amd = clientV2.getArtifactMetaData(groupId, artifactId);
Assertions.assertNotNull(amd);
Assertions.assertEquals(groupId, amd.getGroupId());
Assertions.assertEquals(artifactId, amd.getId());

System.out.println("[Fork-" + forkId + "] Completed successfully.");
created.add("" + forkId);
} catch (Exception e) {
System.out.println("[Fork-" + forkId + "] FAILED: " + e.getMessage());
failed.add("" + forkId);
}
latch.countDown();
});
}

latch.await();

Assertions.assertEquals(4, failed.size());
Assertions.assertEquals(1, created.size());

ArtifactSearchResults results = clientV2.searchArtifacts(groupId, null, null, null, null, SortBy.createdOn, SortOrder.asc, 0, 100);
Assertions.assertNotNull(results);
Assertions.assertEquals(1, results.getCount());

results = clientV2.listArtifactsInGroup(groupId);
Assertions.assertNotNull(results);
Assertions.assertEquals(1, results.getCount());

}

}
Loading

0 comments on commit 017fef7

Please sign in to comment.