Skip to content

Commit

Permalink
Initial testing with CosmosDB, #762 (#763)
Browse files Browse the repository at this point in the history
* Initial testing with CosmosDB, #762

* Most things seems to work fine.
* Delete of events isn't supported because CosmosDB has a different behavior for clustering columns than Cassandra.
  This is probably a bug in CosmosDB.
* Documented configuration for compat and connection

Apart from tests failing because of lack of support for delete events and all persistenceIds the
following tests are failing, were slow, or overloaded the quota of the "try" account.

Failing:
* akka.persistence.cassandra.query.EventsByTagFindDelayedEventsSpec - live eventsByTag delayed messages must find delayed events when many other events: got unexpected message
* akka.persistence.cassandra.query.EventsByTagPersistenceIdCleanupSpec - drop state and trigger new persistence id lookup peridodically: unexpected message

Very slow
* akka.persistence.cassandra.query.EventsByTagStrictBySeqNoEarlyFirstOffsetSpec - 354s, not failing
* akka.persistence.cassandra.query.EventsByTagZeroEventualConsistencyDelaySpec - hangs
* akka.persistence.cassandra.journal.RecoveryLoadSpec - not failing
* akka.persistence.cassandra.compaction.CassandraCompactionStrategySpec - not failing
* akka.persistence.cassandra.EventsByTagCrashSpec - never completes

Overloaded:
* akka.persistence.cassandra.journal.ManyActorsLoadSpec
* akka.persistence.cassandra.journal.StartupLoadSpec
* akka.persistence.cassandra.journal.CassandraLoadSpec
* akka.persistence.cassandra.journal.CassandraJournalPerfSpec
* akka.persistence.cassandra.EventsByTagStressSpec

truncate not supported:
* akka.persistence.cassandra.EventsByTagRecoverySpec
* akka.persistence.cassandra.reconciler.BuildTagViewForPersisetceIdSpec
* akka.persistence.cassandra.reconciler.TruncateAllSpec

Materialized view, not relevant:
* akka.persistence.cassandra.EventsByTagMigrationSpec
* akka.persistence.cassandra.EventsByTagMigrationProvidePersistenceIds

A few other adjustments:
* change some of the testing infra
* refreshSchema needed when using same session in tests
* logging config

* MultiPluginSpec
  • Loading branch information
