Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce pekko-based activemq connector #313

Merged
merged 11 commits into from
Aug 7, 2023
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ jobs:

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: mkdir -p core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target
run: mkdir -p core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/activemq-pekko/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: tar cf targets.tar core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target
run: tar cf targets.tar core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/activemq-pekko/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
Expand Down
26 changes: 20 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ThisBuild / tlBaseVersion := "0.4" // current series x.y
ThisBuild / tlBaseVersion := "0.5" // current series x.y

ThisBuild / organization := "com.ocadotechnology"
ThisBuild / organizationName := "Ocado Technology"
Expand Down Expand Up @@ -60,8 +60,8 @@ lazy val root = (project in file("."))
IntegrationTest / classDirectory := (Test / classDirectory).value,
IntegrationTest / parallelExecution := true
)
.aggregate(core, kernel, high, activemq, kinesis, sns, sqs, circe, phobos, plaintext, extra, logging, demo, s3Proxy)
.dependsOn(high, activemq, kinesis, sns, sqs, circe, logging, extra, s3Proxy)
.aggregate(core, kernel, high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, phobos, plaintext, extra, logging, demo, s3Proxy)
.dependsOn(high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, logging, extra, s3Proxy)

def module(name: String, directory: String = ".") = Project(s"pass4s-$name", file(directory) / name).settings(commonSettings)

Expand Down Expand Up @@ -102,7 +102,7 @@ val nettySnykOverrides = Seq(
"io.netty" % "netty-handler" % nettyVersion
)

lazy val activemq = module("activemq", directory = "connectors")
lazy val activemqAkka = module("activemq", directory = "connectors")
.settings(
name := "pass4s-connector-activemq",
libraryDependencies ++= Seq(
Expand All @@ -114,6 +114,20 @@ lazy val activemq = module("activemq", directory = "connectors")
)
.dependsOn(core)

lazy val activemqPekko = module("activemq-pekko", directory = "connectors")
.settings(
name := "pass4s-connector-activemq",
resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/",
resolvers ++= Resolver.sonatypeOssRepos("snapshots"),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+140-7d704044-SNAPSHOT", // TODO to be changed to stable release once https://github.com/apache/incubator-pekko-connectors/issues/210 is ready
"org.apache.activemq" % "activemq-pool" % Versions.ActiveMq,
"org.typelevel" %% "log4cats-core" % Versions.Log4Cats
),
headerSources / excludeFilter := HiddenFileFilter || "taps.scala"
)
.dependsOn(core)

lazy val kinesis = module("kinesis", directory = "connectors")
.settings(
name := "pass4s-connector-kinesis",
Expand Down Expand Up @@ -206,7 +220,7 @@ lazy val docs = project // new documentation project
WorkflowStep.Sbt(List("docs/mdoc"))
)
)
.dependsOn(high, activemq, kinesis, sns, sqs, circe, logging, extra, s3Proxy)
.dependsOn(high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, logging, extra, s3Proxy)
.enablePlugins(MdocPlugin, DocusaurusPlugin)

// misc
Expand All @@ -223,7 +237,7 @@ lazy val demo = module("demo")
"ch.qos.logback" % "logback-classic" % Versions.Logback
)
)
.dependsOn(activemq, sns, sqs, extra, logging)
.dependsOn(activemqPekko, sns, sqs, extra, logging)

