From 25683f237aab65317050d15999f20f3408d70133 Mon Sep 17 00:00:00 2001 From: Josh Blakeley <31618778+joshblakeley@users.noreply.github.com> Date: Thu, 1 Aug 2024 14:17:40 +0100 Subject: [PATCH] [tt-6671] AWS Kinesis pump (#841) * add Kinesis pump * add readme docs * go mod tidy * gofumpt and goimports * formatting * golangci lint issues * golangci lint issues again * resolve comments and lint errors * no lint dupe * try math/rand/v2 * crypto/rand to satisfy linter even though it is not needed * gofumpt --------- Co-authored-by: joshblakeley Co-authored-by: joshblakeley Co-authored-by: Matias --- README.md | 40 ++++++++++ go.mod | 11 ++- go.sum | 24 +++--- pumps/init.go | 1 + pumps/kinesis.go | 189 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 251 insertions(+), 14 deletions(-) create mode 100644 pumps/kinesis.go diff --git a/README.md b/README.md index d0c125cfc..c990e1fd3 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ The table below provides details on the fields within each `tyk_analytics` recor - [Stdout](#stdout) (i.e. for use by Datadog logging agent in Kubernetes) - [Timestream](#timestream-config) - [AWS SQS](#SQS-config) +- [AWS Kinesis](#Kinesis-config) # Configuration: @@ -1343,6 +1344,45 @@ TYK_PMP_PUMPS_SQS_META_AWSDELAYSECONDS=0 ``` +## Kinesis Config + +#### Authentication & Prerequisite + +We must authenticate ourselves by providing credentials to AWS. This pump uses the official AWS GO SDK, so instructions on how to authenticate can be found on [their documentation here](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials). + +#### Config Fields + +`stream_name` - The name of your Kinesis stream in the specified AWS region + +`region` - The AWS region your Kinesis stream is located - i.e. eu-west-2 + +`batch_size` - Optional. The maximum size of the records in a batch is 5MiB. If your records are larger in size setting this batch size paramter can guarantee you don't have failed delivery due to too large a batch. Default size if unset is 100. + +###### JSON / Conf File + +```json + "kinesis":{ + "type": "kinesis", + "meta": { + "stream_name": "my-stream", + "region": "eu-west-2", + "batch_size": 100 + } + }, +``` + +###### Env Variables + +``` +#Kinesis Pump Configuration +TYK_PMP_PUMPS_KINESIS_TYPE=kinesis +TYK_PMP_PUMPS_KINESIs_META_STREAMNAME=my-stream +TYK_PMP_PUMPS_KINESIS_META_REGION=eu-west-2 +TYK_PMP_PUMPS_KINESIS_META_BATCHSIZE=100 +``` + + + # Base Pump Configurations The following configurations can be added to any Pump. Keep reading for an example. diff --git a/go.mod b/go.mod index 3350d3637..ef3bf705a 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,10 @@ require ( github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9 github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 github.com/TykTechnologies/storage v1.2.0 - github.com/aws/aws-sdk-go-v2 v1.22.1 + github.com/aws/aws-sdk-go-v2 v1.30.1 github.com/aws/aws-sdk-go-v2/config v1.9.0 github.com/aws/aws-sdk-go-v2/credentials v1.6.1 + github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1 github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 github.com/cenkalti/backoff/v4 v4.0.2 @@ -57,15 +58,16 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.10.0 // indirect - github.com/aws/smithy-go v1.16.0 // indirect + github.com/aws/smithy-go v1.20.3 // indirect github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect @@ -89,6 +91,7 @@ require ( github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.2 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/joho/godotenv v1.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.15.9 // indirect diff --git a/go.sum b/go.sum index 91c4837b0..954ae1026 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,10 @@ github.com/aws/aws-sdk-go v1.40.32/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm github.com/aws/aws-sdk-go-v2 v1.10.0/go.mod h1:U/EyyVvKtzmFeQQcca7eBotKdlpcP2zzU6bXBYcf7CE= github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= -github.com/aws/aws-sdk-go-v2 v1.22.1 h1:sjnni/AuoTXxHitsIdT0FwmqUuNUuHtufcVDErVFT9U= -github.com/aws/aws-sdk-go-v2 v1.22.1/go.mod h1:Kd0OJtkW3Q0M0lUWGszapWjEvrXDzRW+D21JNsroB+c= +github.com/aws/aws-sdk-go-v2 v1.30.1 h1:4y/5Dvfrhd1MxRDD77SrfsDaj8kUkkljU7XE83NPV+o= +github.com/aws/aws-sdk-go-v2 v1.30.1/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 h1:tW1/Rkad38LA15X4UQtjXZXNKsCgkshC3EbmcUmghTg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3/go.mod h1:UbnqO+zjqk3uIt9yCACHJ9IVNhyhOCnYk8yA19SAWrM= github.com/aws/aws-sdk-go-v2/config v1.9.0 h1:SkREVSwi+J8MSdjhJ96jijZm5ZDNleI0E4hHCNivh7s= github.com/aws/aws-sdk-go-v2/config v1.9.0/go.mod h1:qhK5NNSgo9/nOSMu3HyE60WHXZTWTHTgd5qtIF44vOQ= github.com/aws/aws-sdk-go-v2/credentials v1.5.0/go.mod h1:kvqTkpzQmzri9PbsiTY+LvwFzM0gY19emlAWwBOJMb0= @@ -42,12 +44,12 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 h1:OpZjuUy8Jt3CA1WgJgBC5Bz+ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0/go.mod h1:5E1J3/TTYy6z909QNR0QnXGBpfESYGDqd3O0zqONghU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0/go.mod h1:NO3Q5ZTTQtO2xIg2+xTXYDiT7knSejfeDm7WGDaOo0U= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 h1:fi1ga6WysOyYb5PAf3Exd6B5GiSNpnZim4h1rhlBqx0= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1/go.mod h1:V5CY8wNurvPUibTi9mwqUqpiFZ5LnioKWIFUDtIzdI8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 h1:5SAoZ4jYpGH4721ZNoS1znQrhOfZinOhc4XuTXx/nVc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13/go.mod h1:+rdA6ZLpaSeM7tSg/B0IEDinCIBJGmW8rKDFkYpP04g= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0/go.mod h1:anlUzBoEWglcUxUQwZA7HQOEVEnQALVZsizAapB2hq8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 h1:ZpaV/j48RlPc4AmOZuPv22pJliXjXq8/reL63YzyFnw= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1/go.mod h1:R8aXraabD2e3qv1csxM14/X9WF4wFMIY0kH4YEtYD5M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 h1:WIijqeaAO7TYFLbhsZmi2rgLEAtWOC1LhxCAVTJlSKw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13/go.mod h1:i+kbfa76PQbWw/ULoWnp51EYVWH4ENln76fLQE3lXT8= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 h1:zPxLGWALExNepElO0gYgoqsbqTlt4ZCrhZ7XlfJ+Qlw= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5/go.mod h1:6ZBTuDmvpCOD4Sf1i2/I3PgftlEcDGgvi8ocq64oQEg= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ= @@ -55,6 +57,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0/go.mod h1:X5/JuOxPLU/ogICgDTtnpfaQzdQJO0yKDcpoxWLLJ8Y= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 h1:qGZWS/WgiFY+Zgad2u0gwBHpJxz6Ne401JE7iQI1nKs= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0/go.mod h1:Mq6AEc+oEjCUlBuLiK5YwW4shSOAKCQ3tXN0sQeYoBA= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1 h1:UIEtjoWh7oqjHXdgdjOP/tinga1uKR9F//tiUNshE7w= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1/go.mod h1:tqz5Yq7ohiQIQ7qrj6e2fWJbT1Owq9zEo78mZb/+eWU= github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 h1:21QmEZkOnaJ4SPRFhhN+8MV5ewb0j1lxTg+RPp0mUeE= github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0/go.mod h1:E02a07/HTyJEHFpp+WMRh33xuNVdsd8WCbLlODeT4lU= github.com/aws/aws-sdk-go-v2/service/sso v1.5.0/go.mod h1:GsqaJOJeOfeYD88/2vHWKXegvDRofDqWwC5i48A2kgs= @@ -67,8 +71,8 @@ github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 h1:/4djuASUYOns1ZhCO github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0/go.mod h1:VN4yDJwgYOO6AzHPE8+QeBwK6wUMOFkSCogZFWifdVc= github.com/aws/smithy-go v1.8.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.16.0 h1:gJZEH/Fqh+RsvlJ1Zt4tVAtV6bKkp3cC+R6FCZMNzik= -github.com/aws/smithy-go v1.16.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= +github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= +github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 h1:ZgW7EEoTQvz27wleAVF3XVBqc6eBFqB4BNw4Awg4BN8= github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -186,7 +190,6 @@ github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsU github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= -github.com/jackc/pgconn v1.10.0 h1:4EYhlDVEMsJ30nNj0mmgwIUXoq7e9sMJrVC2ED6QlCU= github.com/jackc/pgconn v1.10.0/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= @@ -207,7 +210,6 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= @@ -233,7 +235,9 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI= github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg= github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= diff --git a/pumps/init.go b/pumps/init.go index 544988ef4..304d8d697 100644 --- a/pumps/init.go +++ b/pumps/init.go @@ -37,4 +37,5 @@ func init() { AvailablePumps["sql-graph-aggregate"] = &GraphSQLAggregatePump{} AvailablePumps["resurfaceio"] = &ResurfacePump{} AvailablePumps["sqs"] = &SQSPump{} + AvailablePumps["kinesis"] = &KinesisPump{} } diff --git a/pumps/kinesis.go b/pumps/kinesis.go new file mode 100644 index 000000000..cb8c2e247 --- /dev/null +++ b/pumps/kinesis.go @@ -0,0 +1,189 @@ +package pumps + +import ( + "context" + "crypto/rand" + "encoding/json" + "fmt" + "math/big" + + "github.com/TykTechnologies/tyk-pump/analytics" + "github.com/mitchellh/mapstructure" + "github.com/sirupsen/logrus" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" +) + +// KinesisPump is a Tyk Pump that sends analytics records to AWS Kinesis. +type KinesisPump struct { + client *kinesis.Client + kinesisConf *KinesisConf + log *logrus.Entry + CommonPumpConfig +} + +// @PumpConf Kinesis +type KinesisConf struct { + // The prefix for the environment variables that will be used to override the configuration. + // Defaults to `TYK_PMP_PUMPS_KINESIS_META` + EnvPrefix string `mapstructure:"meta_env_prefix"` + // A name to identify the stream. The stream name is scoped to the AWS account used by the application + // that creates the stream. It is also scoped by AWS Region. + // That is, two streams in two different AWS accounts can have the same name. + // Two streams in the same AWS account but in two different Regions can also have the same name. + StreamName string `mapstructure:"stream_name"` + // AWS Region the Kinesis stream targets + Region string `mapstructure:"region"` + // Each PutRecords (the function used in this pump)request can support up to 500 records. + // Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys. + // Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second. + BatchSize int `mapstructure:"batch_size"` +} + +var ( + kinesisPrefix = "kinesis-pump" + kinesisDefaultENV = PUMPS_ENV_PREFIX + "_KINESIS" + PUMPS_ENV_META_PREFIX +) + +func (p *KinesisPump) New() Pump { + newPump := KinesisPump{} + return &newPump +} + +// Init initializes the pump with configuration settings. +func (p *KinesisPump) Init(config interface{}) error { + p.log = log.WithField("prefix", kinesisPrefix) + + // Read configuration file + p.kinesisConf = &KinesisConf{} + err := mapstructure.Decode(config, &p.kinesisConf) + if err != nil { + p.log.Fatal("Failed to decode configuration: ", err) + } + + processPumpEnvVars(p, p.log, p.kinesisConf, kinesisDefaultENV) + + // Load AWS configuration + // Credentials are loaded as specified in + // https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials + cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(p.kinesisConf.Region)) + if err != nil { + p.log.Fatalf("unable to load Kinesis SDK config, %v", err) + } + + defaultBatchSize := 100 + if p.kinesisConf.BatchSize == 0 { + p.kinesisConf.BatchSize = defaultBatchSize + } + + if p.kinesisConf.StreamName == "" { + p.log.Error("Stream name unset - may be unable to produce records") + } + + // Create Kinesis client + p.client = kinesis.NewFromConfig(cfg) + p.log.Info(p.GetName() + " Initialized") + + return nil +} + +// WriteData writes the analytics records to AWS Kinesis in batches. +func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) error { + batches := splitIntoBatches(records, p.kinesisConf.BatchSize) + for _, batch := range batches { + var entries []types.PutRecordsRequestEntry + for _, record := range batch { + // Build message format + decoded, ok := record.(analytics.AnalyticsRecord) + if !ok { + p.log.WithField("record", record).Error("unable to decode record") + continue + } + //nolint:dupl + analyticsRecord := Json{ + "timestamp": decoded.TimeStamp, + "method": decoded.Method, + "path": decoded.Path, + "raw_path": decoded.RawPath, + "response_code": decoded.ResponseCode, + "alias": decoded.Alias, + "api_key": decoded.APIKey, + "api_version": decoded.APIVersion, + "api_name": decoded.APIName, + "api_id": decoded.APIID, + "org_id": decoded.OrgID, + "oauth_id": decoded.OauthID, + "raw_request": decoded.RawRequest, + "request_time_ms": decoded.RequestTime, + "raw_response": decoded.RawResponse, + "ip_address": decoded.IPAddress, + "host": decoded.Host, + "content_length": decoded.ContentLength, + "user_agent": decoded.UserAgent, + "tags": decoded.Tags, + } + + // Transform object to json string + json, jsonError := json.Marshal(analyticsRecord) + if jsonError != nil { + p.log.WithError(jsonError).Error("unable to marshal message") + } + + n, err := rand.Int(rand.Reader, big.NewInt(1000000000)) + if err != nil { + p.log.Error("failed to generate int for Partition key: ", err) + } + + // Partition key uses a string representation of Int + // Should even distribute across shards as AWS uses md5 of each message partition key + entry := types.PutRecordsRequestEntry{ + Data: json, + PartitionKey: aws.String(fmt.Sprint(n)), + } + entries = append(entries, entry) + } + + input := &kinesis.PutRecordsInput{ + StreamName: aws.String(p.kinesisConf.StreamName), + Records: entries, + } + + output, err := p.client.PutRecords(ctx, input) + if err != nil { + p.log.Error("failed to put records to Kinesis: ", err) + } + + // Check for failed records + if output != nil { + for _, record := range output.Records { + if record.ErrorCode != nil { + p.log.Debugf("Failed to put record: %s - %s", aws.ToString(record.ErrorCode), aws.ToString(record.ErrorMessage)) + } + p.log.Debug(record) + } + p.log.Info("Purged ", len(output.Records), " records...") + } + } + return nil +} + +// splitIntoBatches splits the records into batches of the specified size. +func splitIntoBatches(records []interface{}, batchSize int) [][]interface{} { + var batches [][]interface{} + for batchSize < len(records) { + records, batches = records[batchSize:], append(batches, records[0:batchSize:batchSize]) + } + return append(batches, records) +} + +// GetName returns the name of the pump. +func (p *KinesisPump) GetName() string { + return "Kinesis Pump" +} + +func (p *KinesisPump) GetEnvPrefix() string { + return p.kinesisConf.EnvPrefix +}