patriknw authored Apr 17, 2020
1 parent 7f2ad5d commit bc58942
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 58 deletions.
5 changes: 5 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ akka.persistence.cassandra {
# won't function.
coordinated-shutdown-on-error = off

compatibility {
# Enable this when using CosmosDB
cosmosdb = off
}

//#shared

//#journal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import com.typesafe.config.Config
val snapshotSettings: SnapshotSettings = new SnapshotSettings(system, config)

val healthCheckSettings: HealthCheckSettings = new HealthCheckSettings(system, config)

val cosmosDb: Boolean = config.getBoolean("compatibility.cosmosdb")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ import akka.stream.scaladsl.Source
private def preparedDeleteMessages: Future[PreparedStatement] = {
if (settings.journalSettings.supportDeletes) {
session.serverMetaData.flatMap { meta =>
session.prepare(statements.journalStatements.deleteMessages(meta.isVersion2))
session.prepare(statements.journalStatements.deleteMessages(meta.isVersion2 || settings.cosmosDb))
}
} else
deletesNotSupportedException
Expand Down Expand Up @@ -518,7 +518,7 @@ import akka.stream.scaladsl.Source

def physicalDelete(lowestPartition: Long, highestPartition: Long, toSeqNr: Long): Future[Done] = {
session.serverMetaData.flatMap { meta =>
if (meta.isVersion2) {
if (meta.isVersion2 || settings.cosmosDb) {
physicalDelete2xCompat(lowestPartition, highestPartition, toSeqNr)
} else {
val deleteResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
session.serverMetaData.flatMap { meta =>
if (meta.isVersion2
|| settings.cosmosDb
|| 0L < criteria.minTimestamp
|| criteria.maxTimestamp < SnapshotSelectionCriteria.latest().maxTimestamp) {
preparedSelectSnapshotMetadata.flatMap { snapshotMetaPs =>
Expand Down
9 changes: 4 additions & 5 deletions core/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<appender name="CONSOLE"
class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<!-- Silence initial setup logging from Logback -->
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1}
[%X{akkaSource}] - %m%n%xException</pattern>
<pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
</encoder>
</appender>
<logger name="org.apache.cassandra" level="ERROR" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

package akka.persistence.cassandra

import java.net.InetSocketAddress
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit

import scala.concurrent.Await

import akka.actor.{ ActorSystem, PoisonPill, Props }
import akka.persistence.PersistentActor
import akka.testkit.{ TestKitBase, TestProbe }
Expand All @@ -19,6 +20,9 @@ import org.scalatest._
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }

import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry

object CassandraLifecycle {

val firstTimeBucket: String = {
Expand All @@ -40,6 +44,7 @@ object CassandraLifecycle {
# needed when testing with Akka 2.6
akka.actor.allow-java-serialization = on
akka.actor.warn-about-java-serializer-usage = off
akka.use-slf4j = off
""").withFallback(CassandraSpec.enableAutocreate).resolve()

def awaitPersistenceInit(system: ActorSystem, journalPluginId: String = "", snapshotPluginId: String = ""): Unit = {
Expand Down Expand Up @@ -99,14 +104,11 @@ trait CassandraLifecycle extends BeforeAndAfterAll with TestKitBase {

def systemName: String

def port(): Int = 9042
lazy val cluster: CqlSession =
Await.result(session.underlying(), 10.seconds)

lazy val cluster = {
CqlSession
.builder()
.withLocalDatacenter("datacenter1")
.addContactPoint(new InetSocketAddress("localhost", port()))
.build()
def session: CassandraSession = {
CassandraSessionRegistry(system).sessionFor("akka.persistence.cassandra")
}

override protected def beforeAll(): Unit = {
Expand All @@ -119,8 +121,8 @@ trait CassandraLifecycle extends BeforeAndAfterAll with TestKitBase {
}

override protected def afterAll(): Unit = {
shutdown(system, verifySystemShutdown = true)
externalCassandraCleanup()
shutdown(system, verifySystemShutdown = true)
super.afterAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import org.scalatest.time.{ Milliseconds, Seconds, Span }
import org.scalatest.{ Outcome, Suite }
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.matchers.should.Matchers

import scala.collection.immutable
import scala.concurrent.duration._

import akka.persistence.cassandra.journal.CassandraJournal
import akka.serialization.SerializationExtension

import scala.util.control.NonFatal

import akka.persistence.cassandra.TestTaggingActor.Ack
import akka.actor.PoisonPill

Expand All @@ -46,12 +46,10 @@ object CassandraSpec {
reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_").take(48) // Max length of a c* keyspace
}

def configOverrides(journalKeyspace: String, snapshotStoreKeyspace: String, port: Int): Config =
def configOverrides(journalKeyspace: String, snapshotStoreKeyspace: String): Config =
ConfigFactory.parseString(s"""
akka.persistence.cassandra {
journal.keyspace = $journalKeyspace
# FIXME this is not the way to configure port. Do we need port config in tests?
port = $port
snapshot {
keyspace = $snapshotStoreKeyspace
Expand All @@ -77,7 +75,8 @@ object CassandraSpec {

val fallbackConfig = ConfigFactory.parseString(s"""
akka.loggers = ["akka.persistence.cassandra.SilenceAllTestEventListener"]
akka.loglevel = DEBUG
akka.loglevel = INFO
akka.use-slf4j = off

datastax-java-driver {
basic.request {
Expand Down Expand Up @@ -197,7 +196,6 @@ abstract class CassandraSpec(
keyspaces().foreach { keyspace =>
cluster.execute(s"drop keyspace if exists $keyspace")
}
cluster.close()
} catch {
case NonFatal(t) =>
println("Exception during cleanup")
Expand All @@ -208,7 +206,7 @@ abstract class CassandraSpec(
final implicit lazy val system: ActorSystem = {
// always use this port and keyspace generated here, then test config, then the lifecycle config
val finalConfig =
configOverrides(journalName, snapshotName, port())
configOverrides(journalName, snapshotName)
.withFallback(config) // test's config
.withFallback(fallbackConfig) // generally good config that tests can override
.withFallback(CassandraLifecycle.config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ abstract class AbstractEventsByTagMigrationSpec
override protected def afterAll(): Unit = {
try {
externalCassandraCleanup()
cluster.close()
} catch {
case NonFatal(e) =>
println("Failed to cleanup cassandra")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class CassandraLoadTypedSpec extends CassandraSpec(dumpRowsOnFailure = false) wi

private def testThroughput(processor: ActorRef[Command], probe: TestProbe[String]): Unit = {
val warmCycles = 100L
val loadCycles = 2000L
val loadCycles = 500L // increase for serious testing

(1L to warmCycles).foreach { i =>
processor ! "a"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ import akka.persistence.{ PersistentActor, SaveSnapshotSuccess }
import com.typesafe.config.ConfigFactory

object MultiPluginSpec {
val journalKeyspace = "multiplugin_spec_journal"
val snapshotKeyspace = "multiplugin_spec_snapshot"
val now = System.currentTimeMillis()
val journalKeyspace = s"multiplugin_spec_journal_$now"
val snapshotKeyspace = s"multiplugin_spec_snapshot_$now"
val cassandraPort = CassandraLauncher.randomPort
val config = ConfigFactory.parseString(s"""
|akka.test.single-expect-default = 20s
|akka.test.filter-leeway = 20s
|
|akka.persistence.snapshot-store.plugin = ""
|
|akka.persistence.cassandra.journal.keyspace = $journalKeyspace
|akka.persistence.cassandra.journal.keyspace-autocreate=false
|akka.persistence.cassandra.journal.circuit-breaker.call-timeout = 30s
|akka.persistence.cassandra.snapshot.keyspace=$snapshotKeyspace
|
Expand All @@ -38,7 +40,7 @@ object MultiPluginSpec {
|cassandra-plugin-d.journal.table=processor_d_messages
|cassandra-plugin-d.snapshot.table=snapshot_d_messages
|
""".stripMargin).withFallback(CassandraSpec.enableAutocreate)
""".stripMargin).withFallback(CassandraLifecycle.config)

trait Processor extends PersistentActor {

Expand Down Expand Up @@ -74,23 +76,18 @@ object MultiPluginSpec {

class MultiPluginSpec
extends CassandraSpec(
config.withFallback(ConfigFactory.load("reference.conf")),
MultiPluginSpec.journalKeyspace,
MultiPluginSpec.snapshotKeyspace) {
MultiPluginSpec.config,
journalName = MultiPluginSpec.journalKeyspace,
snapshotName = MultiPluginSpec.snapshotKeyspace) {

lazy val cassandraPluginSettings = PluginSettings(system)

// default journal plugin is not configured for this test
override def awaitPersistenceInit(): Unit = ()
// default journal plugin is not used for this test
// override def awaitPersistenceInit(): Unit = ()

override protected def beforeAll(): Unit = {
super.beforeAll()

cluster.execute(
s"CREATE KEYSPACE IF NOT EXISTS $journalKeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }")
cluster.execute(
s"CREATE KEYSPACE IF NOT EXISTS $snapshotKeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }")

CassandraLifecycle.awaitPersistenceInit(system, "cassandra-plugin-a.journal")
CassandraLifecycle.awaitPersistenceInit(system, "cassandra-plugin-b.journal")
CassandraLifecycle.awaitPersistenceInit(system, "cassandra-plugin-c.journal", "cassandra-plugin-c.snapshot")
Expand All @@ -100,20 +97,20 @@ class MultiPluginSpec
"A Cassandra journal" must {
"be usable multiple times with different configurations for two actors having the same persistence id" in {
val processorA = system.actorOf(Props(classOf[OverrideJournalPluginProcessor], "cassandra-plugin-a.journal"))
processorA ! s"msg"
expectMsgAllOf(s"msg-1")
processorA ! s"msg-a"
expectMsg(s"msg-a-1")

val processorB = system.actorOf(Props(classOf[OverrideJournalPluginProcessor], "cassandra-plugin-b.journal"))
processorB ! s"msg"
expectMsgAllOf(s"msg-1")
processorB ! s"msg-b"
expectMsg(s"msg-b-1")

processorB ! s"msg"
expectMsgAllOf(s"msg-2")
processorB ! s"msg-b"
expectMsg(s"msg-b-2")

// c is actually a and therefore the next message must be seqNr 2 and not 3
val processorC = system.actorOf(Props(classOf[OverrideJournalPluginProcessor], "cassandra-plugin-a.journal"))
processorC ! s"msg"
expectMsgAllOf(s"msg-2")
processorC ! s"msg-a"
expectMsg(s"msg-a-2")
}
}

Expand All @@ -122,17 +119,17 @@ class MultiPluginSpec
val processorC =
system.actorOf(
Props(classOf[OverrideSnapshotPluginProcessor], "cassandra-plugin-c.journal", "cassandra-plugin-c.snapshot"))
processorC ! s"msg"
expectMsgAllOf(s"msg-1")
processorC ! s"msg-c"
expectMsg(s"msg-c-1")

val processorD =
system.actorOf(
Props(classOf[OverrideSnapshotPluginProcessor], "cassandra-plugin-d.journal", "cassandra-plugin-d.snapshot"))
processorD ! s"msg"
expectMsgAllOf(s"msg-1")
processorD ! s"msg-d"
expectMsg(s"msg-d-1")

processorD ! s"msg"
expectMsgAllOf(s"msg-2")
processorD ! s"msg-d"
expectMsg(s"msg-d-2")

processorC ! "snapshot"
processorD ! "snapshot"
Expand All @@ -141,15 +138,15 @@ class MultiPluginSpec
val processorE =
system.actorOf(
Props(classOf[OverrideSnapshotPluginProcessor], "cassandra-plugin-c.journal", "cassandra-plugin-c.snapshot"))
processorE ! s"msg"
expectMsgAllOf(s"msg-2")
processorE ! s"msg-c"
expectMsg(s"msg-c-2")

// e is actually c and therefore the next message must be seqNr 2 after recovery by using the snapshot
val processorF =
system.actorOf(
Props(classOf[OverrideSnapshotPluginProcessor], "cassandra-plugin-d.journal", "cassandra-plugin-d.snapshot"))
processorF ! s"msg"
expectMsgAllOf(s"msg-3")
processorF ! s"msg-d"
expectMsg(s"msg-d-3")

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {

"Cassandra query currentEventsByTag" must {
"set ttl on table" in {
cluster.refreshSchema()
val options = cluster.getMetadata.getKeyspace(journalName).get.getTable("tag_views").get().getOptions

options.get(CqlIdentifier.fromCql("default_time_to_live")) shouldEqual 86400
Expand Down Expand Up @@ -1394,10 +1395,12 @@ class EventsByTagDisabledSpec extends AbstractEventsByTagSpec(EventsByTagSpec.di

"Events by tag disabled" must {
"stop tag_views being created" in {
cluster.refreshSchema()
cluster.getMetadata.getKeyspace(journalName).get.getTable("tag_views") shouldEqual Optional.empty()
}

"stop tag_progress being created" in {
cluster.refreshSchema()
cluster.getMetadata.getKeyspace(journalName).get.getTable("tag_write_progress") shouldEqual Optional.empty()
}

Expand Down
Loading

0 comments on commit bc58942

Please sign in to comment.