From fd4be1218b2abab11ceb39fcf3e6f5283cfcca89 Mon Sep 17 00:00:00 2001 From: Henry Hughes Date: Mon, 12 Jun 2023 15:04:59 -0700 Subject: [PATCH] JAVA-3062: Figure out a better solution for PreparedStatementIT tests around JAVA-3058 PreparedStatementIT.java: - Make tests resistant to JVM GC clearing items from prepared statement cache mid-test PreparedStatementCachingIT.java: - Move to IsolatedTests category because it uses system properties - Consolidate to single invalidationResultSetTest method - Verify exact set of types change events seen - Best-effort check no duplicated type-change/cache-removal events were fired SessionUtils.java - SESSION_BUILDER_CLASS_PROPERTY property should be read dynamically --- .../core/cql/PreparedStatementCachingIT.java | 254 ++++++++---------- .../driver/core/cql/PreparedStatementIT.java | 35 ++- .../api/testinfra/session/SessionUtils.java | 18 +- 3 files changed, 155 insertions(+), 152 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index c7720417e43..f29995386c7 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -27,10 +27,10 @@ import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric; import com.datastax.oss.driver.api.core.session.ProgrammaticArguments; import com.datastax.oss.driver.api.core.session.SessionBuilder; -import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; +import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule; import com.datastax.oss.driver.api.testinfra.session.SessionRule; import com.datastax.oss.driver.api.testinfra.session.SessionUtils; -import com.datastax.oss.driver.categories.ParallelizableTests; +import com.datastax.oss.driver.categories.IsolatedTests; import com.datastax.oss.driver.internal.core.context.DefaultDriverContext; import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor; import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor; @@ -38,22 +38,25 @@ import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors; import com.datastax.oss.driver.internal.core.session.RequestProcessor; import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry; -import com.datastax.oss.driver.shaded.guava.common.cache.Cache; import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder; +import com.datastax.oss.driver.shaded.guava.common.cache.RemovalListener; import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.NonNull; import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Function; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -64,12 +67,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Category(ParallelizableTests.class) +// These tests must be isolated because setup modifies SessionUtils.SESSION_BUILDER_CLASS_PROPERTY +@Category(IsolatedTests.class) public class PreparedStatementCachingIT { - private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementCachingIT.class); - - private CcmRule ccmRule = CcmRule.getInstance(); + private CustomCcmRule ccmRule = CustomCcmRule.builder().build(); private SessionRule sessionRule = SessionRule.builder(ccmRule) @@ -93,7 +95,7 @@ public PreparedStatementRemovalEvent(ByteBuffer queryId) { @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (o == null || !(o instanceof PreparedStatementRemovalEvent)) return false; PreparedStatementRemovalEvent that = (PreparedStatementRemovalEvent) o; return Objects.equals(queryId, that.queryId); } @@ -114,42 +116,30 @@ private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcess private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementCachingIT.TestCqlPrepareAsyncProcessor.class); - private static Function< - Optional, - Cache>> - buildCache = - (contextOption) -> { - - // Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so - // to prevent cache - // entries from unexpectedly disappearing mid-test. - CacheBuilder builder = CacheBuilder.newBuilder(); - contextOption.ifPresent( - (ctx) -> { - builder.removalListener( - (evt) -> { - try { - - CompletableFuture future = - (CompletableFuture) evt.getValue(); - ByteBuffer queryId = - Uninterruptibles.getUninterruptibly(future).getId(); - ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId)); - } catch (Exception e) { - LOG.error("Unable to register removal handler", e); - } - }); - }); - return builder.build(); - }; - - public TestCqlPrepareAsyncProcessor(@NonNull Optional context) { - super(TestCqlPrepareAsyncProcessor.buildCache.apply(context), context); + private static RemovalListener> + buildCacheRemoveCallback(@NonNull Optional context) { + return (evt) -> { + try { + CompletableFuture future = evt.getValue(); + ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); + context.ifPresent( + ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId))); + } catch (Exception e) { + LOG.error("Unable to register removal handler", e); + } + }; + } + + public TestCqlPrepareAsyncProcessor(@NonNull Optional context) { + // Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so + // to prevent cache entries from unexpectedly disappearing mid-test. + super( + CacheBuilder.newBuilder().removalListener(buildCacheRemoveCallback(context)).build(), + context); } } private static class TestDefaultDriverContext extends DefaultDriverContext { - public TestDefaultDriverContext( DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) { super(configLoader, programmaticArguments); @@ -157,6 +147,7 @@ public TestDefaultDriverContext( @Override protected RequestProcessorRegistry buildRequestProcessorRegistry() { + // Re-create the processor cache to insert the TestCqlPrepareAsyncProcessor with it's strong prepared statement cache, see JAVA-3062 List> processors = BuiltInRequestProcessors.createDefaultProcessors(this); processors.removeIf((processor) -> processor instanceof CqlPrepareAsyncProcessor); @@ -171,8 +162,6 @@ protected RequestProcessorRegistry buildRequestProcessorRegistry() { private static class TestSessionBuilder extends SessionBuilder { - private final Logger LOG = LoggerFactory.getLogger(TestSessionBuilder.class); - @Override protected Object wrap(@NonNull CqlSession defaultSession) { return defaultSession; @@ -200,116 +189,106 @@ public static SessionBuilder builder() { return new TestSessionBuilder(); } - private void invalidationResultSetTest(Consumer createFn) { - - try (CqlSession session = sessionWithCacheSizeMetric()) { - - assertThat(getPreparedCacheSize(session)).isEqualTo(0); - createFn.accept(session); - - session.prepare("select f from test_table_1 where e = ?"); - ByteBuffer queryId2 = session.prepare("select h from test_table_2 where g = ?").getId(); - assertThat(getPreparedCacheSize(session)).isEqualTo(2); - - CountDownLatch latch = new CountDownLatch(1); - DefaultDriverContext ctx = (DefaultDriverContext) session.getContext(); - AtomicReference> changeRef = new AtomicReference<>(Optional.empty()); - AtomicReference> idRef = new AtomicReference<>(Optional.empty()); - ctx.getEventBus() - .register( - TypeChangeEvent.class, - (e) -> { - if (!changeRef.compareAndSet( - Optional.empty(), Optional.of(e.oldType.getName().toString()))) - // TODO: Note that we actually do see this error for tests around nested UDTs. - // What's happening is that - // we see an event for the changed type itself and then we get an event for the - // nesting type that contains - // it. Upshot is that this logic is dependent on the order of event delivery; as - // long as the event for the - // type itself is first this test will behave as expected. - // - // We probably want something more robust here. - LOG.error("Unable to set reference for type change event " + e); - }); - ctx.getEventBus() - .register( - PreparedStatementRemovalEvent.class, - (e) -> { - if (!idRef.compareAndSet(Optional.empty(), Optional.of(e.queryId))) - LOG.error("Unable to set reference for PS removal event"); - latch.countDown(); - }); - - session.execute("ALTER TYPE test_type_2 add i blob"); - Uninterruptibles.awaitUninterruptibly(latch, 2, TimeUnit.SECONDS); + private void invalidationResultSetTest( + Consumer setupTestSchema, Set expectedChangedTypes) { + invalidationTestInner( + setupTestSchema, + "select f from test_table_1 where e = ?", + "select h from test_table_2 where g = ?", + expectedChangedTypes); + } - /* Okay, the latch triggered so cache processing should now be done. Let's validate :allthethings: */ - assertThat(changeRef.get()).isNotEmpty(); - assertThat(changeRef.get().get()).isEqualTo("test_type_2"); - assertThat(idRef.get()).isNotEmpty(); - assertThat(idRef.get().get()).isEqualTo(queryId2); - assertThat(getPreparedCacheSize(session)).isEqualTo(1); - } + private void invalidationVariableDefsTest( + Consumer setupTestSchema, + boolean isCollection, + Set expectedChangedTypes) { + String condition = isCollection ? "contains ?" : "= ?"; + invalidationTestInner( + setupTestSchema, + String.format("select e from test_table_1 where f %s allow filtering", condition), + String.format("select g from test_table_2 where h %s allow filtering", condition), + expectedChangedTypes); } - private void invalidationVariableDefsTest(Consumer createFn, boolean isCollection) { + private void invalidationTestInner( + Consumer setupTestSchema, + String preparedStmtQueryType1, + String preparedStmtQueryType2, + Set expectedChangedTypes) { - /* TODO: There's a lot more infrastructure in this test now, which means we're duplicating a lot of setup with - * the ResultSet test above. Probably worth while to see if we can merge the two. */ try (CqlSession session = sessionWithCacheSizeMetric()) { assertThat(getPreparedCacheSize(session)).isEqualTo(0); - createFn.accept(session); - - String fStr = isCollection ? "f contains ?" : "f = ?"; - session.prepare(String.format("select e from test_table_1 where %s allow filtering", fStr)); - String hStr = isCollection ? "h contains ?" : "h = ?"; - ByteBuffer queryId2 = - session - .prepare(String.format("select g from test_table_2 where %s allow filtering", hStr)) - .getId(); + setupTestSchema.accept(session); + + session.prepare(preparedStmtQueryType1); + ByteBuffer queryId2 = session.prepare(preparedStmtQueryType2).getId(); assertThat(getPreparedCacheSize(session)).isEqualTo(2); - CountDownLatch latch = new CountDownLatch(1); + CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch(1); + CountDownLatch typeChangeEventLatch = new CountDownLatch(expectedChangedTypes.size()); + DefaultDriverContext ctx = (DefaultDriverContext) session.getContext(); - AtomicReference> changeRef = new AtomicReference<>(Optional.empty()); - AtomicReference> idRef = new AtomicReference<>(Optional.empty()); + Map changedTypes = new ConcurrentHashMap<>(); + AtomicReference> removedQueryIds = + new AtomicReference<>(Optional.empty()); + AtomicReference> typeChangeEventError = + new AtomicReference<>(Optional.empty()); + AtomicReference> removedQueryEventError = + new AtomicReference<>(Optional.empty()); ctx.getEventBus() .register( TypeChangeEvent.class, (e) -> { - if (!changeRef.compareAndSet( - Optional.empty(), Optional.of(e.oldType.getName().toString()))) - // TODO: Note that we actually do see this error for tests around nested UDTs. - // What's happening is that - // we see an event for the changed type itself and then we get an event for the - // nesting type that contains - // it. Upshot is that this logic is dependent on the order of event delivery; as - // long as the event for the - // type itself is first this test will behave as expected. - // - // We probably want something more robust here. - LOG.error("Unable to set reference for type change event " + e); + // expect one event per type changed and for every parent type that nests it + if (Boolean.TRUE.equals( + changedTypes.putIfAbsent(e.oldType.getName().toString(), true))) { + // store an error if we see duplicate change event + // any non-empty error will fail the test so it's OK to do this multiple times + typeChangeEventError.set(Optional.of("Duplicate type change event " + e)); + } + typeChangeEventLatch.countDown(); }); ctx.getEventBus() .register( PreparedStatementRemovalEvent.class, (e) -> { - if (!idRef.compareAndSet(Optional.empty(), Optional.of(e.queryId))) - LOG.error("Unable to set reference for PS removal event"); - latch.countDown(); + if (!removedQueryIds.compareAndSet(Optional.empty(), Optional.of(e.queryId))) { + // store an error if we see multiple cache invalidation events + // any non-empty error will fail the test so it's OK to do this multiple times + removedQueryEventError.set( + Optional.of("Unable to set reference for PS removal event")); + } + preparedStmtCacheRemoveLatch.countDown(); }); + // alter test_type_2 to trigger cache invalidation and above events session.execute("ALTER TYPE test_type_2 add i blob"); - Uninterruptibles.awaitUninterruptibly(latch, 2, TimeUnit.SECONDS); + + // wait for latches and fail if they don't reach zero before timeout + assertThat( + Uninterruptibles.awaitUninterruptibly( + preparedStmtCacheRemoveLatch, 10, TimeUnit.SECONDS)) + .withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout") + .isTrue(); + assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 10, TimeUnit.SECONDS)) + .withFailMessage("typeChangeEventLatch did not trigger before timeout") + .isTrue(); /* Okay, the latch triggered so cache processing should now be done. Let's validate :allthethings: */ - assertThat(changeRef.get()).isNotEmpty(); - assertThat(changeRef.get().get()).isEqualTo("test_type_2"); - assertThat(idRef.get()).isNotEmpty(); - assertThat(idRef.get().get()).isEqualTo(queryId2); + assertThat(changedTypes.keySet()).isEqualTo(expectedChangedTypes); + assertThat(removedQueryIds.get()).isNotEmpty().get().isEqualTo(queryId2); assertThat(getPreparedCacheSize(session)).isEqualTo(1); + + // check no errors were seen in callback (and report those as fail msgs) + // if something is broken these may still succeed due to timing + // but shouldn't intermittently fail if the code is working properly + assertThat(typeChangeEventError.get()) + .withFailMessage(() -> typeChangeEventError.get().get()) + .isEmpty(); + assertThat(removedQueryEventError.get()) + .withFailMessage(() -> removedQueryEventError.get().get()) + .isEmpty(); } } @@ -323,12 +302,12 @@ private void invalidationVariableDefsTest(Consumer createFn, boolean @Test public void should_invalidate_cache_entry_on_basic_udt_change_result_set() { - invalidationResultSetTest(setupCacheEntryTestBasic); + invalidationResultSetTest(setupCacheEntryTestBasic, ImmutableSet.of("test_type_2")); } @Test public void should_invalidate_cache_entry_on_basic_udt_change_variable_defs() { - invalidationVariableDefsTest(setupCacheEntryTestBasic, false); + invalidationVariableDefsTest(setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_2")); } Consumer setupCacheEntryTestCollection = @@ -343,12 +322,13 @@ public void should_invalidate_cache_entry_on_basic_udt_change_variable_defs() { @Test public void should_invalidate_cache_entry_on_collection_udt_change_result_set() { - invalidationResultSetTest(setupCacheEntryTestCollection); + invalidationResultSetTest(setupCacheEntryTestCollection, ImmutableSet.of("test_type_2")); } @Test public void should_invalidate_cache_entry_on_collection_udt_change_variable_defs() { - invalidationVariableDefsTest(setupCacheEntryTestCollection, true); + invalidationVariableDefsTest( + setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_2")); } Consumer setupCacheEntryTestTuple = @@ -363,12 +343,12 @@ public void should_invalidate_cache_entry_on_collection_udt_change_variable_defs @Test public void should_invalidate_cache_entry_on_tuple_udt_change_result_set() { - invalidationResultSetTest(setupCacheEntryTestTuple); + invalidationResultSetTest(setupCacheEntryTestTuple, ImmutableSet.of("test_type_2")); } @Test public void should_invalidate_cache_entry_on_tuple_udt_change_variable_defs() { - invalidationVariableDefsTest(setupCacheEntryTestTuple, false); + invalidationVariableDefsTest(setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_2")); } Consumer setupCacheEntryTestNested = @@ -383,12 +363,14 @@ public void should_invalidate_cache_entry_on_tuple_udt_change_variable_defs() { @Test public void should_invalidate_cache_entry_on_nested_udt_change_result_set() { - invalidationResultSetTest(setupCacheEntryTestNested); + invalidationResultSetTest( + setupCacheEntryTestNested, ImmutableSet.of("test_type_2", "test_type_4")); } @Test public void should_invalidate_cache_entry_on_nested_udt_change_variable_defs() { - invalidationVariableDefsTest(setupCacheEntryTestNested, false); + invalidationVariableDefsTest( + setupCacheEntryTestNested, false, ImmutableSet.of("test_type_2", "test_type_4")); } /* ========================= Infrastructure copied from PreparedStatementIT ========================= */ diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java index 1b07edb53af..6e0f0446b9b 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java @@ -368,9 +368,14 @@ public void should_return_same_instance_when_repreparing_query() { assertThat(getPreparedCacheSize(session)).isEqualTo(0); String query = "SELECT * FROM prepared_statement_test WHERE a = ?"; - // When - PreparedStatement preparedStatement1 = session.prepare(query); - PreparedStatement preparedStatement2 = session.prepare(query); + // Send prepare requests, keep hold of CompletionStage objects to prevent them being removed from CqlPrepareAsyncProcessor#cache, see JAVA-3062 + CompletionStage preparedStatement1Future = session.prepareAsync(query); + CompletionStage preparedStatement2Future = session.prepareAsync(query); + + PreparedStatement preparedStatement1 = + CompletableFutures.getUninterruptibly(preparedStatement1Future); + PreparedStatement preparedStatement2 = + CompletableFutures.getUninterruptibly(preparedStatement2Future); // Then assertThat(preparedStatement1).isSameAs(preparedStatement2); @@ -385,11 +390,16 @@ public void should_create_separate_instances_for_differently_formatted_queries() // Given assertThat(getPreparedCacheSize(session)).isEqualTo(0); - // When + // Send prepare requests, keep hold of CompletionStage objects to prevent them being removed from CqlPrepareAsyncProcessor#cache, see JAVA-3062 + CompletionStage preparedStatement1Future = + session.prepareAsync("SELECT * FROM prepared_statement_test WHERE a = ?"); + CompletionStage preparedStatement2Future = + session.prepareAsync("select * from prepared_statement_test where a = ?"); + PreparedStatement preparedStatement1 = - session.prepare("SELECT * FROM prepared_statement_test WHERE a = ?"); + CompletableFutures.getUninterruptibly(preparedStatement1Future); PreparedStatement preparedStatement2 = - session.prepare("select * from prepared_statement_test where a = ?"); + CompletableFutures.getUninterruptibly(preparedStatement2Future); // Then assertThat(preparedStatement1).isNotSameAs(preparedStatement2); @@ -405,9 +415,16 @@ public void should_create_separate_instances_for_different_statement_parameters( SimpleStatement statement = SimpleStatement.newInstance("SELECT * FROM prepared_statement_test"); - // When - PreparedStatement preparedStatement1 = session.prepare(statement.setPageSize(1)); - PreparedStatement preparedStatement2 = session.prepare(statement.setPageSize(4)); + // Send prepare requests, keep hold of CompletionStage objects to prevent them being removed from CqlPrepareAsyncProcessor#cache, see JAVA-3062 + CompletionStage preparedStatement1Future = + session.prepareAsync(statement.setPageSize(1)); + CompletionStage preparedStatement2Future = + session.prepareAsync(statement.setPageSize(4)); + + PreparedStatement preparedStatement1 = + CompletableFutures.getUninterruptibly(preparedStatement1Future); + PreparedStatement preparedStatement2 = + CompletableFutures.getUninterruptibly(preparedStatement2Future); // Then assertThat(preparedStatement1).isNotSameAs(preparedStatement2); diff --git a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/session/SessionUtils.java b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/session/SessionUtils.java index b3d0cfc33b4..bc4aa0dbb7c 100644 --- a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/session/SessionUtils.java +++ b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/session/SessionUtils.java @@ -67,36 +67,40 @@ public class SessionUtils { private static final Logger LOG = LoggerFactory.getLogger(SessionUtils.class); private static final AtomicInteger keyspaceId = new AtomicInteger(); private static final String DEFAULT_SESSION_CLASS_NAME = CqlSession.class.getName(); - private static final String SESSION_BUILDER_CLASS = - System.getProperty(SESSION_BUILDER_CLASS_PROPERTY, DEFAULT_SESSION_CLASS_NAME); + + private static String getSessionBuilderClass() { + return System.getProperty(SESSION_BUILDER_CLASS_PROPERTY, DEFAULT_SESSION_CLASS_NAME); + } @SuppressWarnings("unchecked") public static SessionBuilder baseBuilder() { + String sessionBuilderClass = getSessionBuilderClass(); try { - Class clazz = Class.forName(SESSION_BUILDER_CLASS); + Class clazz = Class.forName(sessionBuilderClass); Method m = clazz.getMethod("builder"); return (SessionBuilder) m.invoke(null); } catch (Exception e) { LOG.warn( "Could not construct SessionBuilder from {} using builder(), using default " + "implementation.", - SESSION_BUILDER_CLASS, + sessionBuilderClass, e); return (SessionBuilder) CqlSession.builder(); } } public static ProgrammaticDriverConfigLoaderBuilder configLoaderBuilder() { + String sessionBuilderClass = getSessionBuilderClass(); try { - Class clazz = Class.forName(SESSION_BUILDER_CLASS); + Class clazz = Class.forName(sessionBuilderClass); Method m = clazz.getMethod("configLoaderBuilder"); return (ProgrammaticDriverConfigLoaderBuilder) m.invoke(null); } catch (Exception e) { - if (!SESSION_BUILDER_CLASS.equals(DEFAULT_SESSION_CLASS_NAME)) { + if (!sessionBuilderClass.equals(DEFAULT_SESSION_CLASS_NAME)) { LOG.warn( "Could not construct ProgrammaticDriverConfigLoaderBuilder from {} using " + "configLoaderBuilder(), using default implementation.", - SESSION_BUILDER_CLASS, + sessionBuilderClass, e); } return DriverConfigLoader.programmaticBuilder();