lazy val commonSettings = Seq(
organization := "com.ocadotechnology",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.connectors.activemq

import cats.effect.Resource
import cats.effect.Sync
import cats.implicits._
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.pool.PooledConnectionFactory

object ConnectionFactories {

/** Creates a pooled ActiveMQ connection factory.
*
* Use `failover:(tcp://address)` to be sure that send operation is never failing. (Connection retries are handled by connection factory)
* Read documentation: https://activemq.apache.org/failover-transport-reference.html
*
* Use raw `tcp://address` to make send operation able to fail. This may be useful when working with outbox pattern
*/
def pooled[F[_]: Sync](username: String, password: String, url: String): Resource[F, PooledConnectionFactory] =
Resource.suspend {
unpooled[F](username, password, url).map(makePooled[F](_))
}

/** Creates an ActiveMQ connection factory.
*/
def unpooled[F[_]: Sync](username: String, password: String, url: String): F[ActiveMQConnectionFactory] =
Sync[F].delay(new ActiveMQConnectionFactory(username, password, url))

/** Wraps the base factory in a connection pool.
*/
def makePooled[F[_]: Sync](baseFactory: ActiveMQConnectionFactory): Resource[F, PooledConnectionFactory] =
Resource.make(Sync[F].delay(new PooledConnectionFactory(baseFactory)))(pcf => Sync[F].delay(pcf.stop()))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.connectors.activemq

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{RestartSettings => AkkaRestartSettings}
import cats.effect.Resource
import com.ocadotechnology.pass4s.connectors.activemq.JmsSource.JmsSourceSettings
import com.ocadotechnology.pass4s.connectors.activemq.consumer._
import com.ocadotechnology.pass4s.connectors.activemq.producer._
import com.ocadotechnology.pass4s.core.CommittableMessage
import com.ocadotechnology.pass4s.core.Connector
import com.ocadotechnology.pass4s.core.Destination
import com.ocadotechnology.pass4s.core.Message
import com.ocadotechnology.pass4s.core.Source
import fs2.Stream
import org.typelevel.log4cats.Logger

import javax.jms.ConnectionFactory
import scala.concurrent.duration._
import scala.reflect.runtime.universe._
import cats.effect.kernel.Async

trait Jms

object Jms {
sealed trait Type extends Product with Serializable

object Type {
final case object Queue extends Type
final case object Topic extends Type
}

}

final case class JmsSource private (name: String, sourceType: Jms.Type, settings: JmsSourceSettings) extends Source[Jms] {
override val capability: Type = typeOf[Jms]

override val messageProcessingTimeout: Option[FiniteDuration] = Some(settings.messageProcessingTimeout)
override val cancelableMessageProcessing: Boolean = settings.cancelableMessageProcessing
override val maxConcurrent: Int = settings.parallelSessions

def toDestination: JmsDestination = JmsDestination(name, sourceType)
}

object JmsSource {

final case class JmsSourceSettings(
// sets internal timeout on a message processing. JMS' ackTimeout will be (x + 1 second) * 1.2
messageProcessingTimeout: FiniteDuration = 30.seconds,
cancelableMessageProcessing: Boolean = true,
parallelSessions: Int = 1,
restartSettings: RestartSettings = RestartSettings(minBackoff = 2.second, maxBackoff = 30.seconds, randomFactor = 0.2)
)

final case class RestartSettings(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double) {
val toAkka: AkkaRestartSettings = AkkaRestartSettings(minBackoff, maxBackoff, randomFactor)
}

def queue(name: String, settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, Jms.Type.Queue, settings)

def topic(name: String, settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, Jms.Type.Topic, settings)
}

final case class JmsDestination private (name: String, destinationType: Jms.Type) extends Destination[Jms] {
override val capability: Type = typeOf[Jms]

def toSource(settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, destinationType, settings)
}

object JmsDestination {
def queue(name: String): JmsDestination = JmsDestination(name, Jms.Type.Queue)

def topic(name: String): JmsDestination = JmsDestination(name, Jms.Type.Topic)
}

object JmsConnector {
type JmsConnector[F[_]] = Connector.Aux[F, Jms, ConnectionFactory]

// these might have to return resources,
// we might also have variants that build an Egress directly or have a conversion method on Connector
// (probably not, as methods on Connector shouldn't be used by end users)
def singleBroker[F[_]: Logger: Async](
username: String,
password: String,
url: String
)(
implicit as: ActorSystem
): Resource[F, JmsConnector[F]] =
ConnectionFactories.pooled(username, password, url).flatMap(singleBroker[F](_))

def singleBroker[F[_]: Logger: Async](
connectionFactory: ConnectionFactory
)(
implicit as: ActorSystem
): Resource[F, JmsConnector[F]] =
for {
producer <- createMessageProducer(connectionFactory)
} yield new Connector[F, Jms] {

type Raw = ConnectionFactory
override val underlying: ConnectionFactory = connectionFactory

override def consumeBatched[R >: Jms](source: Source[R]): Stream[F, List[CommittableMessage[F]]] =
consumeAndReconnectOnErrors(connectionFactory)(source).map(List(_))

override def produce[R >: Jms](message: Message[R]): F[Unit] =
producer(message)

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.connectors.activemq

import org.apache.pekko.stream.connectors.{jms => pekkojms}

private[activemq] object common {

def toPekkoDestination: (String, Jms.Type) => pekkojms.Destination = {
case (name, Jms.Type.Topic) => pekkojms.Topic(name)
case (name, Jms.Type.Queue) => pekkojms.Queue(name)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.connectors.activemq

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer
import org.apache.pekko.stream.connectors.{jms => pekkojms}
import org.apache.pekko.stream.scaladsl.RestartSource
import cats.ApplicativeThrow
import cats.effect.Async
import cats.effect.Sync
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.activemq.taps._
import com.ocadotechnology.pass4s.core.Message.Payload
import com.ocadotechnology.pass4s.core.CommittableMessage
import com.ocadotechnology.pass4s.core.Source
import fs2.Stream
import org.typelevel.log4cats.Logger

import javax.jms
import scala.jdk.CollectionConverters._
import scala.concurrent.duration._
import scala.util.Try

private[activemq] object consumer {

def consumeAndReconnectOnErrors[F[_]: Async: Logger](
connectionFactory: jms.ConnectionFactory
)(
source: Source[_]
)(
implicit as: ActorSystem
): Stream[F, CommittableMessage[F]] =
for {
JmsSource(name, sourceType, settings) <- Stream.eval(extractJmsSource[F](source))

jmsConsumerSettings = pekkojms
.JmsConsumerSettings(as, connectionFactory)
.withAckTimeout((settings.messageProcessingTimeout + 1.second) * 1.2)
.withSessionCount(settings.parallelSessions)
.withFailStreamOnAckTimeout(true)
.withDestination(common.toPekkoDestination(name, sourceType))

txEnvelope <- RestartSource
.withBackoff(settings.restartSettings.toAkka) { () =>
JmsConsumer.txSource(jmsConsumerSettings).named(getClass.getSimpleName)
}
.toStream[F]()
committableMessage <- Stream.eval(toCommittableMessage(txEnvelope)).unNone
} yield committableMessage

private def extractJmsSource[F[_]: ApplicativeThrow](source: Source[_]): F[JmsSource] =
source match {
case jmsSource: JmsSource => jmsSource.pure[F]
case unsupportedDestination =>
ApplicativeThrow[F].raiseError(
new UnsupportedOperationException(s"JmsConnector does not support destination: $unsupportedDestination")
)
}

private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: pekkojms.TxEnvelope): F[Option[CommittableMessage[F]]] = {
val commit = Sync[F].delay(txEnvelope.commit())
val rollback = Sync[F].delay(txEnvelope.rollback())
txEnvelope.message match {
case textMessage: jms.TextMessage =>
CommittableMessage.instance(Payload(textMessage.getText, getHeaders(textMessage)), commit, _ => rollback).some.pure[F]
case unsupportedMessage =>
Logger[F].warn(s"JmsConnector supports only TextMessages. Ignoring received message: $unsupportedMessage") *> rollback.as(None)
}
}

// fixme: add headers/properties from underlying message - need to double check if all properties are returned by getPropertyNames
private def getHeaders(msg: jms.Message): Map[String, String] =
Try {
msg
.getPropertyNames
.asIterator()
.asInstanceOf[java.util.Iterator[String]] // Please forgive me I have to do this, but underneath it is really String
.asScala
.map(name => name -> msg.getStringProperty(name))
.toMap
}.getOrElse(Map.empty)

}
Loading
Loading