From c9a1bb99111a796231eeaf3034356ef316ef4d8d Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Wed, 3 Jul 2024 09:46:52 +0100 Subject: [PATCH 01/12] add Kinesis pump --- pumps/init.go | 1 + pumps/kinesis.go | 181 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 pumps/kinesis.go diff --git a/pumps/init.go b/pumps/init.go index 544988ef4..304d8d697 100644 --- a/pumps/init.go +++ b/pumps/init.go @@ -37,4 +37,5 @@ func init() { AvailablePumps["sql-graph-aggregate"] = &GraphSQLAggregatePump{} AvailablePumps["resurfaceio"] = &ResurfacePump{} AvailablePumps["sqs"] = &SQSPump{} + AvailablePumps["kinesis"] = &KinesisPump{} } diff --git a/pumps/kinesis.go b/pumps/kinesis.go new file mode 100644 index 000000000..8dca8dc84 --- /dev/null +++ b/pumps/kinesis.go @@ -0,0 +1,181 @@ +package pumps + +import ( + "context" + "encoding/json" + "github.com/TykTechnologies/tyk-pump/analytics" + "github.com/mitchellh/mapstructure" + "github.com/sirupsen/logrus" + "math/rand" + + "strconv" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" +) + +// KinesisPump is a Tyk Pump that sends analytics records to AWS Kinesis. +type KinesisPump struct { + client *kinesis.Client + kinesisConf *KinesisConf + log *logrus.Entry + CommonPumpConfig +} + +// @PumpConf Kinesis +type KinesisConf struct { + // The prefix for the environment variables that will be used to override the configuration. + // Defaults to `TYK_PMP_PUMPS_KINESIS_META` + EnvPrefix string `mapstructure:"meta_env_prefix"` + // A name to identify the stream. The stream name is scoped to the AWS account used by the application + // that creates the stream. It is also scoped by AWS Region. + // That is, two streams in two different AWS accounts can have the same name. + // Two streams in the same AWS account but in two different Regions can also have the same name. + StreamName string `mapstructure:"stream_name"` + // Each PutRecords (the function used in this pump)request can support up to 500 records. + // Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys. + // Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second. + BatchSize int `mapstructure:"batch_size"` + // AWS Region the Kinesis stream targets + Region string `mapstructure:"region"` +} + +var kinesisPrefix = "kinesis-pump" +var kinesisDefaultENV = PUMPS_ENV_PREFIX + "_KINESIS" + PUMPS_ENV_META_PREFIX + +func (p *KinesisPump) New() Pump { + newPump := KinesisPump{} + return &newPump +} + +// Init initializes the pump with configuration settings. +func (p *KinesisPump) Init(config interface{}) error { + + p.log = log.WithField("prefix", kinesisPrefix) + + //Read configuration file + p.kinesisConf = &KinesisConf{} + err := mapstructure.Decode(config, &p.kinesisConf) + if err != nil { + p.log.Fatal("Failed to decode configuration: ", err) + } + + processPumpEnvVars(p, p.log, p.kinesisConf, kinesisDefaultENV) + + // Load AWS configuration + // Credentials are loaded as specified in + // https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials + cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(p.kinesisConf.Region)) + if err != nil { + p.log.Fatalf("unable to load Kinesis SDK config, %v", err) + } + + var defaultBatchSize = 100 + if p.kinesisConf.BatchSize == 0 { + p.kinesisConf.BatchSize = defaultBatchSize + } + + if p.kinesisConf.StreamName == "" { + p.log.Error("Stream name unset - may be unable to produce records") + } + + // Create Kinesis client + p.client = kinesis.NewFromConfig(cfg) + p.log.Info(p.GetName() + " Initialized") + + return nil + +} + +// WriteData writes the analytics records to AWS Kinesis in batches. +func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) error { + // + batches := splitIntoBatches(records, p.kinesisConf.BatchSize) + for _, batch := range batches { + var entries []types.PutRecordsRequestEntry + for _, record := range batch { + + //Build message format + decoded := record.(analytics.AnalyticsRecord) + message := Json{ + "timestamp": decoded.TimeStamp, + "method": decoded.Method, + "path": decoded.Path, + "raw_path": decoded.RawPath, + "response_code": decoded.ResponseCode, + "alias": decoded.Alias, + "api_key": decoded.APIKey, + "api_version": decoded.APIVersion, + "api_name": decoded.APIName, + "api_id": decoded.APIID, + "org_id": decoded.OrgID, + "oauth_id": decoded.OauthID, + "raw_request": decoded.RawRequest, + "request_time_ms": decoded.RequestTime, + "raw_response": decoded.RawResponse, + "ip_address": decoded.IPAddress, + "host": decoded.Host, + "content_length": decoded.ContentLength, + "user_agent": decoded.UserAgent, + "tags": decoded.Tags, + } + + //Transform object to json string + json, jsonError := json.Marshal(message) + if jsonError != nil { + p.log.WithError(jsonError).Error("unable to marshal message") + } + + // Partition key uses a string representation of Int + // Should even distribute across shards as AWS uses md5 of each message partition key + entry := types.PutRecordsRequestEntry{ + Data: json, + PartitionKey: aws.String(strconv.Itoa(rand.Int())), + } + entries = append(entries, entry) + } + + input := &kinesis.PutRecordsInput{ + StreamName: aws.String(p.kinesisConf.StreamName), + Records: entries, + } + + output, err := p.client.PutRecords(context.TODO(), input) + if err != nil { + p.log.Error("failed to put records to Kinesis: ", err) + } + + // Check for failed records + if output != nil { + for _, record := range output.Records { + if record.ErrorCode != nil { + p.log.Debugf("Failed to put record: %s - %s", aws.ToString(record.ErrorCode), aws.ToString(record.ErrorMessage)) + } + p.log.Debug(record) + } + p.log.Info("Purged ", len(output.Records), " records...") + + } + } + return nil +} + +// splitIntoBatches splits the records into batches of the specified size. +func splitIntoBatches(records []interface{}, batchSize int) [][]interface{} { + var batches [][]interface{} + for batchSize < len(records) { + records, batches = records[batchSize:], append(batches, records[0:batchSize:batchSize]) + } + return append(batches, records) +} + +// GetName returns the name of the pump. +func (p *KinesisPump) GetName() string { + return "Kinesis Pump" +} + +func (p *KinesisPump) GetEnvPrefix() string { + return p.kinesisConf.EnvPrefix +} From 27082dae697e3327f085d00e899aa8572207f935 Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Thu, 4 Jul 2024 10:15:57 +0100 Subject: [PATCH 02/12] add readme docs --- README.md | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/README.md b/README.md index d0c125cfc..c990e1fd3 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ The table below provides details on the fields within each `tyk_analytics` recor - [Stdout](#stdout) (i.e. for use by Datadog logging agent in Kubernetes) - [Timestream](#timestream-config) - [AWS SQS](#SQS-config) +- [AWS Kinesis](#Kinesis-config) # Configuration: @@ -1343,6 +1344,45 @@ TYK_PMP_PUMPS_SQS_META_AWSDELAYSECONDS=0 ``` +## Kinesis Config + +#### Authentication & Prerequisite + +We must authenticate ourselves by providing credentials to AWS. This pump uses the official AWS GO SDK, so instructions on how to authenticate can be found on [their documentation here](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials). + +#### Config Fields + +`stream_name` - The name of your Kinesis stream in the specified AWS region + +`region` - The AWS region your Kinesis stream is located - i.e. eu-west-2 + +`batch_size` - Optional. The maximum size of the records in a batch is 5MiB. If your records are larger in size setting this batch size paramter can guarantee you don't have failed delivery due to too large a batch. Default size if unset is 100. + +###### JSON / Conf File + +```json + "kinesis":{ + "type": "kinesis", + "meta": { + "stream_name": "my-stream", + "region": "eu-west-2", + "batch_size": 100 + } + }, +``` + +###### Env Variables + +``` +#Kinesis Pump Configuration +TYK_PMP_PUMPS_KINESIS_TYPE=kinesis +TYK_PMP_PUMPS_KINESIs_META_STREAMNAME=my-stream +TYK_PMP_PUMPS_KINESIS_META_REGION=eu-west-2 +TYK_PMP_PUMPS_KINESIS_META_BATCHSIZE=100 +``` + + + # Base Pump Configurations The following configurations can be added to any Pump. Keep reading for an example. From a1ae1cd00547e79ee80d72c3711a85dd5240cbfa Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Fri, 5 Jul 2024 17:34:38 +0100 Subject: [PATCH 03/12] go mod tidy --- go.mod | 11 +++++++---- go.sum | 24 ++++++++++++++---------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 3350d3637..ef3bf705a 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,10 @@ require ( github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9 github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 github.com/TykTechnologies/storage v1.2.0 - github.com/aws/aws-sdk-go-v2 v1.22.1 + github.com/aws/aws-sdk-go-v2 v1.30.1 github.com/aws/aws-sdk-go-v2/config v1.9.0 github.com/aws/aws-sdk-go-v2/credentials v1.6.1 + github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1 github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 github.com/cenkalti/backoff/v4 v4.0.2 @@ -57,15 +58,16 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.10.0 // indirect - github.com/aws/smithy-go v1.16.0 // indirect + github.com/aws/smithy-go v1.20.3 // indirect github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect @@ -89,6 +91,7 @@ require ( github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.2 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/joho/godotenv v1.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.15.9 // indirect diff --git a/go.sum b/go.sum index 91c4837b0..954ae1026 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,10 @@ github.com/aws/aws-sdk-go v1.40.32/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm github.com/aws/aws-sdk-go-v2 v1.10.0/go.mod h1:U/EyyVvKtzmFeQQcca7eBotKdlpcP2zzU6bXBYcf7CE= github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= -github.com/aws/aws-sdk-go-v2 v1.22.1 h1:sjnni/AuoTXxHitsIdT0FwmqUuNUuHtufcVDErVFT9U= -github.com/aws/aws-sdk-go-v2 v1.22.1/go.mod h1:Kd0OJtkW3Q0M0lUWGszapWjEvrXDzRW+D21JNsroB+c= +github.com/aws/aws-sdk-go-v2 v1.30.1 h1:4y/5Dvfrhd1MxRDD77SrfsDaj8kUkkljU7XE83NPV+o= +github.com/aws/aws-sdk-go-v2 v1.30.1/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 h1:tW1/Rkad38LA15X4UQtjXZXNKsCgkshC3EbmcUmghTg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3/go.mod h1:UbnqO+zjqk3uIt9yCACHJ9IVNhyhOCnYk8yA19SAWrM= github.com/aws/aws-sdk-go-v2/config v1.9.0 h1:SkREVSwi+J8MSdjhJ96jijZm5ZDNleI0E4hHCNivh7s= github.com/aws/aws-sdk-go-v2/config v1.9.0/go.mod h1:qhK5NNSgo9/nOSMu3HyE60WHXZTWTHTgd5qtIF44vOQ= github.com/aws/aws-sdk-go-v2/credentials v1.5.0/go.mod h1:kvqTkpzQmzri9PbsiTY+LvwFzM0gY19emlAWwBOJMb0= @@ -42,12 +44,12 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 h1:OpZjuUy8Jt3CA1WgJgBC5Bz+ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0/go.mod h1:5E1J3/TTYy6z909QNR0QnXGBpfESYGDqd3O0zqONghU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0/go.mod h1:NO3Q5ZTTQtO2xIg2+xTXYDiT7knSejfeDm7WGDaOo0U= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 h1:fi1ga6WysOyYb5PAf3Exd6B5GiSNpnZim4h1rhlBqx0= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1/go.mod h1:V5CY8wNurvPUibTi9mwqUqpiFZ5LnioKWIFUDtIzdI8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 h1:5SAoZ4jYpGH4721ZNoS1znQrhOfZinOhc4XuTXx/nVc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13/go.mod h1:+rdA6ZLpaSeM7tSg/B0IEDinCIBJGmW8rKDFkYpP04g= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0/go.mod h1:anlUzBoEWglcUxUQwZA7HQOEVEnQALVZsizAapB2hq8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 h1:ZpaV/j48RlPc4AmOZuPv22pJliXjXq8/reL63YzyFnw= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1/go.mod h1:R8aXraabD2e3qv1csxM14/X9WF4wFMIY0kH4YEtYD5M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 h1:WIijqeaAO7TYFLbhsZmi2rgLEAtWOC1LhxCAVTJlSKw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13/go.mod h1:i+kbfa76PQbWw/ULoWnp51EYVWH4ENln76fLQE3lXT8= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 h1:zPxLGWALExNepElO0gYgoqsbqTlt4ZCrhZ7XlfJ+Qlw= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5/go.mod h1:6ZBTuDmvpCOD4Sf1i2/I3PgftlEcDGgvi8ocq64oQEg= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ= @@ -55,6 +57,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0/go.mod h1:X5/JuOxPLU/ogICgDTtnpfaQzdQJO0yKDcpoxWLLJ8Y= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 h1:qGZWS/WgiFY+Zgad2u0gwBHpJxz6Ne401JE7iQI1nKs= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0/go.mod h1:Mq6AEc+oEjCUlBuLiK5YwW4shSOAKCQ3tXN0sQeYoBA= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1 h1:UIEtjoWh7oqjHXdgdjOP/tinga1uKR9F//tiUNshE7w= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1/go.mod h1:tqz5Yq7ohiQIQ7qrj6e2fWJbT1Owq9zEo78mZb/+eWU= github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 h1:21QmEZkOnaJ4SPRFhhN+8MV5ewb0j1lxTg+RPp0mUeE= github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0/go.mod h1:E02a07/HTyJEHFpp+WMRh33xuNVdsd8WCbLlODeT4lU= github.com/aws/aws-sdk-go-v2/service/sso v1.5.0/go.mod h1:GsqaJOJeOfeYD88/2vHWKXegvDRofDqWwC5i48A2kgs= @@ -67,8 +71,8 @@ github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 h1:/4djuASUYOns1ZhCO github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0/go.mod h1:VN4yDJwgYOO6AzHPE8+QeBwK6wUMOFkSCogZFWifdVc= github.com/aws/smithy-go v1.8.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.16.0 h1:gJZEH/Fqh+RsvlJ1Zt4tVAtV6bKkp3cC+R6FCZMNzik= -github.com/aws/smithy-go v1.16.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= +github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= +github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 h1:ZgW7EEoTQvz27wleAVF3XVBqc6eBFqB4BNw4Awg4BN8= github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -186,7 +190,6 @@ github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsU github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= -github.com/jackc/pgconn v1.10.0 h1:4EYhlDVEMsJ30nNj0mmgwIUXoq7e9sMJrVC2ED6QlCU= github.com/jackc/pgconn v1.10.0/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= @@ -207,7 +210,6 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= @@ -233,7 +235,9 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI= github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg= github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= From eec9cb523ab27e0286526ff94a60ba38ccea61aa Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 14:34:27 +0100 Subject: [PATCH 04/12] gofumpt and goimports --- pumps/kinesis.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index 8dca8dc84..eff4f4780 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -3,10 +3,11 @@ package pumps import ( "context" "encoding/json" + "math/rand" + "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" "github.com/sirupsen/logrus" - "math/rand" "strconv" From d7207e98d611dcfd03ba3f9b3d64ae232849ee6a Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 14:38:38 +0100 Subject: [PATCH 05/12] formatting --- pumps/kinesis.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index eff4f4780..fc3e34099 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -4,13 +4,12 @@ import ( "context" "encoding/json" "math/rand" + "strconv" "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" "github.com/sirupsen/logrus" - "strconv" - "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/kinesis" @@ -43,8 +42,10 @@ type KinesisConf struct { Region string `mapstructure:"region"` } -var kinesisPrefix = "kinesis-pump" -var kinesisDefaultENV = PUMPS_ENV_PREFIX + "_KINESIS" + PUMPS_ENV_META_PREFIX +var ( + kinesisPrefix = "kinesis-pump" + kinesisDefaultENV = PUMPS_ENV_PREFIX + "_KINESIS" + PUMPS_ENV_META_PREFIX +) func (p *KinesisPump) New() Pump { newPump := KinesisPump{} @@ -53,10 +54,9 @@ func (p *KinesisPump) New() Pump { // Init initializes the pump with configuration settings. func (p *KinesisPump) Init(config interface{}) error { - p.log = log.WithField("prefix", kinesisPrefix) - //Read configuration file + // Read configuration file p.kinesisConf = &KinesisConf{} err := mapstructure.Decode(config, &p.kinesisConf) if err != nil { @@ -73,7 +73,7 @@ func (p *KinesisPump) Init(config interface{}) error { p.log.Fatalf("unable to load Kinesis SDK config, %v", err) } - var defaultBatchSize = 100 + defaultBatchSize := 100 if p.kinesisConf.BatchSize == 0 { p.kinesisConf.BatchSize = defaultBatchSize } @@ -87,18 +87,16 @@ func (p *KinesisPump) Init(config interface{}) error { p.log.Info(p.GetName() + " Initialized") return nil - } // WriteData writes the analytics records to AWS Kinesis in batches. func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) error { - // batches := splitIntoBatches(records, p.kinesisConf.BatchSize) for _, batch := range batches { var entries []types.PutRecordsRequestEntry for _, record := range batch { - //Build message format + // Build message format decoded := record.(analytics.AnalyticsRecord) message := Json{ "timestamp": decoded.TimeStamp, @@ -123,7 +121,7 @@ func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) erro "tags": decoded.Tags, } - //Transform object to json string + // Transform object to json string json, jsonError := json.Marshal(message) if jsonError != nil { p.log.WithError(jsonError).Error("unable to marshal message") From c1a03b9f7bd19f49f4aec3aa5006e6d998274a8a Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 15:11:18 +0100 Subject: [PATCH 06/12] golangci lint issues --- pumps/kinesis.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index fc3e34099..3dfb9112e 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -26,6 +26,10 @@ type KinesisPump struct { // @PumpConf Kinesis type KinesisConf struct { + // Each PutRecords (the function used in this pump)request can support up to 500 records. + // Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys. + // Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second. + BatchSize int `mapstructure:"batch_size"` // The prefix for the environment variables that will be used to override the configuration. // Defaults to `TYK_PMP_PUMPS_KINESIS_META` EnvPrefix string `mapstructure:"meta_env_prefix"` @@ -34,10 +38,6 @@ type KinesisConf struct { // That is, two streams in two different AWS accounts can have the same name. // Two streams in the same AWS account but in two different Regions can also have the same name. StreamName string `mapstructure:"stream_name"` - // Each PutRecords (the function used in this pump)request can support up to 500 records. - // Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys. - // Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second. - BatchSize int `mapstructure:"batch_size"` // AWS Region the Kinesis stream targets Region string `mapstructure:"region"` } @@ -95,7 +95,6 @@ func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) erro for _, batch := range batches { var entries []types.PutRecordsRequestEntry for _, record := range batch { - // Build message format decoded := record.(analytics.AnalyticsRecord) message := Json{ From c3a2c64aa0e230d7ac8be9d2093aa7507d880791 Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 15:18:02 +0100 Subject: [PATCH 07/12] golangci lint issues again --- pumps/kinesis.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index 3dfb9112e..47f047b36 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -26,10 +26,6 @@ type KinesisPump struct { // @PumpConf Kinesis type KinesisConf struct { - // Each PutRecords (the function used in this pump)request can support up to 500 records. - // Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys. - // Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second. - BatchSize int `mapstructure:"batch_size"` // The prefix for the environment variables that will be used to override the configuration. // Defaults to `TYK_PMP_PUMPS_KINESIS_META` EnvPrefix string `mapstructure:"meta_env_prefix"` @@ -40,6 +36,10 @@ type KinesisConf struct { StreamName string `mapstructure:"stream_name"` // AWS Region the Kinesis stream targets Region string `mapstructure:"region"` + // Each PutRecords (the function used in this pump)request can support up to 500 records. + // Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys. + // Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second. + BatchSize int `mapstructure:"batch_size"` } var ( @@ -154,7 +154,6 @@ func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) erro p.log.Debug(record) } p.log.Info("Purged ", len(output.Records), " records...") - } } return nil From 9253b8abb134400d806c6445294c0627fc3bc0e8 Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 15:49:43 +0100 Subject: [PATCH 08/12] resolve comments and lint errors --- pumps/kinesis.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index 47f047b36..26d879336 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -96,8 +96,12 @@ func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) erro var entries []types.PutRecordsRequestEntry for _, record := range batch { // Build message format - decoded := record.(analytics.AnalyticsRecord) - message := Json{ + decoded, ok := record.(analytics.AnalyticsRecord) + if !ok { + p.log.WithField("record", record).Error("unable to decode record") + continue + } + analyticsRecord := Json{ "timestamp": decoded.TimeStamp, "method": decoded.Method, "path": decoded.Path, @@ -121,7 +125,7 @@ func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) erro } // Transform object to json string - json, jsonError := json.Marshal(message) + json, jsonError := json.Marshal(analyticsRecord) if jsonError != nil { p.log.WithError(jsonError).Error("unable to marshal message") } @@ -140,7 +144,7 @@ func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) erro Records: entries, } - output, err := p.client.PutRecords(context.TODO(), input) + output, err := p.client.PutRecords(ctx, input) if err != nil { p.log.Error("failed to put records to Kinesis: ", err) } From 1bf1f7a4e2bf5c09ff40d60fed4a3b5f9f767854 Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 15:51:27 +0100 Subject: [PATCH 09/12] no lint dupe --- pumps/kinesis.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index 26d879336..a004571aa 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -101,6 +101,7 @@ func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) erro p.log.WithField("record", record).Error("unable to decode record") continue } + //nolint:dupl analyticsRecord := Json{ "timestamp": decoded.TimeStamp, "method": decoded.Method, From a8d77061bb22acccd7e6154f1efe9d88289eaf80 Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 15:55:13 +0100 Subject: [PATCH 10/12] try math/rand/v2 --- pumps/kinesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index a004571aa..adcf21cb9 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -3,7 +3,7 @@ package pumps import ( "context" "encoding/json" - "math/rand" + "math/rand/v2" "strconv" "github.com/TykTechnologies/tyk-pump/analytics" From 44d0857b8e9b4e508caf627612180bf241c23340 Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 16:10:05 +0100 Subject: [PATCH 11/12] crypto/rand to satisfy linter even though it is not needed --- pumps/kinesis.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index adcf21cb9..197bff11f 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -2,13 +2,13 @@ package pumps import ( "context" + "crypto/rand" "encoding/json" - "math/rand/v2" - "strconv" - + "fmt" "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" "github.com/sirupsen/logrus" + "math/big" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" @@ -131,11 +131,16 @@ func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) erro p.log.WithError(jsonError).Error("unable to marshal message") } + n, err := rand.Int(rand.Reader, big.NewInt(1000000000)) + if err != nil { + p.log.Error("failed to generate int for Partition key: ", err) + } + // Partition key uses a string representation of Int // Should even distribute across shards as AWS uses md5 of each message partition key entry := types.PutRecordsRequestEntry{ Data: json, - PartitionKey: aws.String(strconv.Itoa(rand.Int())), + PartitionKey: aws.String(fmt.Sprint(n)), } entries = append(entries, entry) } From 0abfb6e1af7d1ec950cc8991030aa505152cd0fc Mon Sep 17 00:00:00 2001 From: joshblakeley Date: Mon, 8 Jul 2024 16:11:59 +0100 Subject: [PATCH 12/12] gofumpt --- pumps/kinesis.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pumps/kinesis.go b/pumps/kinesis.go index 197bff11f..cb8c2e247 100644 --- a/pumps/kinesis.go +++ b/pumps/kinesis.go @@ -5,10 +5,11 @@ import ( "crypto/rand" "encoding/json" "fmt" + "math/big" + "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" "github.com/sirupsen/logrus" - "math/big" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config"