From 79326ef6eae3db08602dc5e881355ca882db3583 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 12:58:48 -0500 Subject: [PATCH 01/26] updated deps --- go.mod | 17 +++++++++-------- go.sum | 28 ++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 1d4988a..b4ff0f4 100644 --- a/go.mod +++ b/go.mod @@ -6,27 +6,28 @@ require ( github.com/Masterminds/semver v1.5.0 github.com/confluentinc/confluent-kafka-go v1.8.2 github.com/go-zookeeper/zk v1.0.2 - github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.2 + github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.3 github.com/jamiealquiza/envy v1.1.0 github.com/spf13/cobra v1.3.0 github.com/stretchr/testify v1.7.0 github.com/zorkian/go-datadog-api v2.30.0+incompatible - google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa - google.golang.org/grpc v1.43.0 + google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 + google.golang.org/grpc v1.44.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 google.golang.org/protobuf v1.27.1 - gopkg.in/DataDog/dd-trace-go.v1 v1.34.0 + gopkg.in/DataDog/dd-trace-go.v1 v1.36.0 ) require ( - github.com/DataDog/datadog-go v4.4.0+incompatible // indirect + github.com/DataDog/datadog-go v4.8.3+incompatible // indirect + github.com/DataDog/datadog-go/v5 v5.0.2 // indirect github.com/DataDog/gostackparse v0.5.0 // indirect github.com/Microsoft/go-winio v0.5.1 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/glog v1.0.0 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect + github.com/google/pprof v0.0.0-20220128192902-513e8ac6eea1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect @@ -34,8 +35,8 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.1.1 // indirect - golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect - golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect + golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect + golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect golang.org/x/text v0.3.7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index c9387ea..3a4f0cb 100644 --- a/go.sum +++ b/go.sum @@ -47,15 +47,22 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/datadog-agent/pkg/obfuscate v0.0.0-20211129110424-6491aa3bf583/go.mod h1:EP9f4GqaDJyP1F5jTNMtzdIpw3JpNs3rMSJOnYywCiw= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/datadog-go v4.4.0+incompatible h1:R7WqXWP4fIOAqWJtUKmSfuc7eDsBT58k9AY5WSHVosk= github.com/DataDog/datadog-go v4.4.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/datadog-go v4.8.2+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q= +github.com/DataDog/datadog-go v4.8.3+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/datadog-go/v5 v5.0.2 h1:UFtEe7662/Qojxkw1d6SboAeA0CPI3naKhVASwFn+04= +github.com/DataDog/datadog-go/v5 v5.0.2/go.mod h1:ZI9JFB4ewXbw1sBnF4sxsR2k1H3xjV+PUAOUsHvKpcU= github.com/DataDog/gostackparse v0.5.0 h1:jb72P6GFHPHz2W0onsN51cS3FkaMDcjb0QzgxxA4gDk= github.com/DataDog/gostackparse v0.5.0/go.mod h1:lTfqcJKqS9KnXQGnyQMCugq3u1FP6UZMfWR0aitKFMM= github.com/DataDog/sketches-go v1.0.0 h1:chm5KSXO7kO+ywGWJ0Zs6tdmWU8PBXSbywFVciL6BG4= github.com/DataDog/sketches-go v1.0.0/go.mod h1:O+XkJHWk9w4hDwY2ZUDU31ZC9sNYlYo8DiFsxjYeo1k= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= +github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.5.1 h1:aPJp2QD7OOrhO5tQXqQoGSJc+DjDtWTGLOmNyAm6FgY= github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -104,6 +111,9 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -205,6 +215,8 @@ github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20220128192902-513e8ac6eea1 h1:fBq9NM3cpLvsvc+RkweQROGEeYx7QEk35Htz7bAmyuM= +github.com/google/pprof v0.0.0-20220128192902-513e8ac6eea1/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -217,6 +229,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4 github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.2 h1:I/pwhnUln5wbMnTyRbzswA0/JxpK8sZj0aUfI3TV1So= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.2/go.mod h1:lsuH8kb4GlMdSlI4alNIBBSAt5CHJtg3i+0WuN9J5YM= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.3 h1:I8MsauTJQXZ8df8qJvEln0kYNc3bSapuaSsEsnFdEFU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.3/go.mod h1:lZdb/YAJUSj9OqrCHs2ihjtoO3+xK3G53wTYXFWRGDo= github.com/hashicorp/consul/api v1.12.0/go.mod h1:6pVBMo0ebnYdt2S3H87XhekM/HHrUoTD2XXb/VrZVy0= github.com/hashicorp/consul/sdk v0.8.0/go.mod h1:GBvyrGALthsZObzUGsfgHZQDXjg4lOjagTIwIR1vPms= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -247,10 +261,12 @@ github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpT github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jamiealquiza/envy v1.1.0 h1:Nwh4wqTZ28gDA8zB+wFkhnUpz3CEcO12zotjeqqRoKE= github.com/jamiealquiza/envy v1.1.0/go.mod h1:MP36BriGCLwEHhi1OU8E9569JNZrjWfCvzG7RsPnHus= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -271,6 +287,7 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w= github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -464,6 +481,8 @@ golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -558,6 +577,8 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486 h1:5hpz5aRr+W1erYCL5JRhSUBJRph7l9XkNveoExlrKYk= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 h1:XDXtA5hveEEV8JB2l7nhMTp3t3cHp9ZpwcdjqyEWLlo= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -573,6 +594,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -733,6 +755,8 @@ google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ6 google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa h1:I0YcKz0I7OAhddo7ya8kMnvprhcWM045PmkBdMO9zN0= google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 h1:YxHp5zqIcAShDEvRr5/0rVESVS+njYF68PSdazrNLJo= +google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -761,6 +785,8 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.43.0 h1:Eeu7bZtDZ2DpRCsLhUlcrLnvYaMK1Gz86a+hMVvELmM= google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/grpc v1.44.0 h1:weqSxi/TMs1SqFRMHCtBgXRs8k3X39QIDEZ0pRcttUg= +google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -779,6 +805,8 @@ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+Rur google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/DataDog/dd-trace-go.v1 v1.34.0 h1:HQqGul25XkYUuNmk8F5tYQNxSUsOVFtZdimfiprSl7Q= gopkg.in/DataDog/dd-trace-go.v1 v1.34.0/go.mod h1:HtrC65fyJ6lWazShCC9rlOeiTSZJ0XtZhkwjZM2WpC4= +gopkg.in/DataDog/dd-trace-go.v1 v1.36.0 h1:t2KEcCXajtchpvoIGm0xU+Ytj8KkRyxsXVhWOGg6lEk= +gopkg.in/DataDog/dd-trace-go.v1 v1.36.0/go.mod h1:Cv0Bzs/zTzzrUDSw8Q+q/vC+uwPD+R530npGo0lfiCE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From c9b6a7194fe4288ce60894d6516db5faf1f54d67 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 14:18:53 -0500 Subject: [PATCH 02/26] [registry] configurable default API request timeout --- cmd/registry/main.go | 4 +++ registry/server/api_topics.go | 2 ++ registry/server/server.go | 50 ++++++++++++++++++++--------------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/cmd/registry/main.go b/cmd/registry/main.go index 04e7827..c3de8ec 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -9,6 +9,7 @@ import ( "os/signal" "strings" "sync" + "time" "github.com/DataDog/kafka-kit/v3/kafkaadmin" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -52,6 +53,7 @@ func main() { flag.StringVar(&adminConfig.SASLMechanism, "kafka-sasl-mechanism", "", fmt.Sprintf("SASL mechanism to use for authentication. Supported: %s", strings.Join(saslMechanims, ", "))) flag.StringVar(&adminConfig.SASLUsername, "kafka-sasl-username", "", "SASL username for use with the PLAIN and SASL-SCRAM-* mechanisms") flag.StringVar(&adminConfig.SASLPassword, "kafka-sasl-password", "", "SASL password for use with the PLAIN and SASL-SCRAM-* mechanisms") + defaultRequestTimeout := flag.Int("default-request-timeout", 5000, "Default request API request timeout in milliseconds. API request deadlines are also automatically capped to 3x this value.") flag.IntVar(&serverConfig.TagAllowedStalenessMinutes, "tag-allowed-staleness", 60, "Minutes before tags with no associated resource are deleted") flag.IntVar(&serverConfig.TagCleanupFrequencyMinutes, "tag-cleanup-frequency", 20, "Minutes between runs of tag cleanup") @@ -60,6 +62,8 @@ func main() { envy.Parse("REGISTRY") flag.Parse() + serverConfig.DefaultRequestTimeout = time.Duration(*defaultRequestTimeout) * time.Millisecond + if *v { fmt.Println(version) os.Exit(0) diff --git a/registry/server/api_topics.go b/registry/server/api_topics.go index 0b21b41..35c9964 100644 --- a/registry/server/api_topics.go +++ b/registry/server/api_topics.go @@ -254,6 +254,8 @@ func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (* } if err = s.kafkaadmin.CreateTopic(ctx, cfg); err != nil { + // XXX: sometimes topics fail to create but no error is returned. One example + // is if a replication higher than the number of available brokers is attempted. return empty, err } diff --git a/registry/server/server.go b/registry/server/server.go index 8fa9ff6..7a74f97 100644 --- a/registry/server/server.go +++ b/registry/server/server.go @@ -33,17 +33,17 @@ const ( // Server implements the registry APIs. type Server struct { pb.UnimplementedRegistryServer - Locking cluster.Lock - HTTPListen string - GRPCListen string - ZK kafkazk.Handler - kafkaadmin kafkaadmin.KafkaAdmin - Tags *TagHandler - reqTimeout time.Duration - readReqThrottle RequestThrottle - writeReqThrottle RequestThrottle - reqID uint64 - kafkaconsumer *kafka.Consumer + Locking cluster.Lock + HTTPListen string + GRPCListen string + ZK kafkazk.Handler + kafkaadmin kafkaadmin.KafkaAdmin + Tags *TagHandler + defaultRequestTimeout time.Duration + readReqThrottle RequestThrottle + writeReqThrottle RequestThrottle + reqID uint64 + kafkaconsumer *kafka.Consumer // For tests. test bool } @@ -55,6 +55,7 @@ type Config struct { ReadReqRate int WriteReqRate int ZKTagsPrefix string + DefaultRequestTimeout time.Duration TagCleanupFrequencyMinutes int TagAllowedStalenessMinutes int @@ -87,14 +88,14 @@ func NewServer(c Config) (*Server, error) { th, _ := NewTagHandler(tcfg) return &Server{ - Locking: dummyLock{}, - HTTPListen: c.HTTPListen, - GRPCListen: c.GRPCListen, - Tags: th, - reqTimeout: 3000 * time.Millisecond, - readReqThrottle: rrt, - writeReqThrottle: wrt, - test: c.test, + Locking: dummyLock{}, + HTTPListen: c.HTTPListen, + GRPCListen: c.GRPCListen, + Tags: th, + defaultRequestTimeout: c.DefaultRequestTimeout, + readReqThrottle: rrt, + writeReqThrottle: wrt, + test: c.test, }, nil } @@ -358,10 +359,15 @@ func (s *Server) ValidateRequest(ctx context.Context, req interface{}, kind int) var cancel context.CancelFunc // Check if the incoming context has a deadline set. - if _, ok := ctx.Deadline(); ok { + // configuredDeadline, deadlineSet := ctx.Deadline() + _, deadlineSet := ctx.Deadline() + switch { + // No deadline set, use our default. + case !deadlineSet: + cCtx, cancel = context.WithTimeout(ctx, s.defaultRequestTimeout) + // An acceptable deadline was configured. + default: cCtx = ctx - } else { - cCtx, cancel = context.WithTimeout(ctx, s.reqTimeout) } cCtx = context.WithValue(cCtx, "reqID", reqID) From 956e091ff4d260cc924cb7c1fae82a17dadd40d5 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 14:29:11 -0500 Subject: [PATCH 03/26] [registry] imposes a maximum deadline --- registry/server/server.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/registry/server/server.go b/registry/server/server.go index 7a74f97..919f84a 100644 --- a/registry/server/server.go +++ b/registry/server/server.go @@ -359,12 +359,17 @@ func (s *Server) ValidateRequest(ctx context.Context, req interface{}, kind int) var cancel context.CancelFunc // Check if the incoming context has a deadline set. - // configuredDeadline, deadlineSet := ctx.Deadline() - _, deadlineSet := ctx.Deadline() + configuredDeadline, deadlineSet := ctx.Deadline() + maxDeadline := s.defaultRequestTimeout * 3 + deadlineLimit := time.Now().Add(maxDeadline) + switch { // No deadline set, use our default. case !deadlineSet: cCtx, cancel = context.WithTimeout(ctx, s.defaultRequestTimeout) + // A deadline was set, but it's longer than we permit. + case configuredDeadline.After(deadlineLimit): + cCtx, cancel = context.WithTimeout(ctx, maxDeadline) // An acceptable deadline was configured. default: cCtx = ctx From df4eeec0cb0932bfd307565f7b35bf379038179c Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:01:29 -0500 Subject: [PATCH 04/26] [registry] testServer DefaultRequestTimeout --- registry/server/helpers_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/registry/server/helpers_test.go b/registry/server/helpers_test.go index dd73459..88cda72 100644 --- a/registry/server/helpers_test.go +++ b/registry/server/helpers_test.go @@ -26,6 +26,7 @@ func testServer() *Server { s, _ := NewServer(Config{ ReadReqRate: 10, WriteReqRate: 10, + DefaultRequestTimeout: 5*time.Second, ZKTagsPrefix: testConfig.Prefix, test: true, }) From f3fe0f1ac77766b4e9ef0a8ccaef863bc079a015 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:02:25 -0500 Subject: [PATCH 05/26] [registry] Tags, TagSet Keys method, tests --- registry/server/tag.go | 87 ++++++++++++++++++++----------------- registry/server/tag_test.go | 55 +++++++++++++++++++++++ 2 files changed, 102 insertions(+), 40 deletions(-) diff --git a/registry/server/tag.go b/registry/server/tag.go index dd2ce64..454ba76 100644 --- a/registry/server/tag.go +++ b/registry/server/tag.go @@ -32,8 +32,7 @@ func (e ErrReservedTag) Error() string { return fmt.Sprintf("tag '%s' is a reserved tag", e.t) } -// TagHandler provides object filtering by tags -// along with tag storage and retrieval. +// TagHandler provides object filtering by tags along with tag storage and retrieval. type TagHandler struct { Store TagStorage } @@ -61,8 +60,8 @@ func NewTagHandler(c TagHandlerConfig) (*TagHandler, error) { } return &TagHandler{ - // More sophisticated initialization/config passing - // if additional TagStorage backends are written. + // More sophisticated initialization/config passing if additional TagStorage + // backends are written. Store: ts, }, nil } @@ -78,15 +77,14 @@ type Tags []string // TagSet is a map of key:values. type TagSet map[string]string -// KafkaObject holds an object type (broker, topic) and -// object identifier (ID, name). +// KafkaObject holds an object type (broker, topic) and object identifier (ID, +// name). type KafkaObject struct { Type string ID string } -// Valid checks if a KafkaObject has a valid -// Type field value. +// Valid checks if a KafkaObject has a valid Type field value. func (o KafkaObject) Valid() bool { switch { case o.Type == "broker", o.Type == "topic": @@ -96,14 +94,13 @@ func (o KafkaObject) Valid() bool { return false } -// Complete checks if a KafkaObject is valid -// and has a non-empty ID field value. +// Complete checks if a KafkaObject is valid and has a non-empty ID field value. func (o KafkaObject) Complete() bool { return o.Valid() && o.ID != "" } -// TagSetFromObject takes a protobuf type and returns the -// default TagSet along with any user-defined tags. +// TagSetFromObject takes a protobuf type and returns the default TagSet along +// with any user-defined tags. func (t *TagHandler) TagSetFromObject(o interface{}) (TagSet, error) { var ts = TagSet{} var ko = KafkaObject{} @@ -138,9 +135,8 @@ func (t *TagHandler) TagSetFromObject(o interface{}) (TagSet, error) { st, err := t.Store.GetTags(ko) if err != nil { switch { - // ErrKafkaObjectDoesNotExist from TagStorage - // simply means we do not have any user-defined - // tags stored for the requested object. + // ErrKafkaObjectDoesNotExist from TagStorage simply means we do not have any + // user-defined tags stored for the requested object. case err == ErrKafkaObjectDoesNotExist: break default: @@ -180,8 +176,8 @@ func (t *TagHandler) FilterTopics(in TopicSet, tags Tags) (TopicSet, error) { if ts.matchAll(tagKV) { out[name] = topic - // Ensure that custom tags fetched from storage are - // populated into the tags field for the object. + // Ensure that custom tags fetched from storage are populated into the + // tags field for the object. for k, v := range ts { // Custom tags are any non-reserved object fields. if !t.Store.FieldReserved(KafkaObject{Type: "topic"}, k) { @@ -198,11 +194,10 @@ func (t *TagHandler) FilterTopics(in TopicSet, tags Tags) (TopicSet, error) { return out, nil } -// FilterBrokers takes a map of broker IDs to *pb.Broker and tags KV list. -// A filtered map is returned that includes brokers where all tags -// values match the provided input tag KVs. Additionally, any custom -// tags persisted in the TagStorage backend are populated into the -// Tags field for each matched object. +// FilterBrokers takes a map of broker IDs to *pb.Broker and tags KV list. A +// filtered map is returned that includes brokers where all tags values match +// the provided input tag KVs. Additionally, any custom tags persisted in the +// TagStorage backend are populated into the Tags field for each matched object. func (t *TagHandler) FilterBrokers(in BrokerSet, tags Tags) (BrokerSet, error) { var out = make(BrokerSet) @@ -222,8 +217,8 @@ func (t *TagHandler) FilterBrokers(in BrokerSet, tags Tags) (BrokerSet, error) { if ts.matchAll(tagKV) { out[id] = broker - // Ensure that custom tags fetched from storage are - // populated into the tags field for the object. + // Ensure that custom tags fetched from storage are populated into the tags + // field for the object. for k, v := range ts { // Custom tags are any non-reserved object fields. if !t.Store.FieldReserved(KafkaObject{Type: "broker"}, k) { @@ -240,9 +235,21 @@ func (t *TagHandler) FilterBrokers(in BrokerSet, tags Tags) (BrokerSet, error) { return out, nil } -// matchAll takes a TagSet and returns true -// if all key/values are present and equal -// to those in the input TagSet. +// Keys returns a []string of all tag keys for a Tags. It's possible to receive +// fully formed tags or just tag keys. +func (t Tags) Keys() []string { + var keys []string + + for _, tag := range t { + kv := strings.Split(tag, ":") + keys = append(keys, kv[0]) + } + + return keys +} + +// matchAll takes a TagSet and returns true if all key/values are present and +// equal to those in the input TagSet. func (t TagSet) matchAll(kv TagSet) bool { for k, v := range kv { if t[k] != v { @@ -253,8 +260,8 @@ func (t TagSet) matchAll(kv TagSet) bool { return true } -// Equal checks if the input TagSet has the same -// key:value pairs as the calling TagSet. +// Equal checks if the input TagSet has the same key:value pairs as the calling +// TagSet. func (t1 TagSet) Equal(t2 TagSet) bool { if len(t1) != len(t2) { return false @@ -291,9 +298,9 @@ func (t TagSet) Keys() []string { return keys } -// TagSet takes a Tags and returns a TagSet and error for any -// malformed tags. Tags are expected to be formatted as a -// comma delimited "key:value,key2:value2" string. +// TagSet takes a Tags and returns a TagSet and error for any malformed tags. +// Tags are expected to be formatted as a comma delimited "key:value,key2:value2" +// string. // TODO normalize all tag usage to lower case. func (t Tags) TagSet() (TagSet, error) { var ts = TagSet{} @@ -310,14 +317,14 @@ func (t Tags) TagSet() (TagSet, error) { return ts, nil } -// ReservedFields is a mapping of object types (topic, broker) -// to a set of fields reserved for internal use; these are -// default fields that become searchable through the tags interface. +// ReservedFields is a mapping of object types (topic, broker) to a set of fields +// reserved for internal use; these are default fields that become searchable +// through the tags interface. type ReservedFields map[string]map[string]struct{} -// GetReservedFields returns a map proto message types to field names -// considered reserved for internal use. All fields specified in the -// Registry proto messages are discovered here and reserved by default. +// GetReservedFields returns a map proto message types to field names considered +// reserved for internal use. All fields specified in the Registry proto messages +// are discovered here and reserved by default. func GetReservedFields() ReservedFields { var fs = make(ReservedFields) @@ -327,8 +334,8 @@ func GetReservedFields() ReservedFields { return fs } -// fieldsFromStruct extracts all user-defined fields from proto -// messages. Discovered fields are returned all lowercase. +// fieldsFromStruct extracts all user-defined fields from proto messages. +// Discovered fields are returned all lowercase. func fieldsFromStruct(s interface{}) map[string]struct{} { var fs = make(map[string]struct{}) diff --git a/registry/server/tag_test.go b/registry/server/tag_test.go index 12c5776..ba5c288 100644 --- a/registry/server/tag_test.go +++ b/registry/server/tag_test.go @@ -5,6 +5,8 @@ import ( "testing" pb "github.com/DataDog/kafka-kit/v3/registry/registry" + + "github.com/stretchr/testify/assert" ) func TestTagSetFromObject(t *testing.T) { @@ -34,6 +36,59 @@ func TestTagSetFromObject(t *testing.T) { } } +func TestTagSetKeys(t *testing.T) { + type testCase struct { + input TagSet + expected []string + } + + tests := []testCase{ + // Single KV. + { + input: TagSet{"myKey": "myValue"}, + expected: []string{"myKey"}, + }, + // Multiple KV. + { + input: TagSet{"myKey": "myValue", "myKey2": "myValue2"}, + expected: []string{"myKey", "myKey2"}, + }, + } + + for _, test := range tests { + results := test.input.Keys() + sort.Strings(results) + assert.Equal(t, test.expected, results) + } +} + + +func TestTagsKeys(t *testing.T) { + type testCase struct { + input Tags + expected []string + } + + tests := []testCase{ + // Single tag. + { + input: Tags{"myKey:myValue"}, + expected: []string{"myKey"}, + }, + // Multiple tags, mixed kv and k. + { + input: Tags{"myKey:myValue", "myKey2"}, + expected: []string{"myKey", "myKey2"}, + }, + } + + for _, test := range tests { + results := test.input.Keys() + sort.Strings(results) + assert.Equal(t, test.expected, results) + } +} + func TestMatchAll(t *testing.T) { ts := TagSet{ "k1": "v1", From e95d3c5d30b825ed7d189b3e21784702ca792da2 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:06:19 -0500 Subject: [PATCH 06/26] [registry] testIntegrationServer DefaultRequestTimeout --- registry/server/helpers_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/registry/server/helpers_test.go b/registry/server/helpers_test.go index 88cda72..3829185 100644 --- a/registry/server/helpers_test.go +++ b/registry/server/helpers_test.go @@ -41,6 +41,7 @@ func testIntegrationServer() (*Server, error) { s, _ := NewServer(Config{ HTTPListen: "localhost:8080", GRPCListen: "localhost:8090", + DefaultRequestTimeout: 5*time.Second, ReadReqRate: 10, WriteReqRate: 10, ZKTagsPrefix: testConfig.Prefix, From 72c7b623d2e2a1c05f11a845cfcc342d663128f9 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:07:32 -0500 Subject: [PATCH 07/26] [registry] DeleteBrokerTags, DeleteTopicTags RPCs ignore tag values --- registry/server/api_brokers.go | 2 +- registry/server/api_topics.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/registry/server/api_brokers.go b/registry/server/api_brokers.go index 5e67411..ff132f2 100644 --- a/registry/server/api_brokers.go +++ b/registry/server/api_brokers.go @@ -348,7 +348,7 @@ func (s *Server) DeleteBrokerTags(ctx context.Context, req *pb.BrokerRequest) (* // Delete the tags. id := fmt.Sprintf("%d", req.Id) - err = s.Tags.Store.DeleteTags(KafkaObject{Type: "broker", ID: id}, req.Tag) + err = s.Tags.Store.DeleteTags(KafkaObject{Type: "broker", ID: id}, Tags(req.Tag).Keys()) if err != nil { return nil, err } diff --git a/registry/server/api_topics.go b/registry/server/api_topics.go index 35c9964..cf37a09 100644 --- a/registry/server/api_topics.go +++ b/registry/server/api_topics.go @@ -456,7 +456,7 @@ func (s *Server) DeleteTopicTags(ctx context.Context, req *pb.TopicRequest) (*pb } // Delete the tags. - err = s.Tags.Store.DeleteTags(KafkaObject{Type: "topic", ID: req.Name}, req.Tag) + err = s.Tags.Store.DeleteTags(KafkaObject{Type: "topic", ID: req.Name}, Tags(req.Tag).Keys()) if err != nil { return nil, err } From 53b95bc5bcd4c93ddd16ff37c59e6db3b2c4b754 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:23:38 -0500 Subject: [PATCH 08/26] [cluster] adds UnlockLogError to Lock interface, implements in ZooKeeper --- cluster/cluster.go | 1 + cluster/zookeeper/locking.go | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/cluster/cluster.go b/cluster/cluster.go index b52172c..ad0dabf 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -13,6 +13,7 @@ type Lock interface { // through this interface. A context is accepted for setting wait bounds. Lock(context.Context) error Unlock(context.Context) error + UnlockLogError(context.Context) // Owner returns the current owner value. Owner() interface{} } diff --git a/cluster/zookeeper/locking.go b/cluster/zookeeper/locking.go index 9716ad2..5a07bea 100644 --- a/cluster/zookeeper/locking.go +++ b/cluster/zookeeper/locking.go @@ -3,6 +3,7 @@ package zookeeper import ( "context" "fmt" + "log" "time" "github.com/go-zookeeper/zk" @@ -119,6 +120,13 @@ func (z *ZooKeeperLock) Unlock(ctx context.Context) error { return nil } +// Unlock releases a lock and logs, rather than returning, any errors if encountered. +func (z *ZooKeeperLock) UnlockLogError(ctx context.Context) { + if err := z.Unlock(ctx); err != nil { + log.Println(err) + } +} + func (z *ZooKeeperLock) deleteLockZnode(p string) error { // We have to get the znode first; the current version is required for // the delete request. From f53dfa332fc40501bfd9bbe1a2409bb98879f41c Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:23:47 -0500 Subject: [PATCH 09/26] gofmt --- registry/server/api_topics.go | 2 +- registry/server/helpers_test.go | 22 +++++++++++----------- registry/server/server.go | 7 ++++--- registry/server/tag_test.go | 13 ++++++------- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/registry/server/api_topics.go b/registry/server/api_topics.go index cf37a09..f64c66c 100644 --- a/registry/server/api_topics.go +++ b/registry/server/api_topics.go @@ -180,7 +180,7 @@ func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (* if err := s.Locking.Lock(ctx); err != nil { return nil, err } - defer s.Locking.Unlock(ctx) + defer s.Locking.UnlockLogError(ctx) // If we're targeting a specific set of brokers by tag, build // a replica assignment. diff --git a/registry/server/helpers_test.go b/registry/server/helpers_test.go index 3829185..bd8a3f0 100644 --- a/registry/server/helpers_test.go +++ b/registry/server/helpers_test.go @@ -24,11 +24,11 @@ var ( func testServer() *Server { s, _ := NewServer(Config{ - ReadReqRate: 10, - WriteReqRate: 10, - DefaultRequestTimeout: 5*time.Second, - ZKTagsPrefix: testConfig.Prefix, - test: true, + ReadReqRate: 10, + WriteReqRate: 10, + DefaultRequestTimeout: 5 * time.Second, + ZKTagsPrefix: testConfig.Prefix, + test: true, }) s.ZK = kafkazk.NewZooKeeperStub() @@ -39,12 +39,12 @@ func testServer() *Server { func testIntegrationServer() (*Server, error) { s, _ := NewServer(Config{ - HTTPListen: "localhost:8080", - GRPCListen: "localhost:8090", - DefaultRequestTimeout: 5*time.Second, - ReadReqRate: 10, - WriteReqRate: 10, - ZKTagsPrefix: testConfig.Prefix, + HTTPListen: "localhost:8080", + GRPCListen: "localhost:8090", + DefaultRequestTimeout: 5 * time.Second, + ReadReqRate: 10, + WriteReqRate: 10, + ZKTagsPrefix: testConfig.Prefix, }) wg := &sync.WaitGroup{} diff --git a/registry/server/server.go b/registry/server/server.go index 919f84a..b064364 100644 --- a/registry/server/server.go +++ b/registry/server/server.go @@ -438,6 +438,7 @@ func (s *Server) LogRequest(ctx context.Context, params string, reqID uint64) { type dummyLock struct{} -func (dl dummyLock) Lock(_ context.Context) error { return nil } -func (dl dummyLock) Unlock(_ context.Context) error { return nil } -func (dl dummyLock) Owner() interface{} { return nil } +func (dl dummyLock) Lock(_ context.Context) error { return nil } +func (dl dummyLock) Unlock(_ context.Context) error { return nil } +func (dl dummyLock) UnlockLogError(_ context.Context) { return } +func (dl dummyLock) Owner() interface{} { return nil } diff --git a/registry/server/tag_test.go b/registry/server/tag_test.go index ba5c288..d6d28a9 100644 --- a/registry/server/tag_test.go +++ b/registry/server/tag_test.go @@ -38,19 +38,19 @@ func TestTagSetFromObject(t *testing.T) { func TestTagSetKeys(t *testing.T) { type testCase struct { - input TagSet + input TagSet expected []string } tests := []testCase{ // Single KV. { - input: TagSet{"myKey": "myValue"}, + input: TagSet{"myKey": "myValue"}, expected: []string{"myKey"}, }, // Multiple KV. { - input: TagSet{"myKey": "myValue", "myKey2": "myValue2"}, + input: TagSet{"myKey": "myValue", "myKey2": "myValue2"}, expected: []string{"myKey", "myKey2"}, }, } @@ -62,22 +62,21 @@ func TestTagSetKeys(t *testing.T) { } } - func TestTagsKeys(t *testing.T) { type testCase struct { - input Tags + input Tags expected []string } tests := []testCase{ // Single tag. { - input: Tags{"myKey:myValue"}, + input: Tags{"myKey:myValue"}, expected: []string{"myKey"}, }, // Multiple tags, mixed kv and k. { - input: Tags{"myKey:myValue", "myKey2"}, + input: Tags{"myKey:myValue", "myKey2"}, expected: []string{"myKey", "myKey2"}, }, } From 02af090b9b0acb5adc5d0cfc706b618926bfcc45 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:27:39 -0500 Subject: [PATCH 10/26] [registry] all unlock calls use UnlockLogError --- registry/server/api_brokers.go | 4 ++-- registry/server/api_topics.go | 6 +++--- registry/server/tag_cleanup.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/registry/server/api_brokers.go b/registry/server/api_brokers.go index ff132f2..72ef2d1 100644 --- a/registry/server/api_brokers.go +++ b/registry/server/api_brokers.go @@ -284,7 +284,7 @@ func (s *Server) TagBroker(ctx context.Context, req *pb.BrokerRequest) (*pb.TagR if err := s.Locking.Lock(ctx); err != nil { return nil, err } - defer s.Locking.Unlock(ctx) + defer s.Locking.UnlockLogError(ctx) if req.Id == 0 { return nil, ErrBrokerIDEmpty @@ -324,7 +324,7 @@ func (s *Server) DeleteBrokerTags(ctx context.Context, req *pb.BrokerRequest) (* if err := s.Locking.Lock(ctx); err != nil { return nil, err } - defer s.Locking.Unlock(ctx) + defer s.Locking.UnlockLogError(ctx) if req.Id == 0 { return nil, ErrBrokerIDEmpty diff --git a/registry/server/api_topics.go b/registry/server/api_topics.go index f64c66c..93ed0d2 100644 --- a/registry/server/api_topics.go +++ b/registry/server/api_topics.go @@ -286,7 +286,7 @@ func (s *Server) DeleteTopic(ctx context.Context, req *pb.TopicRequest) (*pb.Emp if err := s.Locking.Lock(ctx); err != nil { return nil, err } - defer s.Locking.Unlock(ctx) + defer s.Locking.UnlockLogError(ctx) if req.Name == "" { return nil, ErrTopicNameEmpty @@ -378,7 +378,7 @@ func (s *Server) TagTopic(ctx context.Context, req *pb.TopicRequest) (*pb.TagRes err = s.Locking.Lock(ctx) switch err { case nil: - defer s.Locking.Unlock(ctx) + defer s.Locking.UnlockLogError(ctx) case zklocking.ErrAlreadyOwnLock: // Don't call unlock. We should be here because CreateTopic was called with // optional tags. We'll let the parent CreateTopic call finally issue unlock. @@ -430,7 +430,7 @@ func (s *Server) DeleteTopicTags(ctx context.Context, req *pb.TopicRequest) (*pb if err := s.Locking.Lock(ctx); err != nil { return nil, err } - defer s.Locking.Unlock(ctx) + defer s.Locking.UnlockLogError(ctx) if req.Name == "" { return nil, ErrTopicNameEmpty diff --git a/registry/server/tag_cleanup.go b/registry/server/tag_cleanup.go index 0c74e1e..ae25401 100644 --- a/registry/server/tag_cleanup.go +++ b/registry/server/tag_cleanup.go @@ -31,7 +31,7 @@ func (tc *TagCleaner) RunTagCleanup(s *Server, ctx context.Context, c Config) { log.Println(err) continue } - defer s.Locking.Unlock(ctx) + defer s.Locking.UnlockLogError(ctx) err := s.MarkForDeletion(time.Now) if err != nil { From 7629ffc1347cc3a32f74918b9c53e7d3d2cff76d Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:30:18 -0500 Subject: [PATCH 11/26] [cluster/zookeeper] clean up lock entry when LockAhead lookup fails --- cluster/zookeeper/locking.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cluster/zookeeper/locking.go b/cluster/zookeeper/locking.go index 5a07bea..b7f1600 100644 --- a/cluster/zookeeper/locking.go +++ b/cluster/zookeeper/locking.go @@ -68,6 +68,7 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { // by watching the ID immediately ahead of ours. lockAhead, err := locks.LockAhead(thisID) if err != nil { + z.deleteLockZnode(node) return ErrLockingFailed{message: err.Error()} } From 2b9e8ea0cd9ebeee41a633188cc745af68a3e2d6 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 1 Feb 2022 18:49:06 -0500 Subject: [PATCH 12/26] [cluster/zookeeper] store lockMetadata in lock znode --- cluster/zookeeper/locking.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/cluster/zookeeper/locking.go b/cluster/zookeeper/locking.go index b7f1600..a8771f0 100644 --- a/cluster/zookeeper/locking.go +++ b/cluster/zookeeper/locking.go @@ -2,6 +2,7 @@ package zookeeper import ( "context" + "encoding/json" "fmt" "log" "time" @@ -9,18 +10,32 @@ import ( "github.com/go-zookeeper/zk" ) +// lockMetadata is internal metadata persisted in the lock znode. +type lockMetadata struct { + Timestamp time.Time `json:"timestamp"` + OwnerID string `json:"owner_id"` +} + // Lock attemps to acquire a lock. If the lock cannot be acquired by the context // deadline, the lock attempt times out. func (z *ZooKeeperLock) Lock(ctx context.Context) error { // Check if the context has a lock owner value. If so, check if this owner // already has the lock. - if owner := ctx.Value(z.OwnerKey); owner != nil && owner == z.Owner() { + owner := ctx.Value(z.OwnerKey) + if owner != nil && owner == z.Owner() { return ErrAlreadyOwnLock } + // Populate a lockMetadata. + meta := lockMetadata{ + Timestamp: time.Now(), + OwnerID: fmt.Sprintf("%v", owner), + } + metaJSON, _ := json.Marshal(meta) + // Enter the claim into ZooKeeper. lockPath := fmt.Sprintf("%s/lock-", z.Path) - node, err := z.c.CreateProtectedEphemeralSequential(lockPath, nil, zk.WorldACL(31)) + node, err := z.c.CreateProtectedEphemeralSequential(lockPath, metaJSON, zk.WorldACL(31)) if err != nil { return ErrLockingFailed{message: err.Error()} } @@ -72,6 +87,7 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { return ErrLockingFailed{message: err.Error()} } + // XXX(jamie): determine what we should do here. lockAheadPath, _ := locks.LockPath(lockAhead) // Get a ZooKeeper watch on the lock we're waiting on. From 94299194ddf4e77650806350f4956c1d41183466 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 09:27:09 -0500 Subject: [PATCH 13/26] [cluster/zookeeper] lock attempt limit --- cluster/zookeeper/locking.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/cluster/zookeeper/locking.go b/cluster/zookeeper/locking.go index a8771f0..403a876 100644 --- a/cluster/zookeeper/locking.go +++ b/cluster/zookeeper/locking.go @@ -48,10 +48,18 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { } var interval int + var lockWaitingErr error for { - // Prevent thrashing. interval++ - if interval%5 == 0 { + + // Max failure threshold. + if interval > 5 && lockWaitingErr != nil { + z.deleteLockZnode(node) + return ErrLockingFailed{message: lockWaitingErr.Error()} + } + + // Prevent thrashing. + if interval > 1 { time.Sleep(50 * time.Millisecond) } @@ -83,15 +91,23 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { // by watching the ID immediately ahead of ours. lockAhead, err := locks.LockAhead(thisID) if err != nil { - z.deleteLockZnode(node) - return ErrLockingFailed{message: err.Error()} + lockWaitingErr = err + continue } // XXX(jamie): determine what we should do here. - lockAheadPath, _ := locks.LockPath(lockAhead) + lockAheadPath, err := locks.LockPath(lockAhead) + if err != nil { + lockWaitingErr = err + continue + } // Get a ZooKeeper watch on the lock we're waiting on. _, _, blockingLockReleased, err := z.c.GetW(lockAheadPath) + if err != nil { + lockWaitingErr = err + continue + } // Race the watch event against the context timeout. select { From 13b5c237416df63e678c2a4e61c2a26b709bea0c Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 09:43:58 -0500 Subject: [PATCH 14/26] [cluster/zookeeper] discard logger --- cluster/zookeeper/zookeeper.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cluster/zookeeper/zookeeper.go b/cluster/zookeeper/zookeeper.go index 09ac89b..aed4bd0 100644 --- a/cluster/zookeeper/zookeeper.go +++ b/cluster/zookeeper/zookeeper.go @@ -3,6 +3,8 @@ package zookeeper import ( "fmt" + "io/ioutil" + "log" "strconv" "strings" "sync" @@ -60,10 +62,12 @@ func NewZooKeeperLock(c ZooKeeperLockConfig) (*ZooKeeperLock, error) { var zkl = &ZooKeeperLock{ OwnerKey: c.OwnerKey, } + var err error + var nilLog = log.New(ioutil.Discard, "", 0) // Dial zk. - zkl.c, _, err = zk.Connect([]string{c.Address}, 10*time.Second, zk.WithLogInfo(false)) + zkl.c, _, err = zk.Connect([]string{c.Address}, 10*time.Second, zk.WithLogger(nilLog)) if err != nil { return zkl, err } From 286f813b946aa948ee35a28d5c5e3aa26f842d36 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 11:24:18 -0500 Subject: [PATCH 15/26] [cluster/zookeeper] more informative error --- cluster/zookeeper/locks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster/zookeeper/locks.go b/cluster/zookeeper/locks.go index a0f3bd9..a14e66e 100644 --- a/cluster/zookeeper/locks.go +++ b/cluster/zookeeper/locks.go @@ -62,7 +62,7 @@ func (le LockEntries) LockPath(id int) (string, error) { if path, exists := le.m[id]; exists { return path, nil } - return "", fmt.Errorf("lock ID doesn't exist") + return "", fmt.Errorf("failed to get lock path; referenced ID doesn't exist") } // LockAhead returns the lock ahead of the ID provided. From 7039aa0fdd202de6fdc39d407bef0e7f5b7bbd66 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 11:41:01 -0500 Subject: [PATCH 16/26] [cluster/zookeeper] defer based znode cleanup --- cluster/zookeeper/locking.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/cluster/zookeeper/locking.go b/cluster/zookeeper/locking.go index 403a876..fd46bc7 100644 --- a/cluster/zookeeper/locking.go +++ b/cluster/zookeeper/locking.go @@ -36,6 +36,18 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { // Enter the claim into ZooKeeper. lockPath := fmt.Sprintf("%s/lock-", z.Path) node, err := z.c.CreateProtectedEphemeralSequential(lockPath, metaJSON, zk.WorldACL(31)) + + // In all return paths other than the case that we have successfully acquired + // a lock, it's critical that we remove the claim znode. + var removeZnodeAtExit bool = true + defer func() { + if removeZnodeAtExit { + z.deleteLockZnode(node) + } + }() + + // Handle the error after the cleanup defer is registered. It's likely that + // 'node' will always be an empty string if there's a non-nil error, anyway. if err != nil { return ErrLockingFailed{message: err.Error()} } @@ -43,7 +55,6 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { // Get our claim ID. thisID, err := idFromZnode(node) if err != nil { - z.deleteLockZnode(node) return ErrLockingFailed{message: err.Error()} } @@ -54,7 +65,6 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { // Max failure threshold. if interval > 5 && lockWaitingErr != nil { - z.deleteLockZnode(node) return ErrLockingFailed{message: lockWaitingErr.Error()} } @@ -66,7 +76,6 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { // Get all current locks. locks, err := z.locks() if err != nil { - z.deleteLockZnode(node) return ErrLockingFailed{message: err.Error()} } @@ -75,6 +84,7 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { if thisID == firstClaim { // We have the lock. z.mu.Lock() + // Update the lock znode. z.lockZnode, err = locks.LockPath(thisID) // Set the owner value if the context OwnerKey is specified. @@ -82,6 +92,9 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { z.owner = owner } + // XXX preventing this znode from being terminated is essential. + removeZnodeAtExit = false + z.mu.Unlock() return nil @@ -95,7 +108,6 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { continue } - // XXX(jamie): determine what we should do here. lockAheadPath, err := locks.LockPath(lockAhead) if err != nil { lockWaitingErr = err @@ -113,8 +125,6 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { select { // We've timed out. case <-ctx.Done(): - // XXX it's critical that we clean up the attempted lock. - z.deleteLockZnode(node) return ErrLockingTimedOut // Else see if we can get the claim. case <-blockingLockReleased: From 82a152e3a831d97f2dcb6751f5170b8456668597 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 12:15:36 -0500 Subject: [PATCH 17/26] [cluster/zookeeper] lock TTL config --- cluster/zookeeper/zookeeper.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cluster/zookeeper/zookeeper.go b/cluster/zookeeper/zookeeper.go index aed4bd0..9be357c 100644 --- a/cluster/zookeeper/zookeeper.go +++ b/cluster/zookeeper/zookeeper.go @@ -18,6 +18,7 @@ type ZooKeeperLock struct { c ZooKeeperClient Path string OwnerKey string + TTL int // The mutex can't be embedded because ZooKeeperLock also has Lock() / Unlock() // methods. @@ -44,6 +45,10 @@ type ZooKeeperLockConfig struct { Address string // The locking path; this is the register that locks are attempting to acquire. Path string + // A non-zero TTL sets a limit (in milliseconds) on how long a lock is possibly + // valid for. Once this limit is exceeded, any new lock claims can destroy those + // exceeding their TTL. + TTL int // An optional lock ownership identifier. Context values can be inspected to // determine if a lock owner already has the lock. For example, if we specify // an OwnerKey configuration value of UserID, any successful lock claim will @@ -61,6 +66,7 @@ type ZooKeeperLockConfig struct { func NewZooKeeperLock(c ZooKeeperLockConfig) (*ZooKeeperLock, error) { var zkl = &ZooKeeperLock{ OwnerKey: c.OwnerKey, + TTL: c.TTL, } var err error From 0acc71eab045a57788aac25e6f1ed31bf17e0846 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 12:16:19 -0500 Subject: [PATCH 18/26] [cluster/zookeeper] lock TTL populated, getLockAheadWait func --- cluster/zookeeper/locking.go | 49 +++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/cluster/zookeeper/locking.go b/cluster/zookeeper/locking.go index fd46bc7..97388af 100644 --- a/cluster/zookeeper/locking.go +++ b/cluster/zookeeper/locking.go @@ -12,8 +12,9 @@ import ( // lockMetadata is internal metadata persisted in the lock znode. type lockMetadata struct { - Timestamp time.Time `json:"timestamp"` - OwnerID string `json:"owner_id"` + Timestamp time.Time `json:"timestamp"` + TTLDeadline time.Time `json:"ttl_deadline"` + OwnerID string `json:"owner_id"` } // Lock attemps to acquire a lock. If the lock cannot be acquired by the context @@ -28,8 +29,9 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { // Populate a lockMetadata. meta := lockMetadata{ - Timestamp: time.Now(), - OwnerID: fmt.Sprintf("%v", owner), + Timestamp: time.Now(), + TTLDeadline: time.Now().Add(time.Duration(z.TTL) * time.Millisecond), + OwnerID: fmt.Sprintf("%v", owner), } metaJSON, _ := json.Marshal(meta) @@ -102,20 +104,7 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { // If we're here, we don't have the lock; we need to enqueue our wait position // by watching the ID immediately ahead of ours. - lockAhead, err := locks.LockAhead(thisID) - if err != nil { - lockWaitingErr = err - continue - } - - lockAheadPath, err := locks.LockPath(lockAhead) - if err != nil { - lockWaitingErr = err - continue - } - - // Get a ZooKeeper watch on the lock we're waiting on. - _, _, blockingLockReleased, err := z.c.GetW(lockAheadPath) + blockingLockReleased, err := z.getLockAheadWait(locks, thisID) if err != nil { lockWaitingErr = err continue @@ -133,6 +122,30 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { } } +// getLockAheadWait takes a lock ID and returns a watch on the lock immediately +// ahead of it. +func (z *ZooKeeperLock) getLockAheadWait(locks LockEntries, id int) (<-chan zk.Event, error) { + // Find the lock ID ahead. + lockAhead, err := locks.LockAhead(id) + if err != nil { + return nil, err + } + + // Get its path. + lockAheadPath, err := locks.LockPath(lockAhead) + if err != nil { + return nil, err + } + + // Get a ZooKeeper watch on the lock path we're waiting on. + _, _, watch, err := z.c.GetW(lockAheadPath) + if err != nil { + return nil, err + } + + return watch, nil +} + // Unlock releases a lock. func (z *ZooKeeperLock) Unlock(ctx context.Context) error { // Check if the context has a lock owner value. From e314e36e7951201669a236c391ac1a7d1c82c9c7 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 12:29:29 -0500 Subject: [PATCH 19/26] [cluster/zookeeper] extended names --- cluster/zookeeper/locking.go | 51 +++++++++++++++++++----------------- cluster/zookeeper/locks.go | 22 ++++++++-------- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/cluster/zookeeper/locking.go b/cluster/zookeeper/locking.go index 97388af..5aa9361 100644 --- a/cluster/zookeeper/locking.go +++ b/cluster/zookeeper/locking.go @@ -27,6 +27,9 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { return ErrAlreadyOwnLock } + // If lock TTLs are being used, forcibly remove any expired locks. + // z.purgeExpiredLocks() + // Populate a lockMetadata. meta := lockMetadata{ Timestamp: time.Now(), @@ -122,30 +125,6 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { } } -// getLockAheadWait takes a lock ID and returns a watch on the lock immediately -// ahead of it. -func (z *ZooKeeperLock) getLockAheadWait(locks LockEntries, id int) (<-chan zk.Event, error) { - // Find the lock ID ahead. - lockAhead, err := locks.LockAhead(id) - if err != nil { - return nil, err - } - - // Get its path. - lockAheadPath, err := locks.LockPath(lockAhead) - if err != nil { - return nil, err - } - - // Get a ZooKeeper watch on the lock path we're waiting on. - _, _, watch, err := z.c.GetW(lockAheadPath) - if err != nil { - return nil, err - } - - return watch, nil -} - // Unlock releases a lock. func (z *ZooKeeperLock) Unlock(ctx context.Context) error { // Check if the context has a lock owner value. @@ -199,3 +178,27 @@ func (z *ZooKeeperLock) deleteLockZnode(p string) error { return nil } + +// getLockAheadWait takes a lock ID and returns a watch on the lock immediately +// ahead of it. +func (z *ZooKeeperLock) getLockAheadWait(locks LockEntries, id int) (<-chan zk.Event, error) { + // Find the lock ID ahead. + lockAhead, err := locks.LockAhead(id) + if err != nil { + return nil, err + } + + // Get its path. + lockAheadPath, err := locks.LockPath(lockAhead) + if err != nil { + return nil, err + } + + // Get a ZooKeeper watch on the lock path we're waiting on. + _, _, watch, err := z.c.GetW(lockAheadPath) + if err != nil { + return nil, err + } + + return watch, nil +} diff --git a/cluster/zookeeper/locks.go b/cluster/zookeeper/locks.go index a14e66e..03551ff 100644 --- a/cluster/zookeeper/locks.go +++ b/cluster/zookeeper/locks.go @@ -8,9 +8,9 @@ import ( // LockEntries is a container of locks. type LockEntries struct { // Map of lock ID integer to the full znode path. - m map[int]string + idToZnode map[int]string // List of IDs ascending. - l []int + idList []int } // locks returns a LockEntries of all current locks. @@ -19,8 +19,8 @@ func (z *ZooKeeperLock) locks() (LockEntries, error) { defer z.mu.RUnlock() var locks = LockEntries{ - m: map[int]string{}, - l: []int{}, + idToZnode: map[int]string{}, + idList: []int{}, } // Get all nodes in the lock path. @@ -33,19 +33,19 @@ func (z *ZooKeeperLock) locks() (LockEntries, error) { continue } // Append the znode to the map. - locks.m[id] = fmt.Sprintf("%s/%s", z.Path, n) + locks.idToZnode[id] = fmt.Sprintf("%s/%s", z.Path, n) // Append the ID to the list. - locks.l = append(locks.l, id) + locks.idList = append(locks.idList, id) } - sort.Ints(locks.l) + sort.Ints(locks.idList) return locks, e } // IDs returns all held lock IDs ascending. func (le LockEntries) IDs() []int { - return le.l + return le.idList } // First returns the ID with the lowest value. @@ -59,7 +59,7 @@ func (le LockEntries) First() (int, error) { // LockPath takes a lock ID and returns the znode path. func (le LockEntries) LockPath(id int) (string, error) { - if path, exists := le.m[id]; exists { + if path, exists := le.idToZnode[id]; exists { return path, nil } return "", fmt.Errorf("failed to get lock path; referenced ID doesn't exist") @@ -67,9 +67,9 @@ func (le LockEntries) LockPath(id int) (string, error) { // LockAhead returns the lock ahead of the ID provided. func (le LockEntries) LockAhead(id int) (int, error) { - for i, next := range le.l { + for i, next := range le.idList { if next == id && i >= 0 { - return le.l[i-1], nil + return le.idList[i-1], nil } } From d1ab930bede9b21d6f545dd6c9d2677283c53d25 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 13:09:12 -0500 Subject: [PATCH 20/26] [cluster/zookeeper] adds expired lock purging --- cluster/zookeeper/errors.go | 10 +++ cluster/zookeeper/locking.go | 85 +++++++++++++++++---- cluster/zookeeper/zookeeper-example/main.go | 3 +- 3 files changed, 84 insertions(+), 14 deletions(-) diff --git a/cluster/zookeeper/errors.go b/cluster/zookeeper/errors.go index 47d0cc2..3de036d 100644 --- a/cluster/zookeeper/errors.go +++ b/cluster/zookeeper/errors.go @@ -42,3 +42,13 @@ type ErrUnlockingFailed struct { func (err ErrUnlockingFailed) Error() string { return fmt.Sprintf("attempt to release lock failed: %s", err.message) } + +// ErrExpireLockFailed is returned when a lock with an expired TTL fails to purge. +type ErrExpireLockFailed struct { + message string +} + +// Error returns an error string. +func (err ErrExpireLockFailed) Error() string { + return fmt.Sprintf("failed to TTL expire lock: %s", err.message) +} diff --git a/cluster/zookeeper/locking.go b/cluster/zookeeper/locking.go index 5aa9361..7cba145 100644 --- a/cluster/zookeeper/locking.go +++ b/cluster/zookeeper/locking.go @@ -27,9 +27,6 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { return ErrAlreadyOwnLock } - // If lock TTLs are being used, forcibly remove any expired locks. - // z.purgeExpiredLocks() - // Populate a lockMetadata. meta := lockMetadata{ Timestamp: time.Now(), @@ -105,8 +102,21 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error { return nil } - // If we're here, we don't have the lock; we need to enqueue our wait position - // by watching the ID immediately ahead of ours. + // If we're here, we don't have the lock but can enqueue. + + // First, we'll check if the lock ahead has an expired TTL. + expiredLock, err := z.expireLockAhead(locks, thisID) + if err != nil { + lockWaitingErr = err + continue + } + + // If so, restart the iteration to get a refreshed linked list. + if expiredLock { + continue + } + + // Enqueue our wait position by watching the ID immediately ahead of ours. blockingLockReleased, err := z.getLockAheadWait(locks, thisID) if err != nil { lockWaitingErr = err @@ -179,17 +189,55 @@ func (z *ZooKeeperLock) deleteLockZnode(p string) error { return nil } -// getLockAheadWait takes a lock ID and returns a watch on the lock immediately -// ahead of it. -func (z *ZooKeeperLock) getLockAheadWait(locks LockEntries, id int) (<-chan zk.Event, error) { - // Find the lock ID ahead. - lockAhead, err := locks.LockAhead(id) +// expireLockAhead takes an ID and checks if the lock ahead of it has an expired +// TTL. If so, it purges the lock and returns true. +func (z *ZooKeeperLock) expireLockAhead(locks LockEntries, id int) (bool, error) { + // TTLs aren't being used. + if z.TTL == 0 { + return false, nil + } + + // Get the path to the lock ahead. + lockAheadPath, err := lockAheadPath(locks, id) if err != nil { - return nil, err + return false, ErrExpireLockFailed{message: err.Error()} } - // Get its path. - lockAheadPath, err := locks.LockPath(lockAhead) + // Get its metadata. + dat, _, err := z.c.Get(lockAheadPath) + if err != nil { + return false, ErrExpireLockFailed{message: err.Error()} + } + + // Deserialize. + var metadata lockMetadata + if err := json.Unmarshal(dat, &metadata); err != nil { + return false, ErrExpireLockFailed{message: err.Error()} + } + + // Check if it's expired. + if time.Now().Before(metadata.TTLDeadline) { + return false, nil + } + + // We can purge the lock. + if err := z.deleteLockZnode(lockAheadPath); err != nil { + return false, ErrExpireLockFailed{message: err.Error()} + } + + // Clear the lock state. + z.mu.Lock() + z.lockZnode = "" + z.owner = nil + z.mu.Unlock() + + return true, nil +} + +// getLockAheadWait takes a lock ID and returns a watch on the lock immediately +// ahead of it. +func (z *ZooKeeperLock) getLockAheadWait(locks LockEntries, id int) (<-chan zk.Event, error) { + lockAheadPath, err := lockAheadPath(locks, id) if err != nil { return nil, err } @@ -202,3 +250,14 @@ func (z *ZooKeeperLock) getLockAheadWait(locks LockEntries, id int) (<-chan zk.E return watch, nil } + +func lockAheadPath(locks LockEntries, id int) (string, error) { + // Find the lock ID ahead. + lockAhead, err := locks.LockAhead(id) + if err != nil { + return "", err + } + + // Get its path. + return locks.LockPath(lockAhead) +} diff --git a/cluster/zookeeper/zookeeper-example/main.go b/cluster/zookeeper/zookeeper-example/main.go index 62ad68e..294e52f 100644 --- a/cluster/zookeeper/zookeeper-example/main.go +++ b/cluster/zookeeper/zookeeper-example/main.go @@ -25,6 +25,7 @@ func main() { Address: "localhost:2181", Path: "/my/locks", OwnerKey: "owner", + TTL: 30000, } lock, _ := zklocking.NewZooKeeperLock(cfg) @@ -37,7 +38,7 @@ func main() { } else { log.Println("I've got the lock!") defer log.Println("I've released the lock") - defer lock.Unlock(ctx) + defer lock.UnlockLogError(ctx) } <-sigs From 4ab97764243bbdf7e7c31c5252e4c76eca457bd2 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 13:13:19 -0500 Subject: [PATCH 21/26] [cluster/zookeeper] updated README, example --- cluster/zookeeper/README.md | 4 +++- cluster/zookeeper/zookeeper-example/main.go | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cluster/zookeeper/README.md b/cluster/zookeeper/README.md index 716f6a4..39a506a 100644 --- a/cluster/zookeeper/README.md +++ b/cluster/zookeeper/README.md @@ -1,10 +1,11 @@ # Overview -This package provides a ZooKeeper backed, coarse grained distributed lock. The lock path is determined at instantiation time. At request time, locks are enqueued and block until the lock is either acquired or the context deadline is met. +This package provides a ZooKeeper backed, coarse grained distributed lock. The lock path is determined at instantiation time. At request time, locks are enqueued and block until the lock is either acquired or the context deadline is met. Locks can be configured with an optional TTL. Further implementation notes: - Locks are enqueued and granted in order as locks ahead are relinquished or timed out. - Session timeouts/disconnects are handled through ZooKeeper sessions with automatic cleanup; locks that fail to acquire before the context timeout are removed from the queue even if the lock session is still active. +- Setting a `ZooKeeperLockConfig.TTL` value > 0 enables lock TTLs. Take note that TTL expirations are handled at request time from contending locks; if service A is not using TTLs and service B is, service B can forcibly abort service A locks. # Examples @@ -37,6 +38,7 @@ func main() { cfg := zklocking.ZooKeeperLockConfig{ Address: "localhost:2181", Path: "/my/locks", + TTL: 30000, OwnerKey: "owner", } diff --git a/cluster/zookeeper/zookeeper-example/main.go b/cluster/zookeeper/zookeeper-example/main.go index 294e52f..33c6744 100644 --- a/cluster/zookeeper/zookeeper-example/main.go +++ b/cluster/zookeeper/zookeeper-example/main.go @@ -15,6 +15,7 @@ import ( func main() { timeout := flag.Duration("timeout", 3*time.Second, "lock wait timeout") owner := flag.String("owner", "user1", "the lock owner ID") + ttl := flag.Int("lock-ttl", 10000, "lock TTL (milliseconds)") flag.Parse() sigs := make(chan os.Signal, 1) @@ -25,7 +26,7 @@ func main() { Address: "localhost:2181", Path: "/my/locks", OwnerKey: "owner", - TTL: 30000, + TTL: *ttl, } lock, _ := zklocking.NewZooKeeperLock(cfg) From 9e3f61b2337a829a8fd2bd9e31c40613d844aa0a Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Wed, 2 Feb 2022 14:04:55 -0500 Subject: [PATCH 22/26] [registry] sets ZooKeeperLock TTL --- registry/server/server.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/registry/server/server.go b/registry/server/server.go index b064364..21a629a 100644 --- a/registry/server/server.go +++ b/registry/server/server.go @@ -278,8 +278,11 @@ func (s *Server) InitKafkaConsumer(ctx context.Context, wg *sync.WaitGroup, cfg // EnablingLocking uses distributed locking for write operations. func (s *Server) EnablingLocking(c *kafkazk.Config) error { cfg := zklocking.ZooKeeperLockConfig{ - Address: c.Connect, - Path: "/registry/locks", + Address: c.Connect, + Path: "/registry/locks", + // The maximum API request timeout is 3x the default. Lock TTLs should be + // bound to this same duration. + TTL: 3 * int(s.defaultRequestTimeout) / 1e6, OwnerKey: "reqID", } From 491712870217b686bc12b8cd6ffd798eca66408c Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Fri, 4 Feb 2022 16:46:03 -0500 Subject: [PATCH 23/26] [cluster/zookeeper] TestLockTTLIntegration --- .../zookeeper/zookeeper_integration_test.go | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/cluster/zookeeper/zookeeper_integration_test.go b/cluster/zookeeper/zookeeper_integration_test.go index 3eed273..ba812b9 100644 --- a/cluster/zookeeper/zookeeper_integration_test.go +++ b/cluster/zookeeper/zookeeper_integration_test.go @@ -36,8 +36,35 @@ func TestLockIntegration(t *testing.T) { // This lock should time out. err2 := lock2.Lock(ctx) + defer lock2.Unlock(ctx) + assert.Equal(t, err2, ErrLockingTimedOut) +} + +func TestLockTTLIntegration(t *testing.T) { + cfg := ZooKeeperLockConfig{ + Address: TESTING_ZK_ADDR, + Path: "/registry/locks", + TTL: 1000, + } + + lock, err := NewZooKeeperLock(cfg) + assert.Nil(t, err) + lock2, err := NewZooKeeperLock(cfg) + assert.Nil(t, err) + + ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) + + // This lock should succeed normally. + err = lock.Lock(ctx) defer lock.Unlock(ctx) - assert.Equal(t, err2, ErrLockingTimedOut, "Expected ErrLockingTimedOut") + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + // This lock should succeed by purging the now stale lock. + err2 := lock2.Lock(ctx) + defer lock2.Unlock(ctx) + assert.Equal(t, err2, nil) } func TestUnlockIntegration(t *testing.T) { From 9610b3a57c0a25e385f69429974115517de9b1ad Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Fri, 4 Feb 2022 17:59:38 -0500 Subject: [PATCH 24/26] [cluster/zookeeper] TestExpireLockAhead --- cluster/zookeeper/locking_test.go | 49 ++++++++++++++++++++++++ cluster/zookeeper/locks.go | 2 +- cluster/zookeeper/locks_test.go | 40 +++++++++++++------ cluster/zookeeper/zookeeper_test_mock.go | 38 +++++++++++++----- 4 files changed, 107 insertions(+), 22 deletions(-) diff --git a/cluster/zookeeper/locking_test.go b/cluster/zookeeper/locking_test.go index 1dd65b7..c69ded4 100644 --- a/cluster/zookeeper/locking_test.go +++ b/cluster/zookeeper/locking_test.go @@ -2,9 +2,11 @@ package zookeeper import ( "context" + "fmt" "testing" "time" + "github.com/go-zookeeper/zk" "github.com/stretchr/testify/assert" ) @@ -55,3 +57,50 @@ func TestUnlock(t *testing.T) { err = lock.Lock(ctx) assert.Nil(t, err) } + +func TestExpireLockAhead(t *testing.T) { + lock := newMockZooKeeperLock() + ctx, cf := context.WithTimeout(context.Background(), 60*time.Second) + _ = cf + ctx = context.WithValue(ctx, "owner", "test_owner") + + // This lock should succeed normally. + err := lock.Lock(ctx) + assert.Nil(t, err) + + // Enter a pending claim. This mimics the initial znode entry in the ZooKeeperLock + // Lock method. We do this rather than calling the Lock method entirely + // to exclude other operations that may affect what we really want to test. + lockPath := fmt.Sprintf("%s/lock-", lock.Path) + node, _ := lock.c.CreateProtectedEphemeralSequential(lockPath, nil, zk.WorldACL(31)) + id, _ := idFromZnode(node) + + // Check that the lock state has been populated. + assert.Equal(t, lock.owner, "test_owner") + assert.Equal(t, lock.lockZnode, "/locks/_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001") + + // Get the current lock entries. + le, _ := lock.locks() + + // Ensure we exceed the mock ZooKeeperLock.TTL of 10ms. + time.Sleep(30 * time.Millisecond) + + // This scenario should result in an expiry. We have an active lock ID 1 + // from the above Lock() call. + expired, err := lock.expireLockAhead(le, id) + assert.Nil(t, err) + assert.True(t, expired) + + // Refresh the lock entries. + le, _ = lock.locks() + + // This should now fail; the lock was expired and the only entry is ID 2 + // for the pending claim we entered above. + expired, err = lock.expireLockAhead(le, id) + assert.Equal(t, err, ErrExpireLockFailed{message: "unable to determine which lock to enqueue behind"}) + assert.False(t, expired) + + // Check that the lock state has been cleared. + assert.Nil(t, lock.owner) + assert.Equal(t, lock.lockZnode, "") +} diff --git a/cluster/zookeeper/locks.go b/cluster/zookeeper/locks.go index 03551ff..d2358c1 100644 --- a/cluster/zookeeper/locks.go +++ b/cluster/zookeeper/locks.go @@ -68,7 +68,7 @@ func (le LockEntries) LockPath(id int) (string, error) { // LockAhead returns the lock ahead of the ID provided. func (le LockEntries) LockAhead(id int) (int, error) { for i, next := range le.idList { - if next == id && i >= 0 { + if next == id && i > 0 { return le.idList[i-1], nil } } diff --git a/cluster/zookeeper/locks_test.go b/cluster/zookeeper/locks_test.go index 574beac..b2cd525 100644 --- a/cluster/zookeeper/locks_test.go +++ b/cluster/zookeeper/locks_test.go @@ -9,9 +9,13 @@ import ( func TestIDs(t *testing.T) { c := &mockZooKeeperClient{ znodeNameTemplate: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-00000000", - locks: []string{ - "/locks/_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001", - "/locks/_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000002", + locks: []fakeLock{ + { + path: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001", + }, + { + path: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000002", + }, }, } @@ -25,9 +29,13 @@ func TestIDs(t *testing.T) { func TestFirst(t *testing.T) { c := &mockZooKeeperClient{ znodeNameTemplate: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-00000000", - locks: []string{ - "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001", - "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000002", + locks: []fakeLock{ + { + path: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001", + }, + { + path: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000002", + }, }, } @@ -44,9 +52,13 @@ func TestFirst(t *testing.T) { func TestLockPath(t *testing.T) { c := &mockZooKeeperClient{ znodeNameTemplate: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-00000000", - locks: []string{ - "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001", - "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000002", + locks: []fakeLock{ + { + path: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001", + }, + { + path: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000002", + }, }, } @@ -69,9 +81,13 @@ func TestLockPath(t *testing.T) { func TestLockAhead(t *testing.T) { c := &mockZooKeeperClient{ znodeNameTemplate: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-00000000", - locks: []string{ - "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001", - "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000002", + locks: []fakeLock{ + { + path: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001", + }, + { + path: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000002", + }, }, } diff --git a/cluster/zookeeper/zookeeper_test_mock.go b/cluster/zookeeper/zookeeper_test_mock.go index 9375665..0c5ce72 100644 --- a/cluster/zookeeper/zookeeper_test_mock.go +++ b/cluster/zookeeper/zookeeper_test_mock.go @@ -11,22 +11,31 @@ import ( type mockZooKeeperClient struct { mu sync.Mutex + lockZnode string + owner string + ttl int znodeNameTemplate string - locks []string + locks []fakeLock nextID int32 path string } +type fakeLock struct { + path string + data []byte +} + func newMockZooKeeperLock() *ZooKeeperLock { return &ZooKeeperLock{ c: &mockZooKeeperClient{ znodeNameTemplate: "_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-00000000", - locks: []string{}, + locks: []fakeLock{}, nextID: 0, path: "/locks", }, OwnerKey: "owner", Path: "/locks", + TTL: 10, } } @@ -41,7 +50,7 @@ func newMockZooKeeperLockWithClient(c *mockZooKeeperClient) *ZooKeeperLock { func (m *mockZooKeeperClient) Children(s string) ([]string, *zk.Stat, error) { var names []string for _, lock := range m.locks { - names = append(names, strings.Trim(lock, m.path)) + names = append(names, strings.Trim(lock.path, m.path)) } return names, nil, nil } @@ -54,17 +63,22 @@ func (m *mockZooKeeperClient) CreateProtectedEphemeralSequential(s string, b []b // Mimic the sequential znode naming scheme. If s == "/locks/lock-", we want // "/locks/_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001" parts := strings.Split(s, "/") - fakeZnode := fmt.Sprintf("/%s/%s%d", parts[1], m.znodeNameTemplate, atomic.AddInt32(&m.nextID, 1)) + + var l = fakeLock{ + path: fmt.Sprintf("/%s/%s%d", parts[1], m.znodeNameTemplate, atomic.AddInt32(&m.nextID, 1)), + data: b, + } + // Store the fake lock name. - m.locks = append(m.locks, fakeZnode) + m.locks = append(m.locks, l) - return fakeZnode, nil + return l.path, nil } func (m *mockZooKeeperClient) Delete(s string, i int32) error { - var l []string + var l []fakeLock for _, e := range m.locks { - if e != s { + if e.path != s { l = append(l, e) } } @@ -74,7 +88,13 @@ func (m *mockZooKeeperClient) Delete(s string, i int32) error { } func (m *mockZooKeeperClient) Get(s string) ([]byte, *zk.Stat, error) { - return nil, &zk.Stat{Version: 1}, nil + var lock fakeLock + for _, l := range m.locks { + if l.path == s { + lock = l + } + } + return lock.data, &zk.Stat{Version: 1}, nil } func (m *mockZooKeeperClient) GetW(s string) ([]byte, *zk.Stat, <-chan zk.Event, error) { From 09cdbd245a8c29876695aa35852f12cfa5b46398 Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 8 Feb 2022 12:00:31 -0500 Subject: [PATCH 25/26] [registry] context has reqID filled for lock ownership --- cmd/registry/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/registry/main.go b/cmd/registry/main.go index c3de8ec..43e477c 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -118,6 +118,7 @@ func main() { log.Println("Registry running") ctx, cancel := context.WithCancel(context.Background()) + ctx = context.WithValue(ctx, "reqID", "registry-server") wg := &sync.WaitGroup{} // Initialize Server. From 8a900449d96f72efc23e0110ee0f15a7730f5bca Mon Sep 17 00:00:00 2001 From: Jamie Alquiza Date: Tue, 8 Feb 2022 12:16:12 -0500 Subject: [PATCH 26/26] [registry] relocates tag cleanup locking --- registry/server/tag_cleanup.go | 32 ++++++++++++++++--------- registry/server/tag_cleanup_test.go | 36 ++++++++++++++++++++--------- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/registry/server/tag_cleanup.go b/registry/server/tag_cleanup.go index ae25401..ea18699 100644 --- a/registry/server/tag_cleanup.go +++ b/registry/server/tag_cleanup.go @@ -27,25 +27,20 @@ func (tc *TagCleaner) RunTagCleanup(s *Server, ctx context.Context, c Config) { for tc.running { <-t.C - if err := s.Locking.Lock(ctx); err != nil { - log.Println(err) + if err := s.MarkForDeletion(ctx, time.Now); err != nil { + log.Println("error marking tags for deletion: ", err) continue } - defer s.Locking.UnlockLogError(ctx) - err := s.MarkForDeletion(time.Now) - if err != nil { - log.Println(err) - continue + if err := s.DeleteStaleTags(ctx, time.Now, c); err != nil { + log.Println("error deleting stale tags: ", err) } - - s.DeleteStaleTags(time.Now, c) } } // MarkForDeletion marks stored tags that have been stranded without an associated // kafka resource. -func (s *Server) MarkForDeletion(now func() time.Time) error { +func (s *Server) MarkForDeletion(ctx context.Context, now func() time.Time) error { markTimeMinutes := fmt.Sprint(now().Unix()) // Get all brokers from ZK. @@ -54,6 +49,12 @@ func (s *Server) MarkForDeletion(now func() time.Time) error { return ErrFetchingBrokers } + // Lock. + if err := s.Locking.Lock(ctx); err != nil { + return err + } + defer s.Locking.UnlockLogError(ctx) + // Get all topics from ZK topics, err := s.ZK.GetTopics([]*regexp.Regexp{topicRegex}) topicSet := TopicSetFromSlice(topics) @@ -110,8 +111,15 @@ func (s *Server) MarkForDeletion(now func() time.Time) error { } // DeleteStaleTags deletes any tags that have not had a kafka resource associated with them. -func (s *Server) DeleteStaleTags(now func() time.Time, c Config) { +func (s *Server) DeleteStaleTags(ctx context.Context, now func() time.Time, c Config) error { sweepTime := now().Unix() + + // Lock. + if err := s.Locking.Lock(ctx); err != nil { + return err + } + defer s.Locking.UnlockLogError(ctx) + allTags, _ := s.Tags.Store.GetAllTags() for kafkaObject, tags := range allTags { @@ -132,6 +140,8 @@ func (s *Server) DeleteStaleTags(now func() time.Time, c Config) { log.Printf("deleted tags for non-existent %s %s\n", kafkaObject.Type, kafkaObject.ID) } } + + return nil } // TopicSetFromSlice converts a slice into a TopicSet for convenience diff --git a/registry/server/tag_cleanup_test.go b/registry/server/tag_cleanup_test.go index 8a2f5cf..ba92d78 100644 --- a/registry/server/tag_cleanup_test.go +++ b/registry/server/tag_cleanup_test.go @@ -1,10 +1,12 @@ package server import ( + "context" "fmt" - "github.com/DataDog/kafka-kit/v3/kafkazk" "testing" "time" + + "github.com/DataDog/kafka-kit/v3/kafkazk" ) func TestMarkStaleTags(t *testing.T) { @@ -21,13 +23,16 @@ func TestMarkStaleTags(t *testing.T) { zk := kafkazk.NewZooKeeperStub() th := testTagHandler() - s := Server{Tags: th, ZK: zk} + + s := testServer() + s.Tags = th + s.ZK = zk // WHEN th.Store.SetTags(topic, tt) th.Store.SetTags(broker, bt) th.Store.SetTags(noBroker, nbt) - s.MarkForDeletion(time.Now) + s.MarkForDeletion(context.Background(), time.Now) // THEN nbtags, _ := th.Store.GetTags(noBroker) @@ -54,13 +59,16 @@ func TestDeleteStaleTags(t *testing.T) { bt := TagSet{"foo": "bar", TagMarkTimeKey: fmt.Sprint(markTime.Unix())} broker := KafkaObject{Type: "broker", ID: "not found"} - th := testTagHandler() zk := kafkazk.NewZooKeeperStub() - s := Server{Tags: th, ZK: zk} + th := testTagHandler() + + s := testServer() + s.Tags = th + s.ZK = zk //WHEN th.Store.SetTags(broker, bt) - s.DeleteStaleTags(func() time.Time { return sweepTime }, Config{TagAllowedStalenessMinutes: 10}) + s.DeleteStaleTags(context.Background(), func() time.Time { return sweepTime }, Config{TagAllowedStalenessMinutes: 10}) //THEN btags, _ := th.Store.GetTags(broker) @@ -77,13 +85,16 @@ func TestUnmarkedTagsAreSafe(t *testing.T) { bt := TagSet{"foo": "bar"} broker := KafkaObject{Type: "broker", ID: "not found"} - th := testTagHandler() zk := kafkazk.NewZooKeeperStub() - s := Server{Tags: th, ZK: zk} + th := testTagHandler() + + s := testServer() + s.Tags = th + s.ZK = zk //WHEN th.Store.SetTags(broker, bt) - s.DeleteStaleTags(func() time.Time { return sweepTime }, Config{TagAllowedStalenessMinutes: 10}) + s.DeleteStaleTags(context.Background(), func() time.Time { return sweepTime }, Config{TagAllowedStalenessMinutes: 10}) //THEN btags, _ := th.Store.GetTags(broker) @@ -102,12 +113,15 @@ func TestKafkaObjectComesBack(t *testing.T) { zk := kafkazk.NewZooKeeperStub() th := testTagHandler() - s := Server{Tags: th, ZK: zk} + + s := testServer() + s.Tags = th + s.ZK = zk // WHEN th.Store.SetTags(broker, bt) th.Store.SetTags(topic, tt) - s.MarkForDeletion(time.Now) + s.MarkForDeletion(context.Background(), time.Now) // THEN btags, _ := th.Store.GetTags(broker)