diff --git a/404.html b/404.html index e434b724..d0fbd7ba 100644 --- a/404.html +++ b/404.html @@ -5,13 +5,13 @@ Page Not Found | Pass4s - +
Skip to main content

Page Not Found

We could not find what you were looking for.

Please contact the owner of the site that linked you to the original URL and let them know their link is broken.

- + \ No newline at end of file diff --git a/assets/js/runtime~main.6cdce50a.js b/assets/js/runtime~main.6cdce50a.js deleted file mode 100644 index ce4d709c..00000000 --- a/assets/js/runtime~main.6cdce50a.js +++ /dev/null @@ -1 +0,0 @@ -(()=>{"use strict";var e,t,r,a,o,n={},f={};function i(e){var t=f[e];if(void 0!==t)return t.exports;var r=f[e]={exports:{}};return n[e].call(r.exports,r,r.exports,i),r.exports}i.m=n,e=[],i.O=(t,r,a,o)=>{if(!r){var n=1/0;for(d=0;d=o)&&Object.keys(i.O).every((e=>i.O[e](r[c])))?r.splice(c--,1):(f=!1,o0&&e[d-1][2]>o;d--)e[d]=e[d-1];e[d]=[r,a,o]},i.n=e=>{var t=e&&e.__esModule?()=>e.default:()=>e;return i.d(t,{a:t}),t},r=Object.getPrototypeOf?e=>Object.getPrototypeOf(e):e=>e.__proto__,i.t=function(e,a){if(1&a&&(e=this(e)),8&a)return e;if("object"==typeof e&&e){if(4&a&&e.__esModule)return e;if(16&a&&"function"==typeof e.then)return e}var o=Object.create(null);i.r(o);var n={};t=t||[null,r({}),r([]),r(r)];for(var f=2&a&&e;"object"==typeof f&&!~t.indexOf(f);f=r(f))Object.getOwnPropertyNames(f).forEach((t=>n[t]=()=>e[t]));return n.default=()=>e,i.d(o,n),o},i.d=(e,t)=>{for(var r in t)i.o(t,r)&&!i.o(e,r)&&Object.defineProperty(e,r,{enumerable:!0,get:t[r]})},i.f={},i.e=e=>Promise.all(Object.keys(i.f).reduce(((t,r)=>(i.f[r](e,t),t)),[])),i.u=e=>"assets/js/"+({53:"935f2afb",74:"10b69c63",155:"1eb5bd94",195:"c4f5d8e4",300:"51545c38",391:"19bf2b45",504:"fe4863e2",514:"1be78505",586:"3109491d",589:"3169204a",607:"a9497a52",723:"aedc008b",779:"c3a2bac9",852:"c3f819a8",918:"17896441",922:"48ee0d35"}[e]||e)+"."+{53:"6ca680c3",74:"48a11d2d",155:"0d6eb740",195:"d323dcd6",300:"cf0c10ad",391:"f3e57872",504:"c4d2d8f5",514:"b89ba4a4",586:"c8cddfdf",589:"76843d67",607:"3e04c0d6",723:"2f9a2bde",779:"b2fea44b",852:"4682f288",918:"cf79d53f",922:"8c91f9e3",972:"40c5be87"}[e]+".js",i.miniCssF=e=>{},i.g=function(){if("object"==typeof globalThis)return globalThis;try{return this||new Function("return this")()}catch(e){if("object"==typeof window)return window}}(),i.o=(e,t)=>Object.prototype.hasOwnProperty.call(e,t),a={},o="pass4s:",i.l=(e,t,r,n)=>{if(a[e])a[e].push(t);else{var f,c;if(void 0!==r)for(var s=document.getElementsByTagName("script"),d=0;d{f.onerror=f.onload=null,clearTimeout(b);var o=a[e];if(delete a[e],f.parentNode&&f.parentNode.removeChild(f),o&&o.forEach((e=>e(r))),t)return t(r)},b=setTimeout(l.bind(null,void 0,{type:"timeout",target:f}),12e4);f.onerror=l.bind(null,f.onerror),f.onload=l.bind(null,f.onload),c&&document.head.appendChild(f)}},i.r=e=>{"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},i.p="/pass4s/",i.gca=function(e){return e={17896441:"918","935f2afb":"53","10b69c63":"74","1eb5bd94":"155",c4f5d8e4:"195","51545c38":"300","19bf2b45":"391",fe4863e2:"504","1be78505":"514","3109491d":"586","3169204a":"589",a9497a52:"607",aedc008b:"723",c3a2bac9:"779",c3f819a8:"852","48ee0d35":"922"}[e]||e,i.p+i.u(e)},(()=>{var e={303:0,532:0};i.f.j=(t,r)=>{var a=i.o(e,t)?e[t]:void 0;if(0!==a)if(a)r.push(a[2]);else if(/^(303|532)$/.test(t))e[t]=0;else{var o=new Promise(((r,o)=>a=e[t]=[r,o]));r.push(a[2]=o);var n=i.p+i.u(t),f=new Error;i.l(n,(r=>{if(i.o(e,t)&&(0!==(a=e[t])&&(e[t]=void 0),a)){var o=r&&("load"===r.type?"missing":r.type),n=r&&r.target&&r.target.src;f.message="Loading chunk "+t+" failed.\n("+o+": "+n+")",f.name="ChunkLoadError",f.type=o,f.request=n,a[1](f)}}),"chunk-"+t,t)}},i.O.j=t=>0===e[t];var t=(t,r)=>{var a,o,n=r[0],f=r[1],c=r[2],s=0;if(n.some((t=>0!==e[t]))){for(a in f)i.o(f,a)&&(i.m[a]=f[a]);if(c)var d=c(i)}for(t&&t(r);s{"use strict";var e,t,r,a,o,n={},f={};function c(e){var t=f[e];if(void 0!==t)return t.exports;var r=f[e]={exports:{}};return n[e].call(r.exports,r,r.exports,c),r.exports}c.m=n,e=[],c.O=(t,r,a,o)=>{if(!r){var n=1/0;for(s=0;s=o)&&Object.keys(c.O).every((e=>c.O[e](r[i])))?r.splice(i--,1):(f=!1,o0&&e[s-1][2]>o;s--)e[s]=e[s-1];e[s]=[r,a,o]},c.n=e=>{var t=e&&e.__esModule?()=>e.default:()=>e;return c.d(t,{a:t}),t},r=Object.getPrototypeOf?e=>Object.getPrototypeOf(e):e=>e.__proto__,c.t=function(e,a){if(1&a&&(e=this(e)),8&a)return e;if("object"==typeof e&&e){if(4&a&&e.__esModule)return e;if(16&a&&"function"==typeof e.then)return e}var o=Object.create(null);c.r(o);var n={};t=t||[null,r({}),r([]),r(r)];for(var f=2&a&&e;"object"==typeof f&&!~t.indexOf(f);f=r(f))Object.getOwnPropertyNames(f).forEach((t=>n[t]=()=>e[t]));return n.default=()=>e,c.d(o,n),o},c.d=(e,t)=>{for(var r in t)c.o(t,r)&&!c.o(e,r)&&Object.defineProperty(e,r,{enumerable:!0,get:t[r]})},c.f={},c.e=e=>Promise.all(Object.keys(c.f).reduce(((t,r)=>(c.f[r](e,t),t)),[])),c.u=e=>"assets/js/"+({53:"935f2afb",74:"10b69c63",155:"1eb5bd94",195:"c4f5d8e4",300:"51545c38",391:"19bf2b45",504:"fe4863e2",514:"1be78505",586:"3109491d",589:"3169204a",607:"a9497a52",723:"aedc008b",779:"c3a2bac9",852:"c3f819a8",918:"17896441",922:"48ee0d35"}[e]||e)+"."+{53:"6ca680c3",74:"48a11d2d",155:"0d6eb740",195:"d323dcd6",300:"cf0c10ad",391:"f3e57872",504:"c4d2d8f5",514:"b89ba4a4",586:"c8cddfdf",589:"76843d67",607:"3e04c0d6",723:"2f9a2bde",779:"b2fea44b",852:"4682f288",918:"cf79d53f",922:"8c91f9e3",972:"40c5be87"}[e]+".js",c.miniCssF=e=>{},c.g=function(){if("object"==typeof globalThis)return globalThis;try{return this||new Function("return this")()}catch(e){if("object"==typeof window)return window}}(),c.o=(e,t)=>Object.prototype.hasOwnProperty.call(e,t),a={},o="pass4s:",c.l=(e,t,r,n)=>{if(a[e])a[e].push(t);else{var f,i;if(void 0!==r)for(var d=document.getElementsByTagName("script"),s=0;s{f.onerror=f.onload=null,clearTimeout(b);var o=a[e];if(delete a[e],f.parentNode&&f.parentNode.removeChild(f),o&&o.forEach((e=>e(r))),t)return t(r)},b=setTimeout(l.bind(null,void 0,{type:"timeout",target:f}),12e4);f.onerror=l.bind(null,f.onerror),f.onload=l.bind(null,f.onload),i&&document.head.appendChild(f)}},c.r=e=>{"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},c.p="/pass4s/",c.gca=function(e){return e={17896441:"918","935f2afb":"53","10b69c63":"74","1eb5bd94":"155",c4f5d8e4:"195","51545c38":"300","19bf2b45":"391",fe4863e2:"504","1be78505":"514","3109491d":"586","3169204a":"589",a9497a52:"607",aedc008b:"723",c3a2bac9:"779",c3f819a8:"852","48ee0d35":"922"}[e]||e,c.p+c.u(e)},(()=>{var e={303:0,532:0};c.f.j=(t,r)=>{var a=c.o(e,t)?e[t]:void 0;if(0!==a)if(a)r.push(a[2]);else if(/^(303|532)$/.test(t))e[t]=0;else{var o=new Promise(((r,o)=>a=e[t]=[r,o]));r.push(a[2]=o);var n=c.p+c.u(t),f=new Error;c.l(n,(r=>{if(c.o(e,t)&&(0!==(a=e[t])&&(e[t]=void 0),a)){var o=r&&("load"===r.type?"missing":r.type),n=r&&r.target&&r.target.src;f.message="Loading chunk "+t+" failed.\n("+o+": "+n+")",f.name="ChunkLoadError",f.type=o,f.request=n,a[1](f)}}),"chunk-"+t,t)}},c.O.j=t=>0===e[t];var t=(t,r)=>{var a,o,n=r[0],f=r[1],i=r[2],d=0;if(n.some((t=>0!==e[t]))){for(a in f)c.o(f,a)&&(c.m[a]=f[a]);if(i)var s=i(c)}for(t&&t(r);d Broker | Pass4s - +

Broker

Broker is a higher level abstraction than Consumer and Sender, it resides in "com.ocadotechnology" %% "pass4s-high" % "v0.4.6" module.

It aims to:

  • Provide an easy way to build sender and consumer out of a connector
  • Allow the user to route the requests - select the right sender/consumer logic based on the source of the message
trait Broker[F[_], +P] {
def consumer[R >: P](source: Source[R]): Consumer[F, Payload]

def sender[R >: P]: Sender[F, Message[R]]
}

object Broker {
def fromConnector[F[_]: Async, P](connector: Connector[F, P]): Broker[F, P]
def routed[F[_], P](chooseBroker: End[P] => Broker[F, P]): Broker[F, P]
}

Sample broker initialization might look like this:

val brokerResource = Akka
.system[IO]
.flatMap { implicit sys =>
implicit val connectorLogger: Logger[IO] = Slf4jLogger.getLoggerFromClass[IO](classOf[Connector[IO, Jms]])

JmsConnector
.singleBroker[IO](
"admin",
"admin",
"failover:(tcp://localhost:61616)"
)
.map(_.logged)
}
.map(Broker.fromConnector[IO, Jms])

Plese see the DemoMain.scala for full usage example.

- + \ No newline at end of file diff --git a/docs/core-concepts/Consumer/index.html b/docs/core-concepts/Consumer/index.html index 45fac07b..9d69173d 100644 --- a/docs/core-concepts/Consumer/index.html +++ b/docs/core-concepts/Consumer/index.html @@ -5,13 +5,13 @@ Consumer | Pass4s - +

Consumer

Consumer is an abstraction for continuous process of executing logic upon receiving a message.

It's defined as a function in following shape: (A => F[Unit]) => F[Unit]. This means a function that:

  • takes an argument of type A => F[Unit] - think of it as the processing logic type
  • returns F[Unit] means that consuming itself is only a side effect and yields no real value
trait Consumer[F[_], +A] extends ((A => F[Unit]) => F[Unit]) with Serializable { self =>
/** Starts the consumer, passing every message through the processing function `f`. Think of it like of an `evalMap` on [[Stream]] or
* `use` on [[cats.effect.Resource]].
*/
def consume(f: A => F[Unit]): F[Unit]

/** Starts the consumer, but allows the processing function `f` to be in a different effect than that of the consumer's. A `commit`
* function also needs to be passed - it will be used after every message.
*/
def consumeCommit[T[_]](commit: T[Unit] => F[Unit])(f: A => T[Unit]): F[Unit] = self.consume(f andThen commit)
}

To start a consumer, you need a function that will handle messages of type A and return effects in F[_]. As you can see in the example above, the consumer can also be transactional, meaning it can perform an action in one effect and then translate the result to the other. It's especially useful when you want to perform database operations in ConnectionIO[_] while your application effect is IO[_].

The end user usually doesn't instantiate the Consumer directly. Instead they would usually get one from Broker or MessageProcessor.

This abstraction comes with a lot of useful manipulators as well as Semigroup, Monoid, Functor and Monad instances. Please refer to Consumer.scala sources and the scaladocs.

Basic usage

Here's a simple consumer implementation configured to use with our localstack setup. If you want to run it locally, simply save the file somewhere and run it using scala-cli using scala-cli run filename.scala

//> using scala "2.13"
//> using lib "com.ocadotechnology::pass4s-kernel:0.3.1"
//> using lib "com.ocadotechnology::pass4s-core:0.3.1"
//> using lib "com.ocadotechnology::pass4s-high:0.3.1"
//> using lib "com.ocadotechnology::pass4s-connector-sqs:0.3.1"
//> using lib "org.typelevel::log4cats-noop:2.5.0"

import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.sqs.SqsConnector
import com.ocadotechnology.pass4s.connectors.sqs.SqsEndpoint
import com.ocadotechnology.pass4s.connectors.sqs.SqsSource
import com.ocadotechnology.pass4s.connectors.sqs.SqsUrl
import com.ocadotechnology.pass4s.core.Source
import com.ocadotechnology.pass4s.high.Broker
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.noop.NoOpLogger
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region

import java.net.URI

object BaseConsumer extends IOApp {

override def run(args: List[String]): IO[ExitCode] = {
implicit val ioLogger: Logger[IO] = NoOpLogger[IO]

// Initialize credentials
val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET")
val localstackURI = new URI("http://localhost:4566")
val sqsSource = SqsEndpoint(SqsUrl("http://localhost:4566/000000000000/local_queue"))

val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)
// Create connector resource using provided credentials
val sqsConnector =
SqsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)

sqsConnector.use { connector => // obtain the connector resource
val broker = Broker.fromConnector(connector) // create broker from connector

IO.println(s"Processor listening for messages on $sqsSource") *>
broker
.consumer(sqsSource) // obtain the consumer for certain SQS source
.consume(message => IO.println(s"Received message: $message")) // bind consumer logic
.background // run in background
.void
.use(_ => IO.never)
}
}
}

This is a rather raw way of using consumer, you might want to use MessageProcessor for more elasticity and enriched syntax.

- + \ No newline at end of file diff --git a/docs/core-concepts/Sender/index.html b/docs/core-concepts/Sender/index.html index a63a2171..0a4b4881 100644 --- a/docs/core-concepts/Sender/index.html +++ b/docs/core-concepts/Sender/index.html @@ -5,13 +5,13 @@ Sender | Pass4s - +

Sender

Sender is a basic abstraction over the possibility to send single message. Its simplified definition looks like this:

trait Sender[F[_], -A] extends (A => F[Unit]) with Serializable {

/** Sends a single message.
*/
def sendOne(msg: A): F[Unit]

/** Alias for [[sendOne]]. Thanks to this, you can pass a Sender where a function type is expected.
*/
def apply(msg: A): F[Unit] = sendOne(msg)

}

As you can see Sender is basically a type of function of A => F[Unit] shape. It comes with many combinators for mapping, filtering and combining Senders, as well as Functor and Monoid instances.

Please refer to Sender.scala sources and the scaladocs.

The typical way of obtaining a Sender instance is by instantiating the Broker first.

Basic example

Here's a simple sender implementation configured to use with our localstack setup. If you want to run it locally, simply save the file somewhere and run it using scala-cli using scala-cli run filename.scala

//> using scala "2.13"
//> using lib "com.ocadotechnology::pass4s-kernel:0.3.1"
//> using lib "com.ocadotechnology::pass4s-core:0.3.1"
//> using lib "com.ocadotechnology::pass4s-high:0.3.1"
//> using lib "com.ocadotechnology::pass4s-connector-sns:0.3.1"
//> using lib "org.typelevel::log4cats-noop:2.5.0"

import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.sns.SnsArn
import com.ocadotechnology.pass4s.connectors.sns.SnsConnector
import com.ocadotechnology.pass4s.connectors.sns.SnsDestination
import com.ocadotechnology.pass4s.core.Message
import com.ocadotechnology.pass4s.core.Source
import com.ocadotechnology.pass4s.high.Broker
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.noop.NoOpLogger
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region

import java.net.URI

object Producer extends IOApp {

override def run(args: List[String]): IO[ExitCode] = {
implicit val ioLogger: Logger[IO] = NoOpLogger[IO]

// Initialize credentials
val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET");
val snsDestination = SnsDestination(SnsArn("arn:aws:sns:eu-west-2:000000000000:local_sns"))
val localstackURI = new URI("http://localhost:4566")

val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)

// Create connector resource using provided credentials
val snsConnector =
SnsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)

snsConnector.use { connector => // obtain the connector resource
val broker = Broker.fromConnector(connector)

val message = Message(Message.Payload("hello world!", Map()), snsDestination)

IO.println(s"Sending message $message to $snsDestination") *>
broker.sender.sendOne(message) *> // use the sender to send one message
IO.println("Sent, exiting!").as(ExitCode.Success)
}
}
}
- + \ No newline at end of file diff --git a/docs/getting-started/index.html b/docs/getting-started/index.html index fbc84415..3025feea 100644 --- a/docs/getting-started/index.html +++ b/docs/getting-started/index.html @@ -5,13 +5,13 @@ Getting started | Pass4s - +

Getting started

About

Pass4s is a Scala library providing an abstract layer for app messaging. It provides implementations for platforms like AWS SQS/SNS and ActiveMQ.

Basis installation

The library is divided into multiple modules. If you're only interested in the basic abstractions, add following to your build.sbt file:

// Algebraic abstractions (Sender/Consumer)
"com.ocadotechnology" %% "pass4s-kernel" % "v0.4.6"

// Message, Destination, CommittableMessage, Connector
"com.ocadotechnology" %% "pass4s-core" % "v0.4.6"

// Broker
"com.ocadotechnology" %% "pass4s-high" % "v0.4.6"

Modules

Connectors

ActiveMq

// ActiveMQ connector - based on Akka Alpakka
"com.ocadotechnology" %% "pass4s-connector-activemq" % "v0.4.6"
// ActiveMQ pekko connector - based on Pekko Connectors
"com.ocadotechnology" %% "pass4s-connector-pekko-activemq" % "v0.4.6"

⚠️ Warning Pekko connector is an experimental addition at the moment, as it is based on nightly build

SNS/SQS

// SNS connector
"com.ocadotechnology" %% "pass4s-connector-sns" % "v0.4.6"
// SQS connector
"com.ocadotechnology" %% "pass4s-connector-sqs" % "v0.4.6"

Kinesis

// Kinesis connector
"com.ocadotechnology" %% "pass4s-connector-kinesis" % "v0.4.6"

Useful utils

Extras - provides MessageProcessor for convenient way of building rich message consumers and an easy way to bind them to processor logic.

// high-level MessageProcessor
"com.ocadotechnology" %% "pass4s-extra" % "v0.4.6"

S3proxy - seamless support for proxying large messages through s3. Useful for sorting the large messages on sns kind of problems.

// s3proxy
"com.ocadotechnology" %% "pass4s-s3proxy" % "v0.4.6"

Circe - JSON serialization/parsing support

// circe JSON senders/consumers
"com.ocadotechnology" %% "pass4s-circe" % "v0.4.6"

Phobos - XML serialization/parsing support

// phobos XML senders/consumers
"com.ocadotechnology" %% "pass4s-phobos" % "v0.4.6"

Logging middleware that uses log4cats

// logging middleware
"com.ocadotechnology" %% "pass4s-logging" % "v0.4.6"
- + \ No newline at end of file diff --git a/docs/localstack/index.html b/docs/localstack/index.html index 128d177e..54aa2fea 100644 --- a/docs/localstack/index.html +++ b/docs/localstack/index.html @@ -5,13 +5,13 @@ Localstack | Pass4s - +

Localstack

If you want to use pass4s for SNS/SQS messaging, your usual first step is to set up local development environment. Localstack is a toolkit provided by Amazon to replicate AWS locally. This section provides ready-to-use localstack setup using docker-compose.

Docker compose setup

To launch localstack simply copy the snippet below and save it locally as docker-compose.yml. Once that's done, open terminal, navigate to the place where you saved the file and run docker-compose up. Your setup is ready to go.

version: "3.8"

services:
localstack:
container_name: localstack_main
image: localstack/localstack
hostname: localhost.localstack.cloud
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
- DOCKER_HOST=unix:///var/run/docker.sock
- SERVICES=sqs,sns
- EAGER_SERVICE_LOADING=1
- SKIP_SSL_CERT_DOWNLOAD=1
- HOSTNAME_EXTERNAL=localhost.localstack.cloud
volumes:
- "/tmp/localstack:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"

setup-resources:
image: localstack/localstack
environment:
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=AWSSECRET
- AWS_DEFAULT_REGION=eu-west-2
entrypoint: /bin/sh -c
command: >
"
sleep 15
alias aws='aws --endpoint-url http://localstack:4566'
# Executing SNS
aws sns create-topic --name local_sns
# Executing SQS
aws sqs create-queue --queue-name local_queue
# Subscribing to SNS to SQS
aws sns subscribe --attributes 'RawMessageDelivery=true' --topic-arn arn:aws:sns:eu-west-2:000000000000:local_sns --protocol sqs --notification-endpoint arn:aws:sqs:eu-west-2:000000000000:local_queue
aws sqs get-queue-url --queue-name local_queue
# Create na S3 bucket for large messages
aws s3 mb s3://large-messages
"
depends_on:
- localstack

Resources

This setup comes with batteries included, meaning it not only does set up the service, but also some basic resources for you to play with. The provided resources are:

  • Topic local_sns to write your messages onto
  • Queue local_queue to read messages from
  • Subscription between the two, making the messages from local_sns to be pushed to local_queue
  • Bucket large-messages in case you want to use s3 proxy

Please notice how for queue creation we use --attributes 'RawMessageDelivery=true'. This is done intentionally, make sure to use this attribute with your production setup to avoid communication issues. We add this because we don't want AWS to try interpreting our JSON message contents. To find out more check out https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html

- + \ No newline at end of file diff --git a/docs/migration-guide/index.html b/docs/migration-guide/index.html index a03e01bc..fff3f583 100644 --- a/docs/migration-guide/index.html +++ b/docs/migration-guide/index.html @@ -5,7 +5,7 @@ Migration guide | Pass4s - + @@ -18,7 +18,7 @@ The new producer will send the message in the correct format.
  • If you are having scenario where an application is both consumer and producer, then for consuming follow point 1 and for producing use firstly usingS3ProxyLegacyEncoding and then in the next release migrate to usingS3ProxyForBigPayload
  • - + \ No newline at end of file diff --git a/docs/modules/json/index.html b/docs/modules/json/index.html index 3cbfd845..66db993b 100644 --- a/docs/modules/json/index.html +++ b/docs/modules/json/index.html @@ -5,13 +5,13 @@ JSON | Pass4s - +

    JSON

    In it's core form, both the Consumer and Producer work with plain text messages. This is usually not the case for real life applications.

    This section explains how to work with messages represented in JSON. Pass4s comes with the Circe support for JSON message transformation.

    To use the module make sure to add following import to your build.sbt

    // circe JSON senders/consumers
    "com.ocadotechnology" %% "pass4s-circe" % "v0.4.6"

    With that module added, you can now import additional syntax for consumers and producers. To enable the syntax add following import to your file:

    import com.ocadotechnology.pass4s.circe.syntax._

    The syntax allows you to use .asJsonSender[T] on the senders and .asJsonConsumer[T] on consumers. Please note that for this to work, you need to provide io.circe.Encoder[T] in case of Sender and io.circe.Decoder[T] for Consumer. Please refer to this section https://circe.github.io/circe/codec.html in Circe documentation for more detailed info on how encoders and decoders work.

    When applying the syntax to consumer, consider using MessageProcessor.

    - + \ No newline at end of file diff --git a/docs/modules/message-processor/index.html b/docs/modules/message-processor/index.html index e39b5631..a219ca2a 100644 --- a/docs/modules/message-processor/index.html +++ b/docs/modules/message-processor/index.html @@ -5,13 +5,13 @@ Message processor | Pass4s - +

    Message processor

    Message processor provides a convenient way for building rich consumer, binding it to broker and attaching message handling logic.

    The usual flow of building processor starts with initialization

    val processor = MessageProcessor.init[IO]

    Where the IO can be replaced with the effect of your preference. Once initialized, you can enrich the underlying consumer by subsequent enrich calls.

    val richProcessor = 
    processor
    .enrich(_.logged)
    .enrich(_.usingS3Proxy(consumerConfig))
    .enrich(_.asJsonConsumer[String])

    Then depending on your logic you can go either with transact if you use different effect or effectful if you want to stick to IO in our example. After that you bind the broker and provide the message handling logic. Keep in mind that you can reuse once prepared processor like in the example below.

    val processor = 
    MessageProcessor
    .init[IO]
    .enrich(_.logged)
    .enrich(_.asJsonConsumer[String])
    .transacted(runEffect)
    .bindBroker(broker)

    processor.handle(Destinations.destinationA)(MyProcessor.instanceA[AppEffect])
    processor.handle(Destinations.destinationB)(MyProcessor.instanceB[AppEffect])
    - + \ No newline at end of file diff --git a/docs/modules/s3proxy/index.html b/docs/modules/s3proxy/index.html index c6d190d2..2682ae35 100644 --- a/docs/modules/s3proxy/index.html +++ b/docs/modules/s3proxy/index.html @@ -5,13 +5,13 @@ S3 Proxy | Pass4s - +

    S3 Proxy

    Some messaging solutions like SNS/SQS limit the size of the message you can send. S3 proxy solves this particular problem by sending the original payload to S3 bucket and exchanging the pointer to the S3 object on the messaging channel.

    The example below roughly shows how to use the S3 proxy on both consumer and sender.

    val senderConfig =
    S3ProxyConfig
    .Sender
    .withSnsDefaults(bucketName)
    // .copy(
    // minPayloadSize = Some(0) // You can use custom payload size
    // )
    val consumerConfig =
    S3ProxyConfig
    .Consumer
    .withSnsDefaults()
    .copy(
    shouldDeleteAfterProcessing = true // it doesn't by default, just in case there's more listeners
    )
    val broker = ??? // let's just assume you already instantiated broker
    val payload = Message.Payload("body", Map("foo" -> "bar"))

    val sender =
    broker
    .sender
    .usingS3Proxy(senderConfig)
    val consumer =
    broker
    .consumer(SqsEndpoint(SqsUrl(queueUrl)))
    .usingS3Proxy(consumerConfig)

    // no need to know anything about s3 when sending the actual message
    val sendMessageOnTopic = sender.sendOne(Message(payload, SnsDestination(SnsArn(topicArn))).widen)

    Please note that for this to work you need to have an existing s3 bucket, pass4s doesn't create any AWS resources on it's own. If you are using the provided localstack setup you can use the s3://large-messages bucket as a playground.

    For more detailed examples on s3 proxy, you might want to check out the following article https://blog.michalp.net/posts/scala/pass4s-s3-proxy/

    - + \ No newline at end of file diff --git a/docs/modules/xml/index.html b/docs/modules/xml/index.html index f65d508d..83bf5ced 100644 --- a/docs/modules/xml/index.html +++ b/docs/modules/xml/index.html @@ -5,13 +5,13 @@ XML | Pass4s - +

    XML

    This section explains how to work with messages represented in XML. Pass4s comes with the Phobos support for XML message transformation.

    To use the module make sure to add following import to your build.sbt

    // phobos XML senders/consumers
    "com.ocadotechnology" %% "pass4s-phobos" % "v0.4.6"

    With that module added, you can now import additional syntax for consumers and producers. To enable the syntax add following import to your file:

    import com.ocadotechnology.pass4s.phobos.syntax._

    The syntax allows you to use .asXmlSender[T] on the senders and .asXmlConsumer[T] on consumers. Please note that for this to work, you need to provide XmlEncoder[T] in case of Sender and XmlDecoder[T] for Consumer.

    Here's how to create most basic encoder and decoder using Phobos:

    import ru.tinkoff.phobos.decoding._
    import ru.tinkoff.phobos.encoding._
    import ru.tinkoff.phobos.syntax._
    import ru.tinkoff.phobos.derivation.semiauto._

    final case class XmlMessage(description: String, value: Long, rows: List[String])

    object XmlMessage {
    implicit val xmlEncoder: XmlEncoder[XmlMessage] = deriveXmlEncoder("xmlMessage")
    implicit val xmlDecoder: XmlDecoder[XmlMessage] = deriveXmlDecoder("xmlMessage")
    }

    Please refer to the project repository https://github.com/Tinkoff/phobos for more detailed guide on using Phobos.

    When applying the syntax to consumer, consider using MessageProcessor.

    - + \ No newline at end of file diff --git a/index.html b/index.html index 85ef2a6d..1fc00976 100644 --- a/index.html +++ b/index.html @@ -5,13 +5,13 @@ Pass4s documentation | Pass4s - +

    Pass4s

    Scala library providing an abstract layer for cross app messaging.

    Abstract

    Abstract away sender and consumer from implementation.

    Powered by Scala

    Designed with functional Scala in mind.

    Ocado Technology

    Brought to you by Ocado Technology.

    - + \ No newline at end of file