Skip to content

Commit

Permalink
upgrade to latest Scanamo version (1.0.0-M28)
Browse files Browse the repository at this point in the history
Complete Scanamo update - support both v1 and v2 credentials
  • Loading branch information
rtyley committed Feb 1, 2024
1 parent 9abd55e commit f4178e5
Show file tree
Hide file tree
Showing 23 changed files with 168 additions and 110 deletions.
2 changes: 1 addition & 1 deletion app/controllers/UploadController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class UploadController(override val authActions: HMACAuthActions, awsConfig: AWS
import authActions.APIAuthAction

private val credsGenerator = new CredentialsGenerator(awsConfig)
private val uploadDecorator = new UploadDecorator(awsConfig, stepFunctions)
private val uploadDecorator = new UploadDecorator(awsConfig.scanamo, awsConfig.cacheTableName, stepFunctions)

def list(atomId: String) = APIAuthAction { req =>
val atom = MediaAtom.fromThrift(getPreviewAtom(atomId))
Expand Down
6 changes: 3 additions & 3 deletions app/data/DataStores.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class DataStores(aws: AWSConfig with SNSAccess, capi: CapiAccess) {
val preview = new PreviewDynamoDataStore(aws.dynamoDB, aws.dynamoTableName)
val published = new PublishedDynamoDataStore(aws.dynamoDB, aws.publishedDynamoTableName)

val pluto: PlutoDataStore = new PlutoDataStore(aws.dynamoDB, aws.manualPlutoDynamo)
val pluto: PlutoDataStore = new PlutoDataStore(aws.scanamo, aws.manualPlutoDynamo)

val livePublisher: AtomPublisher = aws.capiContentEventsTopicName match {
case Some(capiContentEventsTopicName) =>
Expand Down Expand Up @@ -48,8 +48,8 @@ class DataStores(aws: AWSConfig with SNSAccess, capi: CapiAccess) {
val reindexPublished: PublishedKinesisAtomReindexer =
new PublishedKinesisAtomReindexer(aws.publishedKinesisReindexStreamName, aws.crossAccountKinesisClient)

val plutoCommissionStore: PlutoCommissionDataStore = new PlutoCommissionDataStore(aws)
val plutoProjectStore: PlutoProjectDataStore = new PlutoProjectDataStore(aws, plutoCommissionStore)
val plutoCommissionStore: PlutoCommissionDataStore = new PlutoCommissionDataStore(aws.scanamo, aws.plutoCommissionTableName)
val plutoProjectStore: PlutoProjectDataStore = new PlutoProjectDataStore(aws.scanamo, aws.plutoProjectTableName, plutoCommissionStore)

val atomListStore = AtomListStore(aws.stage, capi, preview)

Expand Down
2 changes: 1 addition & 1 deletion app/di.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class MediaAtomMaker(context: Context)
private val capi = new Capi(config)

private val stores = new DataStores(aws, capi)
private val permissions = new MediaAtomMakerPermissionsProvider(aws.stage, aws.region.getName, aws.credentials.instance)
private val permissions = new MediaAtomMakerPermissionsProvider(aws.stage, aws.region.getName, aws.credentials.instance.v1)

private val reindexer = buildReindexer()

Expand Down
4 changes: 2 additions & 2 deletions app/util/AWS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class AWSConfig(override val config: Config, override val credentials: AwsCreden
lazy val ec2Client = AmazonEC2ClientBuilder
.standard()
.withRegion(region.getName)
.withCredentials(credentials.instance)
.withCredentials(credentials.instance.v1)
.build()

lazy val pinboardLoaderUrl = getString("panda.domain").map(domain => s"https://pinboard.$domain/pinboard.loader.js")
Expand All @@ -43,7 +43,7 @@ class AWSConfig(override val config: Config, override val credentials: AwsCreden
lazy val expiryPollerName = "Expiry"
lazy val expiryPollerLastName = "Poller"

final override def regionName = getString("aws.region")
final override def region = AwsAccess.regionFrom(this)

final override def readTag(tagName: String) = {
val tagsResult = ec2Client.describeTags(
Expand Down
17 changes: 6 additions & 11 deletions app/util/UploadDecorator.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package util

import com.gu.media.aws.{DynamoAccess, UploadAccess}
import com.gu.media.model.{ClientAsset, ClientAssetMetadata}
import com.gu.media.upload.model.Upload
import org.scanamo.{Scanamo, Table}
import org.scanamo.generic.auto._
import org.scanamo.syntax._
import org.scanamo.auto._
import org.scanamo.{Scanamo, Table}

class UploadDecorator(aws: DynamoAccess with UploadAccess, stepFunctions: StepFunctions) {
private val table = Table[Upload](aws.cacheTableName)
class UploadDecorator(scanamo: Scanamo, cacheTableName: String, stepFunctions: StepFunctions) {
private val table = Table[Upload](cacheTableName)

def addMetadata(atomId: String, video: ClientAsset): ClientAsset = {
val id = s"$atomId-${video.id}"
Expand All @@ -28,10 +27,6 @@ class UploadDecorator(aws: DynamoAccess with UploadAccess, stepFunctions: StepFu
}
}

private def getUpload(id: String): Option[Upload] = {
val op = table.get('id -> id)
val result = Scanamo.exec(aws.dynamoDB)(op).flatMap(_.right.toOption)

result orElse { stepFunctions.getById(id) }
}
private def getUpload(id: String): Option[Upload] =
scanamo.exec(table.get("id" === id)).flatMap(_.toOption) orElse { stepFunctions.getById(id) }
}
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import scala.sys.process._

val scroogeVersion = "4.12.0"
val awsVersion = "1.11.678"
val awsV2Version = "2.21.17"
val pandaVersion = "1.2.0"
val pandaHmacVersion = "2.1.0"
val atomMakerVersion = "1.3.4"
val typesafeConfigVersion = "1.4.0" // to match what we get from Play transitively
val scanamoVersion = "1.0.0-M9"
val scanamoVersion = "1.0.0-M28"

val playJsonExtensionsVersion = "0.40.2"
val okHttpVersion = "2.4.0"
Expand Down Expand Up @@ -47,6 +48,7 @@ lazy val commonSettings = Seq(
ThisBuild / scalaVersion := "2.12.16",
scalacOptions ++= Seq("-feature", "-deprecation"/*, "-Xfatal-warnings"*/),
ThisBuild / organization := "com.gu",
libraryDependencySchemes += "org.scala-lang.modules" %% "scala-java8-compat" % VersionScheme.Always,

resolvers ++= Seq(
"Sonatype OSS" at "https://oss.sonatype.org/content/repositories/releases/",
Expand Down Expand Up @@ -102,10 +104,12 @@ lazy val common = (project in file("common"))
"com.amazonaws" % "aws-lambda-java-core" % awsLambdaCoreVersion,
"com.amazonaws" % "aws-java-sdk-s3" % awsVersion,
"com.amazonaws" % "aws-java-sdk-dynamodb" % awsVersion,
"software.amazon.awssdk" % "dynamodb" % awsV2Version,
"com.amazonaws" % "aws-java-sdk-kinesis" % awsVersion,
"ai.x" %% "play-json-extensions" % playJsonExtensionsVersion,
"ch.qos.logback" % "logback-classic" % logbackClassicVersion,
"com.amazonaws" % "aws-java-sdk-sts" % awsVersion,
"software.amazon.awssdk" % "sts" % awsV2Version,
"com.amazonaws" % "aws-java-sdk-elastictranscoder" % awsVersion,
"org.scanamo" %% "scanamo" % scanamoVersion,
"com.squareup.okhttp" % "okhttp" % okHttpVersion,
Expand All @@ -132,6 +136,7 @@ lazy val app = (project in file("."))
ehcache,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonDatabindVersion,
"com.amazonaws" % "aws-java-sdk-sts" % awsVersion,
"software.amazon.awssdk" % "sts" % awsV2Version,
"com.amazonaws" % "aws-java-sdk-ec2" % awsVersion,
"org.scalatestplus.play" %% "scalatestplus-play" % scalaTestPlusPlayVersion % "test",
"org.mockito" % "mockito-core" % mockitoVersion % "test",
Expand Down
21 changes: 9 additions & 12 deletions common/src/main/scala/com/gu/media/PlutoDataStore.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
package com.gu.media

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import com.amazonaws.services.dynamodbv2.model.DeleteItemResult
import com.gu.media.model.PlutoSyncMetadataMessage
import org.scanamo.error.DynamoReadError
import org.scanamo.generic.auto._
import org.scanamo.syntax._
import org.scanamo.{Scanamo, Table}
import org.scanamo.auto._

class PlutoDataStore(client: AmazonDynamoDB, dynamoTableName: String) {
class PlutoDataStore(scanamo: Scanamo, dynamoTableName: String) {

val table = Table[PlutoSyncMetadataMessage](dynamoTableName)

def getUploadsWithAtomId(id: String): List[PlutoSyncMetadataMessage] = {
val atomIdIndex = table.index("atom-id")
val results = Scanamo.exec(client)(atomIdIndex.query('atomId -> id))
val results = scanamo.exec(atomIdIndex.query("atomId" === id))

val errors = results.collect { case Left(err) => err }
if (errors.nonEmpty) {
Expand All @@ -25,21 +22,21 @@ class PlutoDataStore(client: AmazonDynamoDB, dynamoTableName: String) {
}

def get(id: String): Option[PlutoSyncMetadataMessage] = {
val operation = table.get('id -> id)
val result = Scanamo.exec(client)(operation)
val operation = table.get("id" === id)
val result = scanamo.exec(operation)

result.map {
case Right(item) => item
case Left(err) => throw DynamoPlutoTableException(err.toString)
}
}

def put(item: PlutoSyncMetadataMessage): Option[Either[DynamoReadError, PlutoSyncMetadataMessage]] = {
Scanamo.exec(client)(table.put(item))
def put(item: PlutoSyncMetadataMessage): Unit = {
scanamo.exec(table.put(item))
}

def delete(id: String): DeleteItemResult = {
Scanamo.exec(client)(table.delete('id -> id))
def delete(id: String): Unit = {
scanamo.exec(table.delete("id" === id))
}

case class DynamoPlutoTableException(err: String) extends RuntimeException(err)
Expand Down
20 changes: 14 additions & 6 deletions common/src/main/scala/com/gu/media/aws/AwsAccess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import com.amazonaws.regions.{Region, Regions}
import com.gu.media.Settings

trait AwsAccess { this: Settings =>
def regionName: Option[String]
def readTag(tag: String): Option[String]

val credentials: AwsCredentials

// To avoid renaming references everywhere
def credsProvider: AWSCredentialsProvider = credentials.instance
def credsProvider: AWSCredentialsProvider = credentials.instance.v1

final def defaultRegion: Region = Region.getRegion(Regions.EU_WEST_1)
final def region: Region = regionName
.map { name => Region.getRegion(Regions.fromName(name)) }
.getOrElse(defaultRegion)
def region: Region

final def awsV2Region: software.amazon.awssdk.regions.Region =
software.amazon.awssdk.regions.Region.of(region.getName)

// These are injected as environment variables when running in a Lambda (unfortunately they cannot be tagged)
final val stage = sys.env.getOrElse("STAGE", readTag("Stage").getOrElse("DEV"))
Expand All @@ -24,3 +24,11 @@ trait AwsAccess { this: Settings =>
final val stack: Option[String] = if (isDev) Some("media-atom-maker") else readTag("Stack")
final val app: String = if (isDev) "media-atom-maker" else readTag("App").getOrElse("media-atom-maker")
}

object AwsAccess {
def regionFrom(maybeName: Option[String]): Region = maybeName
.map { name => Region.getRegion(Regions.fromName(name)) }
.getOrElse(Region.getRegion(Regions.EU_WEST_1))

def regionFrom(settings: Settings): Region = regionFrom(settings.getString("aws.region"))
}
33 changes: 13 additions & 20 deletions common/src/main/scala/com/gu/media/aws/AwsCredentials.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package com.gu.media.aws

import com.amazonaws.auth._
import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.gu.media.Settings

case class AwsCredentials(instance: AWSCredentialsProvider, crossAccount: AWSCredentialsProvider,
upload: AWSCredentialsProvider)
case class AwsCredentials(
instance: AwsCredentialsProvidersForBothSdkVersions,
crossAccount: AwsCredentialsProvidersForBothSdkVersions,
upload: AwsCredentialsProvidersForBothSdkVersions
)

object AwsCredentials {
def dev(settings: Settings): AwsCredentials = {
val profile = settings.getMandatoryString("aws.profile")
val instance = new ProfileCredentialsProvider(profile)
val instance = AwsCredentialsProvidersForBothSdkVersions.profile(profile)

// To enable publishing to CAPI code from DEV, update the kinesis streams in config and uncomment below:
// val crossAccount = new ProfileCredentialsProvider("composer")
// val crossAccount = AwsCredentialsProvidersForBothSdkVersions.profile("composer")
val crossAccount = instance

val upload = devUpload(settings)
Expand All @@ -23,38 +23,31 @@ object AwsCredentials {
}

def app(settings: Settings): AwsCredentials = {
val instance = InstanceProfileCredentialsProvider.getInstance()
val instance = AwsCredentialsProvidersForBothSdkVersions.instance()

val crossAccount = assumeCrossAccountRole(instance, settings)

AwsCredentials(instance, crossAccount, upload = instance)
}

def lambda(): AwsCredentials = {
val instance = new EnvironmentVariableCredentialsProvider()
val instance = AwsCredentialsProvidersForBothSdkVersions.environmentVariables()
AwsCredentials(instance, crossAccount = instance, upload = instance)
}

private def devUpload(settings: Settings): AWSCredentialsProvider = {
private def devUpload(settings: Settings): AwsCredentialsProvidersForBothSdkVersions = {
// Only required in dev (because federated credentials such as those from Janus cannot do STS requests).
// Instance profile credentials are sufficient when deployed.
val accessKey = settings.getMandatoryString("aws.upload.accessKey", "This is the AwsId output of the dev cloudformation")
val secretKey = settings.getMandatoryString("aws.upload.secretKey", "This is the AwsSecret output of the dev cloudformation")

new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey))
AwsCredentialsProvidersForBothSdkVersions.static(accessKey, secretKey)
}

private def assumeCrossAccountRole(instance: AWSCredentialsProvider, settings: Settings) = {
private def assumeCrossAccountRole(instance: AwsCredentialsProvidersForBothSdkVersions, settings: Settings): AwsCredentialsProvidersForBothSdkVersions = {
val crossAccountRoleArn = settings.getMandatoryString("aws.kinesis.stsCapiRoleToAssume",
"Role to assume to access CAPI streams (in format arn:aws:iam::<account>:role/<role_name>)")

assumeAccountRole(instance, crossAccountRoleArn, "capi")
}

private def assumeAccountRole(instance: AWSCredentialsProvider, roleArn: String, sessionNameSuffix: String): AWSCredentialsProvider = {
val securityTokens = AWSSecurityTokenServiceClientBuilder.standard().withCredentials(instance).build()

new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, s"media-atom-maker-${sessionNameSuffix}")
.withStsClient(securityTokens).build()
instance.assumeAccountRole(crossAccountRoleArn, "capi", AwsAccess.regionFrom(settings).getName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.gu.media.aws

import com.amazonaws.auth._
import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.gu.media.Settings
import com.gu.media.aws.AwsV2Util
import software.amazon.awssdk.auth.{credentials => awsv2}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sts.{StsClient, StsClientBuilder}
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest

case class AwsCredentialsProvidersForBothSdkVersions(
v1: com.amazonaws.auth.AWSCredentialsProvider,
v2: software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
) {
def assumeAccountRole(roleArn: String, sessionNameSuffix: String, regionName: String): AwsCredentialsProvidersForBothSdkVersions = {
val roleSessionName = s"media-atom-maker-$sessionNameSuffix"
AwsCredentialsProvidersForBothSdkVersions(
new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName)
.withStsClient(AWSSecurityTokenServiceClientBuilder.standard().withCredentials(v1).build()).build(),
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider.builder()
.stsClient(AwsV2Util.buildSync[StsClient, StsClientBuilder](StsClient.builder(), v2, Region.of(regionName)))
.refreshRequest(AssumeRoleRequest.builder.roleSessionName(roleSessionName).roleArn(roleArn).build)
.build()
)
}
}

object AwsCredentialsProvidersForBothSdkVersions {
def profile(name: String): AwsCredentialsProvidersForBothSdkVersions = AwsCredentialsProvidersForBothSdkVersions(
new ProfileCredentialsProvider(name),
awsv2.ProfileCredentialsProvider.create(name)
)

def instance(): AwsCredentialsProvidersForBothSdkVersions = AwsCredentialsProvidersForBothSdkVersions(
InstanceProfileCredentialsProvider.getInstance(),
awsv2.InstanceProfileCredentialsProvider.create()
)

def environmentVariables(): AwsCredentialsProvidersForBothSdkVersions = AwsCredentialsProvidersForBothSdkVersions(
new EnvironmentVariableCredentialsProvider(),
awsv2.EnvironmentVariableCredentialsProvider.create()
)

def static(accessKey: String, secretKey: String): AwsCredentialsProvidersForBothSdkVersions = AwsCredentialsProvidersForBothSdkVersions(
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)),
awsv2.StaticCredentialsProvider.create(awsv2.AwsBasicCredentials.create(accessKey, secretKey))
)
}
15 changes: 15 additions & 0 deletions common/src/main/scala/com/gu/media/aws/AwsV2Util.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.gu.media.aws

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.awscore.client.builder.{AwsClientBuilder, AwsSyncClientBuilder}
import software.amazon.awssdk.http.apache.ApacheHttpClient

object AwsV2Util {
def buildSync[T, B <: AwsClientBuilder[B, T] with AwsSyncClientBuilder[B, T]](
builder: B, creds: AwsCredentialsProvider, region: software.amazon.awssdk.regions.Region
): T = builder
.httpClientBuilder(ApacheHttpClient.builder())
.credentialsProvider(creds)
.region(region)
.build()
}
11 changes: 11 additions & 0 deletions common/src/main/scala/com/gu/media/aws/DynamoAccess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package com.gu.media.aws

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.gu.media.Settings
import com.gu.media.aws.AwsV2Util.buildSync
import org.scanamo.Scanamo
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.awscore.client.builder.{AwsClientBuilder, AwsSyncClientBuilder}
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.dynamodb.{DynamoDbClient, DynamoDbClientBuilder}

trait DynamoAccess { this: Settings with AwsAccess =>
lazy val dynamoTableName: String = sys.env.getOrElse("ATOM_TABLE_NAME",
Expand All @@ -24,4 +30,9 @@ trait DynamoAccess { this: Settings with AwsAccess =>
.withCredentials(credsProvider)
.withRegion(region.getName)
.build()

lazy val dynamoDbSdkV2: DynamoDbClient =
buildSync[DynamoDbClient, DynamoDbClientBuilder](DynamoDbClient.builder(), credentials.instance.v2, awsV2Region)

lazy val scanamo: Scanamo = Scanamo(dynamoDbSdkV2)
}
4 changes: 2 additions & 2 deletions common/src/main/scala/com/gu/media/aws/KinesisAccess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ trait KinesisAccess { this: Settings with AwsAccess with Logging =>
val syncWithPluto: Boolean = getBoolean("pluto.sync").getOrElse(false)

lazy val crossAccountKinesisClient = AmazonKinesisClientBuilder.standard()
.withCredentials(credentials.crossAccount)
.withCredentials(credentials.crossAccount.v1)
.withRegion(region.getName)
.build()

lazy val kinesisClient = AmazonKinesisClientBuilder.standard()
.withCredentials(credentials.instance)
.withCredentials(credentials.instance.v1)
.withRegion(region.getName)
.build()

Expand Down
Loading

0 comments on commit f4178e5

Please sign in to comment.