From cae74c52064df302633acfa4fc9ad073cf2848ae Mon Sep 17 00:00:00 2001
From: Tom Jorissen
Date: Tue, 9 Jan 2024 05:00:10 +0100
Subject: [PATCH 01/18] [Issue 1105][pulsaradmin] fix AutoTopicCreation for type non-partitioned (#1107)

Fixes #1105

### Motivation

To allow setting the AutoTopicCreation policy to non-partitioned, the partitions parameter must not be sent. Making Partitions a pointer instead of an int makes this possible.

### Modifications

Change the type of Partitions in TopicAutoCreationConfig from int to *int
---
 pulsaradmin/pkg/utils/topic_auto_creation_config.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsaradmin/pkg/utils/topic_auto_creation_config.go b/pulsaradmin/pkg/utils/topic_auto_creation_config.go
index 6664655974..8444514250 100644
--- a/pulsaradmin/pkg/utils/topic_auto_creation_config.go
+++ b/pulsaradmin/pkg/utils/topic_auto_creation_config.go
@@ -20,5 +20,5 @@ package utils
 type TopicAutoCreationConfig struct {
 	Allow      bool      `json:"allowAutoTopicCreation"`
 	Type       TopicType `json:"topicType"`
-	Partitions int       `json:"defaultNumPartitions"`
+	Partitions *int      `json:"defaultNumPartitions"`
 }

From ad7887e3f42171681a6fd4b16d7b4b8d389f5f89 Mon Sep 17 00:00:00 2001
From: Zike Yang
Date: Tue, 9 Jan 2024 12:00:36 +0800
Subject: [PATCH 02/18] Add test for admin topic creation (#1152)

### Motivation

Tests for creating topics using pulsarAdmin are missing.

### Modifications

- Add test for admin topic creation
---
 pulsaradmin/pkg/admin/topic_test.go | 55 +++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)
 create mode 100644 pulsaradmin/pkg/admin/topic_test.go

diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
new file mode 100644
index 0000000000..06c33f2ef5
--- /dev/null
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package admin + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" +) + +func TestCreateTopic(t *testing.T) { + checkError := func(err error) { + if err != nil { + t.Error(err) + } + } + + cfg := &config.Config{} + admin, err := New(cfg) + checkError(err) + + topic := "persistent://public/default/testCreateTopic" + + topicName, err := utils.GetTopicName(topic) + checkError(err) + + err = admin.Topics().Create(*topicName, 0) + checkError(err) + + topicLists, err := admin.Namespaces().GetTopics("public/default") + checkError(err) + + for _, t := range topicLists { + if t == topic { + return + } + } + t.Error("Couldn't find topic: " + topic) +} From 3388eae9f03f43b1446c3dd573a104b3f95fe302 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Tue, 9 Jan 2024 12:01:50 +0800 Subject: [PATCH 03/18] [Improve] Implement GetTopicAutoCreation (#1151) ### Motivation The `GetTopicAutoCreation` endpoint is missed, needs to add it ### Modifications Implement the `GetTopicAutoCreation` method --- pulsaradmin/pkg/admin/namespace.go | 10 ++ pulsaradmin/pkg/admin/namespace_test.go | 175 ++++++++++++++++++++++++ 2 files changed, 185 insertions(+) create mode 100644 pulsaradmin/pkg/admin/namespace_test.go diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index 732441e8c2..782ae3ae25 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -75,6 +75,9 @@ type Namespaces interface { // RemoveBacklogQuota removes a backlog quota policy from a namespace RemoveBacklogQuota(namespace string) error + // GetTopicAutoCreation returns the topic auto-creation config for a namespace + GetTopicAutoCreation(namespace utils.NameSpaceName) (*utils.TopicAutoCreationConfig, error) + // SetTopicAutoCreation sets topic auto-creation config for a namespace, overriding broker settings SetTopicAutoCreation(namespace utils.NameSpaceName, config utils.TopicAutoCreationConfig) error @@ -445,6 +448,13 @@ func (n *namespaces) RemoveBacklogQuota(namespace string) error { return n.pulsar.Client.DeleteWithQueryParams(endpoint, params) } +func (n *namespaces) GetTopicAutoCreation(namespace utils.NameSpaceName) (*utils.TopicAutoCreationConfig, error) { + var topicAutoCreation utils.TopicAutoCreationConfig + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation") + err := n.pulsar.Client.Get(endpoint, &topicAutoCreation) + return &topicAutoCreation, err +} + func (n *namespaces) SetTopicAutoCreation(namespace utils.NameSpaceName, config utils.TopicAutoCreationConfig) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation") return n.pulsar.Client.Post(endpoint, &config) diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go new file mode 100644 index 0000000000..f934a96865 --- /dev/null +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func ptr(n int) *int { + return &n +} + +func TestSetTopicAutoCreation(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + tests := []struct { + name string + namespace string + config utils.TopicAutoCreationConfig + errReason string + }{ + { + name: "Set partitioned type topic auto creation", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.Partitioned, + Partitions: ptr(3), + }, + errReason: "", + }, + { + name: "Set partitioned type topic auto creation without partitions", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.Partitioned, + }, + errReason: "Invalid configuration for autoTopicCreationOverride. the detail is [defaultNumPartitions] " + + "cannot be null when the type is partitioned.", + }, + { + name: "Set partitioned type topic auto creation with partitions < 1", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.Partitioned, + Partitions: ptr(-1), + }, + errReason: "Invalid configuration for autoTopicCreationOverride. the detail is [defaultNumPartitions] " + + "cannot be less than 1 for partition type.", + }, + { + name: "Set non-partitioned type topic auto creation", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + }, + errReason: "", + }, + { + name: "Set non-partitioned type topic auto creation with partitions", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + Partitions: ptr(3), + }, + errReason: "Invalid configuration for autoTopicCreationOverride. 
the detail is [defaultNumPartitions] is " + + "not allowed to be set when the type is non-partition.", + }, + { + name: "Disable topic auto creation", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: false, + }, + errReason: "", + }, + { + name: "Set topic auto creation on a non-exist namespace", + namespace: "public/nonexist", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + }, + errReason: "Namespace does not exist", + }, + { + name: "Set topic auto creation on a non-exist tenant", + namespace: "non-exist/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + }, + errReason: "Tenant does not exist", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + namespace, _ := utils.GetNamespaceName(tt.namespace) + err := admin.Namespaces().SetTopicAutoCreation(*namespace, tt.config) + if tt.errReason == "" { + assert.Equal(t, nil, err) + + err = admin.Namespaces().RemoveTopicAutoCreation(*namespace) + assert.Equal(t, nil, err) + } + if err != nil { + restError := err.(rest.Error) + assert.Equal(t, tt.errReason, restError.Reason) + } + }) + } +} + +func TestGetTopicAutoCreation(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + // set the topic auto creation config and get it + err = admin.Namespaces().SetTopicAutoCreation(*namespace, utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + }) + assert.Equal(t, nil, err) + topicAutoCreation, err := admin.Namespaces().GetTopicAutoCreation(*namespace) + assert.Equal(t, nil, err) + expected := utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + } + assert.Equal(t, expected, *topicAutoCreation) + + // remove the topic auto creation config and get it + err = admin.Namespaces().RemoveTopicAutoCreation(*namespace) + assert.Equal(t, nil, err) + + topicAutoCreation, err = admin.Namespaces().GetTopicAutoCreation(*namespace) + assert.Equal(t, nil, err) + expected = utils.TopicAutoCreationConfig{ + Allow: false, + Type: "", + } + assert.Equal(t, expected, *topicAutoCreation) +} From 58941348a7fa467df21ddd9a3ddf11e9103cc4ac Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 9 Jan 2024 19:01:14 -0800 Subject: [PATCH 04/18] chore(deps): bump github.com/dvsekhvalnov/jose2go from 1.5.0 to 1.6.0 (#1150) Bumps [github.com/dvsekhvalnov/jose2go](https://github.com/dvsekhvalnov/jose2go) from 1.5.0 to 1.6.0. - [Commits](https://github.com/dvsekhvalnov/jose2go/compare/v1.5...v1.6.0) --- updated-dependencies: - dependency-name: github.com/dvsekhvalnov/jose2go dependency-type: indirect ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 8 +++++--- go.sum | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 52bfe43a96..df1024759e 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,10 @@ require ( google.golang.org/protobuf v1.30.0 ) -require github.com/golang/protobuf v1.5.2 +require ( + github.com/golang/protobuf v1.5.2 + github.com/hashicorp/go-multierror v1.1.1 +) require ( github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect @@ -38,13 +41,12 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/danieljoos/wincred v1.1.2 // indirect - github.com/dvsekhvalnov/jose2go v1.5.0 // indirect + github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/golang/snappy v0.0.1 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.0.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect diff --git a/go.sum b/go.sum index e8a9e76be7..b75adeae6e 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= -github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= -github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= +github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= +github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= From 93476909e36b7344c1b623fbe1b0d5b93792eb2f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 12 Jan 2024 16:03:55 +0800 Subject: [PATCH 05/18] chore(deps): bump golang.org/x/net from 0.0.0-20220225172249-27dd8689420f to 0.17.0 (#1155) Golang 1.16 is nolonger compatible with pulsar-client-go because `unsafe.Slice` is not defined. Update the CI workflows to test the latest 4 Golang major releases. 
Actually Golang only maintains the latest two major releases, see https://go.dev/doc/devel/release#policy --- .github/workflows/ci.yml | 2 +- go.mod | 10 +++++----- go.sum | 10 ++++++++++ 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f0c9b63999..9ad0b3f255 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [1.16, 1.17, 1.18, 1.19] + go-version: ['1.18', '1.19', '1.20', '1.21'] steps: - uses: actions/checkout@v3 - name: clean docker cache diff --git a/go.mod b/go.mod index df1024759e..3257d51047 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.0 go.uber.org/atomic v1.7.0 - golang.org/x/mod v0.5.1 + golang.org/x/mod v0.8.0 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 google.golang.org/protobuf v1.30.0 @@ -58,10 +58,10 @@ require ( github.com/prometheus/procfs v0.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.4.0 // indirect - golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect - golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect - golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index b75adeae6e..50a1ba3ebb 100644 --- a/go.sum +++ b/go.sum @@ -323,6 +323,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -357,6 +359,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -418,9 +422,13 @@ 
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a h1:ppl5mZgokTT8uPkmYOyEUmPTr3ypaKkg5eFOGrAmxxE= golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -429,6 +437,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= From 4e138228fe501ec39856afbc5e541e87a143b73f Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 15 Jan 2024 10:02:23 +0800 Subject: [PATCH 06/18] [fix] Fix DLQ producer name conflicts when multiples consumers send messages to DLQ (#1156) ### Motivation To keep consistent with the Java client. 
Releted PR: https://github.com/apache/pulsar/pull/21890 ### Modifications Set DLQ producerName `fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)` --- pulsar/consumer_impl.go | 2 +- pulsar/consumer_regex_test.go | 6 ++++-- pulsar/consumer_test.go | 6 ++++-- pulsar/dlq_router.go | 6 ++++-- pulsar/reader_impl.go | 2 +- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index d701ab16d6..75d839b412 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } - dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, client.log) + dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log) if err != nil { return nil, err } diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 3e5f1d61db..ebd7e4e196 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -152,9 +152,10 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string opts := ConsumerOptions{ SubscriptionName: "regex-sub", AutoDiscoveryPeriod: 5 * time.Minute, + Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { @@ -190,9 +191,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string opts := ConsumerOptions{ SubscriptionName: "regex-sub", AutoDiscoveryPeriod: 5 * time.Minute, + Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 8b983d0d19..df70b0dd0b 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1449,13 +1449,15 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { if prodOpt != nil { dlqPolicy.ProducerOptions = *prodOpt } - sub := "my-sub" + sub, consumerName := "my-sub", "my-consumer" + consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: sub, NackRedeliveryDelay: 1 * time.Second, Type: Shared, DLQ: &dlqPolicy, + Name: consumerName, }) assert.Nil(t, err) defer consumer.Close() @@ -1508,7 +1510,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { assert.Equal(t, []byte(expectMsg), msg.Payload()) // check dql produceName - assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ", topic, sub)) + assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ", topic, sub, consumerName)) // check original messageId assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID]) diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 5b9314bddc..6be35d7485 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -34,16 +34,18 @@ type dlqRouter struct { 
closeCh chan interface{} topicName string subscriptionName string + consumerName string log log.Logger } -func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, +func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName, consumerName string, logger log.Logger) (*dlqRouter, error) { r := &dlqRouter{ client: client, policy: policy, topicName: topicName, subscriptionName: subscriptionName, + consumerName: consumerName, log: logger, } @@ -159,7 +161,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { opt.Topic = r.policy.DeadLetterTopic opt.Schema = schema if opt.Name == "" { - opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName, r.subscriptionName) + opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName) } // the origin code sets to LZ4 compression with no options diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 0999e88fee..7b260b88db 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -127,7 +127,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } // Provide dummy dlq router with not dlq policy - dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, client.log) + dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log) if err != nil { return nil, err } From 5768f009c3a28cf58453acf7f67dcdeb6c7ff5d3 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 29 Jan 2024 09:39:10 +0800 Subject: [PATCH 07/18] Add 0.12.0 change log (#1153) * Add 0.12.0 change log --- CHANGELOG.md | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++ VERSION | 2 +- stable.txt | 2 +- 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7afe2c3a0..7a4c81cb74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,85 @@ All notable changes to this project will be documented in this file. 
+[0.12.0] 2024-01-10 + +## What's Changed +* Improved the performance of schema and schema cache by @gunli in https://github.com/apache/pulsar-client-go/pull/1033 +* Fixed return when registerSendOrAckOp() failed by @gunli in https://github.com/apache/pulsar-client-go/pull/1045 +* Fixed the incorrect link in the release process by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1050 +* Fixed Producer by checking if message is nil by @gunli in https://github.com/apache/pulsar-client-go/pull/1047 +* Added 0.11.0 change log by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1048 +* Fixed 0.11.0 change log by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1054 +* Fixed issue 877 where ctx in partitionProducer.Send() was not performing as expected by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1053 +* Fixed Producer by stopping block request even if Value and Payload are both set by @gunli in https://github.com/apache/pulsar-client-go/pull/1052 +* Improved Producer by simplifying the flush logic by @gunli in https://github.com/apache/pulsar-client-go/pull/1049 +* Fixed issue 1051: inaccurate producer memory limit in chunking and schema by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1055 +* Fixed issue by sending Close Command on Producer/Consumer create timeout by @michaeljmarshall in https://github.com/apache/pulsar-client-go/pull/1061 +* Fixed issue 1057: producer flush operation is not guaranteed to flush all messages by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1058 +* Fixed issue 1064: panic when trying to flush in DisableBatching=true by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1065 +* Fixed transaction acknowledgement and send logic for chunk message by @liangyepianzhou in https://github.com/apache/pulsar-client-go/pull/1069 +* Fixed issue by closing consumer resources if creation fails by @michaeljmarshall in https://github.com/apache/pulsar-client-go/pull/1070 +* Fixed issue where client reconnected every authenticationRefreshCheckSeconds when using TLS authentication by @jffp113 in https://github.com/apache/pulsar-client-go/pull/1062 +* Corrected the SendAsync() description by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1066 +* CI: replaced license header checker and formatter by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1077 +* Chore: allowed rebase and merge by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1080 +* Adopted pulsar-admin-go sources by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1079 +* Reverted: allowed rebase and merge by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1081 +* Fixed producer by failing all messages that are pending requests when closing like Java by @graysonzeng in https://github.com/apache/pulsar-client-go/pull/1059 +* Supported load config from env by @tuteng in https://github.com/apache/pulsar-client-go/pull/1089 +* Fixed issue where multiple calls to client.Close causes panic by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1046 +* Improved client by implementing GetLastMSgID for Reader by @liangyepianzhou in https://github.com/apache/pulsar-client-go/pull/1087 +* Fixed comment for ConnectionMaxIdleTime by @massakam in https://github.com/apache/pulsar-client-go/pull/1091 +* Issue 1094: connectionTimeout respects net.Dialer default timeout by @zzzming in https://github.com/apache/pulsar-client-go/pull/1095 +* 
Supported OAuth2 with scope field by @labuladong in https://github.com/apache/pulsar-client-go/pull/1097 +* Fixed issue where DisableReplication flag does not work by @massakam in https://github.com/apache/pulsar-client-go/pull/1100 +* Double-checked before consumer reconnect by @zccold in https://github.com/apache/pulsar-client-go/pull/1084 +* Fixed schema error by @leizhiyuan in https://github.com/apache/pulsar-client-go/pull/823 +* PR-1071-1: renamed pendingItem.Complete() to pendingItem.done() by @gunli in https://github.com/apache/pulsar-client-go/pull/1109 +* PR-1071-2: added sendRequest.done() to release resource together by @gunli in https://github.com/apache/pulsar-client-go/pull/1110 +* Refactor: factored out validateMsg by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1117 +* Refactor: factored out prepareTransaction by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1118 +* Completed comment on ProducerInterceptor interface BeforeSend method by @ojcm in https://github.com/apache/pulsar-client-go/pull/1119 +* Refactor: prepared sendrequest and moved to internalSendAsync by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1120 +* Fix: normalized all send request resource release into sr.done by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1121 +* Improvement: added func blockIfQueueFull() to encapsulate DisableBlockIfQue… by @gunli in https://github.com/apache/pulsar-client-go/pull/1122 +* Improved debug log clarity in ReceivedSendReceipt() by @gunli in https://github.com/apache/pulsar-client-go/pull/1123 +* Fixed issue 1098 by checking batchBuilder in case batch is disabled by @zzzming in https://github.com/apache/pulsar-client-go/pull/1099 +* Fixed Producer by fixing reconnection backoff logic by @gunli in https://github.com/apache/pulsar-client-go/pull/1125 +* Added 0.11.1 change log by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1092 +* Fixed dead link to the KEYS file in the release process by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1127 +* Improved performance by pooling sendRequest by @gunli in https://github.com/apache/pulsar-client-go/pull/1126 +* Fixed argument order to Errorf in TableView message handling by @ojcm in https://github.com/apache/pulsar-client-go/pull/1130 +* Fixed Producer by double-checking before reconnect by @gunli in https://github.com/apache/pulsar-client-go/pull/1131 +* Fixed issue where client must not retry connecting to broker when topic is terminated by @pkumar-singh in https://github.com/apache/pulsar-client-go/pull/1128 +* Issue 1132: Fixed JSONSchema unmarshalling in TableView by @ojcm in https://github.com/apache/pulsar-client-go/pull/1133 +* Improved by setting dlq producerName by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1137 +* Fixed channel deadlock in regexp consumer by @goncalo-rodrigues in https://github.com/apache/pulsar-client-go/pull/1141 +* Fixed Producer: handled TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced when reconnecting by @gunli in https://github.com/apache/pulsar-client-go/pull/1134 +* Transaction: Avoided a panic when using transaction by @Gilthoniel in https://github.com/apache/pulsar-client-go/pull/1144 +* Improved by updating connection.lastDataReceivedTime when connection is ready by @gunli in https://github.com/apache/pulsar-client-go/pull/1145 +* Improved Producer by normalizing and exporting the errors by @gunli in 
https://github.com/apache/pulsar-client-go/pull/1143 +* Updated Unsubscribe() interface comment by @geniusjoe in https://github.com/apache/pulsar-client-go/pull/1146 +* Issue 1105: Fixed AutoTopicCreation for type non-partitioned by @tomjo in https://github.com/apache/pulsar-client-go/pull/1107 +* Added test for admin topic creation by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1152 +* Implemented GetTopicAutoCreation by @jiangpengcheng in https://github.com/apache/pulsar-client-go/pull/1151 +* Bumped github.com/dvsekhvalnov/jose2go from 1.5.0 to 1.6.0 by @dependabot in https://github.com/apache/pulsar-client-go/pull/1150 +* Bump golang.org/x/net from 0.0.0-20220225172249-27dd8689420f to 0.17.0 by @BewareMyPower in https://github.com/apache/pulsar-client-go/pull/1155 +* Fix DLQ producer name conflicts when multiples consumers send messages to DLQ by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1156 + +## New Contributors +* @jffp113 made their first contribution in https://github.com/apache/pulsar-client-go/pull/1062 +* @tuteng made their first contribution in https://github.com/apache/pulsar-client-go/pull/1089 +* @zccold made their first contribution in https://github.com/apache/pulsar-client-go/pull/1084 +* @ojcm made their first contribution in https://github.com/apache/pulsar-client-go/pull/1119 +* @pkumar-singh made their first contribution in https://github.com/apache/pulsar-client-go/pull/1128 +* @goncalo-rodrigues made their first contribution in https://github.com/apache/pulsar-client-go/pull/1141 +* @Gilthoniel made their first contribution in https://github.com/apache/pulsar-client-go/pull/1144 +* @geniusjoe made their first contribution in https://github.com/apache/pulsar-client-go/pull/1146 +* @tomjo made their first contribution in https://github.com/apache/pulsar-client-go/pull/1107 +* @jiangpengcheng made their first contribution in https://github.com/apache/pulsar-client-go/pull/1151 +* @dependabot made their first contribution in https://github.com/apache/pulsar-client-go/pull/1150 + [0.11.1] 2023-09-11 - Close consumer resources if the creation fails by @michaeljmarshall in [#1070](https://github.com/apache/pulsar-client-go/pull/1070) diff --git a/VERSION b/VERSION index dbf0637bab..725659e7c0 100644 --- a/VERSION +++ b/VERSION @@ -1,3 +1,3 @@ // This version number refers to the currently released version number // Please fix the version when release. -v0.11.1 +v0.12.0 diff --git a/stable.txt b/stable.txt index 7e38472ca7..8a5f2e8dfe 100644 --- a/stable.txt +++ b/stable.txt @@ -1,3 +1,3 @@ // This version number refers to the current stable version, generally is `VERSION - 1`. // Please fix the version when release. 
-v0.11.1 +v0.12.0 \ No newline at end of file From b0487429672a1f9939335f47fd48551625c3eb54 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 31 Jan 2024 16:37:20 +0800 Subject: [PATCH 08/18] Fix SIGSEGV with zstd compression enabled --- pulsar/internal/compression/zstd_cgo.go | 5 +++++ pulsar/producer_test.go | 28 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index 25429e2512..3b32323001 100644 --- a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -25,6 +25,8 @@ package compression import ( + "sync" + "github.com/DataDog/zstd" log "github.com/sirupsen/logrus" ) @@ -33,6 +35,7 @@ type zstdCGoProvider struct { ctx zstd.Ctx level Level zstdLevel int + mu sync.Mutex } func newCGoZStdProvider(level Level) Provider { @@ -61,6 +64,8 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int { } func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { + z.mu.Lock() + defer z.mu.Unlock() out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel) if err != nil { log.WithError(err).Fatal("Failed to compress") diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0d74cdeefe..3b9ea7e8da 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2357,6 +2357,34 @@ func TestFailPendingMessageWithClose(t *testing.T) { assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size()) } +func TestSendConcurrently(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + testProducer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + CompressionType: ZSTD, + CompressionLevel: Better, + DisableBatching: true, + }) + assert.NoError(t, err) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + _, err := testProducer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 100), + }) + assert.NoError(t, err) + wg.Done() + }() + } + wg.Wait() +} + type pendingQueueWrapper struct { pendingQueue internal.BlockingQueue writtenBuffers *[]internal.Buffer From 1ebc162a98156972ccddd5300b5dbffe15e901c1 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 31 Jan 2024 16:44:26 +0800 Subject: [PATCH 09/18] Revert "Fix SIGSEGV with zstd compression enabled" This reverts commit b0487429672a1f9939335f47fd48551625c3eb54. 
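The underlying problem in this pair of commits (and the re-applied fix that follows) is that the cgo zstd provider shared a single `zstd.Ctx` across goroutines; that context wraps C-allocated compressor state and is not safe to use from multiple goroutines at once, so concurrent `Send` calls on a producer with ZSTD compression could crash with SIGSEGV. The first fix serialized access with a mutex; the re-applied fix pools contexts instead, so each in-flight compression borrows its own. A minimal sketch of that pooling pattern, assuming the DataDog/zstd API exactly as it appears in these diffs (`compressConcurrently` is an illustrative name, not part of the client):

```go
package compression

import (
	"sync"

	"github.com/DataDog/zstd"
)

// Each caller borrows its own context: zstd.Ctx wraps C-allocated
// compressor state and must not be shared by concurrent callers.
var ctxPool = sync.Pool{
	New: func() any { return zstd.NewCtx() },
}

// compressConcurrently is safe to call from many goroutines at once.
func compressConcurrently(dst, src []byte, level int) ([]byte, error) {
	ctx := ctxPool.Get().(zstd.Ctx)
	defer ctxPool.Put(ctx)
	return ctx.CompressLevel(dst, src, level)
}
```

Pooling keeps the context-reuse benefit of a long-lived `zstd.Ctx` without forcing every compression through a single mutex-guarded context.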
--- pulsar/internal/compression/zstd_cgo.go | 5 ----- pulsar/producer_test.go | 28 ------------------------- 2 files changed, 33 deletions(-) diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index 3b32323001..25429e2512 100644 --- a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -25,8 +25,6 @@ package compression import ( - "sync" - "github.com/DataDog/zstd" log "github.com/sirupsen/logrus" ) @@ -35,7 +33,6 @@ type zstdCGoProvider struct { ctx zstd.Ctx level Level zstdLevel int - mu sync.Mutex } func newCGoZStdProvider(level Level) Provider { @@ -64,8 +61,6 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int { } func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { - z.mu.Lock() - defer z.mu.Unlock() out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel) if err != nil { log.WithError(err).Fatal("Failed to compress") diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 3b9ea7e8da..0d74cdeefe 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2357,34 +2357,6 @@ func TestFailPendingMessageWithClose(t *testing.T) { assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size()) } -func TestSendConcurrently(t *testing.T) { - client, err := NewClient(ClientOptions{ - URL: lookupURL, - }) - assert.NoError(t, err) - defer client.Close() - testProducer, err := client.CreateProducer(ProducerOptions{ - Topic: newTopicName(), - CompressionType: ZSTD, - CompressionLevel: Better, - DisableBatching: true, - }) - assert.NoError(t, err) - - var wg sync.WaitGroup - for i := 0; i < 100; i++ { - wg.Add(1) - go func() { - _, err := testProducer.Send(context.Background(), &ProducerMessage{ - Payload: make([]byte, 100), - }) - assert.NoError(t, err) - wg.Done() - }() - } - wg.Wait() -} - type pendingQueueWrapper struct { pendingQueue internal.BlockingQueue writtenBuffers *[]internal.Buffer From 877613503b70c4ee67a8408f31881d88fc086456 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 2 Feb 2024 01:24:50 +0800 Subject: [PATCH 10/18] Fix SIGSEGV with zstd compression enabled (#1164) * Fix SIGSEGV with zstd compression enabled * Use sync.Pool to cache zstd ctx * Fix race in sequenceID assignment * Fix GetAndAdd --- pulsar/internal/compression/zstd_cgo.go | 16 ++++++++++---- pulsar/internal/utils.go | 2 +- pulsar/producer_test.go | 28 +++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index 25429e2512..dde54ae29e 100644 --- a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -25,19 +25,23 @@ package compression import ( + "sync" + "github.com/DataDog/zstd" log "github.com/sirupsen/logrus" ) type zstdCGoProvider struct { - ctx zstd.Ctx + ctxPool sync.Pool level Level zstdLevel int } func newCGoZStdProvider(level Level) Provider { z := &zstdCGoProvider{ - ctx: zstd.NewCtx(), + ctxPool: sync.Pool{New: func() any { + return zstd.NewCtx() + }}, } switch level { @@ -61,7 +65,9 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int { } func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { - out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel) + ctx := z.ctxPool.Get().(zstd.Ctx) + defer z.ctxPool.Put(ctx) + out, err := ctx.CompressLevel(dst, src, z.zstdLevel) if err != nil { log.WithError(err).Fatal("Failed to compress") } @@ -70,7 +76,9 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { } 
func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { - return z.ctx.Decompress(dst, src) + ctx := z.ctxPool.Get().(zstd.Ctx) + defer z.ctxPool.Put(ctx) + return ctx.Decompress(dst, src) } func (z *zstdCGoProvider) Close() error { diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go index 9378d9dcb6..2dc8210147 100644 --- a/pulsar/internal/utils.go +++ b/pulsar/internal/utils.go @@ -40,7 +40,7 @@ func TimestampMillis(t time.Time) uint64 { // GetAndAdd perform atomic read and update func GetAndAdd(n *uint64, diff uint64) uint64 { for { - v := *n + v := atomic.LoadUint64(n) if atomic.CompareAndSwapUint64(n, v, v+diff) { return v } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0d74cdeefe..3b9ea7e8da 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2357,6 +2357,34 @@ func TestFailPendingMessageWithClose(t *testing.T) { assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size()) } +func TestSendConcurrently(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + testProducer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + CompressionType: ZSTD, + CompressionLevel: Better, + DisableBatching: true, + }) + assert.NoError(t, err) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + _, err := testProducer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 100), + }) + assert.NoError(t, err) + wg.Done() + }() + } + wg.Wait() +} + type pendingQueueWrapper struct { pendingQueue internal.BlockingQueue writtenBuffers *[]internal.Buffer From 2a28e21c59d005515e118fed5bf8f333d6699e39 Mon Sep 17 00:00:00 2001 From: Jayant Date: Thu, 1 Feb 2024 21:52:52 -0500 Subject: [PATCH 11/18] [Producer] respect context cancellation in Flush (#1165) ### Motivation The producer's `Flush` method does not respect context cancellation. If the caller's context get's cancelled, it will have to wait for the producer to finish flushing. ### Modifications This change adds a `FlushWithCtx` method which takes a context and selects on two channels. 
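For callers, the practical effect is that a flush can now be given a deadline or cancelled along with the rest of their work instead of blocking indefinitely. A minimal usage sketch (the `flushWithTimeout` helper is illustrative, not part of the client API):

```go
package example

import (
	"context"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// flushWithTimeout bounds a flush by a deadline instead of blocking until
// every pending message has been persisted.
func flushWithTimeout(p pulsar.Producer, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	// FlushWithCtx returns ctx.Err() (e.g. context.DeadlineExceeded) if the
	// context expires before the flush completes, and nil once all buffered
	// messages have been persisted.
	return p.FlushWithCtx(ctx)
}
```

Passing `context.Background()` preserves the old blocking behaviour, which is what the now-deprecated `Flush()` does internally.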
--- pulsar/consumer_test.go | 6 ++--- .../producer_interceptor_test.go | 4 ++++ pulsar/producer.go | 7 ++++-- pulsar/producer_impl.go | 6 ++++- pulsar/producer_partition.go | 18 +++++++++++--- pulsar/producer_test.go | 24 +++++++++---------- pulsar/reader_test.go | 6 ++--- 7 files changed, 47 insertions(+), 24 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index df70b0dd0b..d66e23765d 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { } wg.Wait() - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // send another batch @@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { }, nil) } - producer.Flush() + producer.FlushWithCtx(context.Background()) for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) @@ -3932,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize()) }) } - assert.Nil(t, producer.Flush()) + assert.Nil(t, producer.FlushWithCtx(context.Background())) msgIds := make([]MessageID, BatchingMaxSize) for i := 0; i < BatchingMaxSize; i++ { diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go index 8d8e6965b8..1c2c712fcf 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error { return nil } +func (p *mockProducer) FlushWithCtx(ctx context.Context) error { + return nil +} + func (p *mockProducer) Close() {} diff --git a/pulsar/producer.go b/pulsar/producer.go index 70d152c78b..f8013a16ff 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -237,10 +237,13 @@ type Producer interface { // return the last sequence id published by this producer. LastSequenceID() int64 - // Flush all the messages buffered in the client and wait until all messages have been successfully - // persisted. + // Deprecated: Use `FlushWithCtx()` instead. Flush() error + // Flush all the messages buffered in the client and wait until all messageshave been successfully + // persisted. + FlushWithCtx(ctx context.Context) error + // Close the producer and releases resources allocated // No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case // of errors, pending writes will not be retried. 
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 3c45b597d0..ca923108fe 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 { } func (p *producer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *producer) FlushWithCtx(ctx context.Context) error { p.RLock() defer p.RUnlock() for _, pp := range p.producers { - if err := pp.Flush(); err != nil { + if err := pp.FlushWithCtx(ctx); err != nil { return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b79053e38..fbcc5b9776 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1422,15 +1422,27 @@ func (p *partitionProducer) LastSequenceID() int64 { } func (p *partitionProducer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *partitionProducer) FlushWithCtx(ctx context.Context) error { flushReq := &flushRequest{ doneCh: make(chan struct{}), err: nil, } - p.cmdChan <- flushReq + select { + case <-ctx.Done(): + return ctx.Err() + case p.cmdChan <- flushReq: + } // wait for the flush request to complete - <-flushReq.doneCh - return flushReq.err + select { + case <-ctx.Done(): + return ctx.Err() + case <-flushReq.doneCh: + return flushReq.err + } } func (p *partitionProducer) getProducerState() producerState { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 3b9ea7e8da..ba5911565e 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -159,7 +159,7 @@ func TestProducerAsyncSend(t *testing.T) { assert.NoError(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -220,7 +220,7 @@ func TestProducerFlushDisableBatching(t *testing.T) { assert.NoError(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -387,7 +387,7 @@ func TestFlushInProducer(t *testing.T) { }) assert.Nil(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -429,7 +429,7 @@ func TestFlushInProducer(t *testing.T) { assert.Nil(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -500,7 +500,7 @@ func TestFlushInPartitionedProducer(t *testing.T) { } // After flush, should be able to consume. 
- err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -1717,7 +1717,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) { } } - producer.Flush() + producer.FlushWithCtx(context.Background()) //// create consumer consumer, err := client.Subscribe(ConsumerOptions{ @@ -1808,7 +1808,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) { assert.NotNil(t, id) }) } - producer.Flush() + producer.FlushWithCtx(context.Background()) //// create consumer consumer, err := client.Subscribe(ConsumerOptions{ @@ -2027,9 +2027,9 @@ func TestMemLimitRejectProducerMessages(t *testing.T) { assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) // flush pending msg - err = producer1.Flush() + err = producer1.FlushWithCtx(context.Background()) assert.NoError(t, err) - err = producer2.Flush() + err = producer2.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) @@ -2118,9 +2118,9 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) { assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) // flush pending msg - err = producer1.Flush() + err = producer1.FlushWithCtx(context.Background()) assert.NoError(t, err) - err = producer2.Flush() + err = producer2.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) @@ -2244,7 +2244,7 @@ func TestMemLimitContextCancel(t *testing.T) { cancel() wg.Wait() - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ec10f8f162..c8228a7ca9 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -277,7 +277,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) { }) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // create reader on 5th message (not included) @@ -353,7 +353,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) { }) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // create reader on 5th message (not included) @@ -592,7 +592,7 @@ func TestReaderSeek(t *testing.T) { seekID = id } } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) for i := 0; i < N; i++ { From f476814a9e1bac484e9589d4fb3f299066f54ddc Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 6 Feb 2024 19:24:55 -0800 Subject: [PATCH 12/18] Added CodeQL static code scanner (#1169) * Added CodeQL static code scanner * Update codeql.yml --- .github/workflows/codeql.yml | 90 ++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 .github/workflows/codeql.yml diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 0000000000..869a19ad80 --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "CodeQL" + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + schedule: + - cron: '43 13 * * 0' + +jobs: + analyze: + name: Analyze + # Runner size impacts CodeQL analysis time. To learn more, please see: + # - https://gh.io/recommended-hardware-resources-for-running-codeql + # - https://gh.io/supported-runners-and-hardware-resources + # - https://gh.io/using-larger-runners + # Consider using larger runners for possible analysis time improvements. + runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }} + timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }} + permissions: + # required for all workflows + security-events: write + + # only required for workflows in private repositories + actions: read + contents: read + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + # CodeQL supports [ 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' ] + # Use only 'java-kotlin' to analyze code written in Java, Kotlin or both + # Use only 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both + # Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v3 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + + # For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs + # queries: security-extended,security-and-quality + + + # Autobuild attempts to build any compiled languages (C/C++, C#, Go, Java, or Swift). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v3 + + # ℹ️ Command-line programs to run using the OS shell. + # 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun + + # If the Autobuild fails above, remove it and uncomment the following three lines. + # modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance. 
+ + # - run: | + # echo "Run, Build Application using script" + # ./location_of_script_within_repo/buildscript.sh + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v3 + with: + category: "/language:${{matrix.language}}" From c2ca7e81f0c609cd8fb7b13695664519e63e4501 Mon Sep 17 00:00:00 2001 From: Peter Hull <56369394+petermnhull@users.noreply.github.com> Date: Tue, 20 Feb 2024 02:31:00 +0000 Subject: [PATCH 13/18] [Fix] Fix Bytes Schema (#1173) --- pulsar/schema.go | 2 ++ pulsar/schema_test.go | 64 +++++++++++++++++++++++++++++++++++++++ pulsar/table_view_test.go | 7 +++++ 3 files changed, 73 insertions(+) diff --git a/pulsar/schema.go b/pulsar/schema.go index fd9d412dc6..3427fb263a 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -93,6 +93,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]s var schemaDef = string(schemaData) var s Schema switch schemaType { + case BYTES: + s = NewBytesSchema(properties) case STRING: s = NewStringSchema(properties) case JSON: diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go index c2008f6de9..34216c4779 100644 --- a/pulsar/schema_test.go +++ b/pulsar/schema_test.go @@ -19,11 +19,14 @@ package pulsar import ( "context" + "fmt" "log" "testing" + "time" pb "github.com/apache/pulsar-client-go/integration-tests/pb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type testJSON struct { @@ -55,6 +58,67 @@ func createClient() Client { return client } +func TestBytesSchema(t *testing.T) { + client := createClient() + defer client.Close() + + topic := newTopicName() + + properties := make(map[string]string) + properties["pulsar"] = "hello" + producerSchemaBytes := NewBytesSchema(properties) + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: producerSchemaBytes, + }) + assert.NoError(t, err) + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`{"key": "value"}`), + }) + require.NoError(t, err) + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`something else`), + }) + require.NoError(t, err) + producer.Close() + + // Create consumer + consumerSchemaBytes := NewBytesSchema(nil) + assert.NotNil(t, consumerSchemaBytes) + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub-1", + Schema: consumerSchemaBytes, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // Receive first message + var out1 []byte + msg1, err := consumer.Receive(ctx) + assert.NoError(t, err) + err = msg1.GetSchemaValue(&out1) + assert.NoError(t, err) + assert.Equal(t, []byte(`{"key": "value"}`), out1) + consumer.Ack(msg1) + require.NoError(t, err) + + // Receive second message + var out2 []byte + msg2, err := consumer.Receive(ctx) + fmt.Println(string(msg2.Payload())) + assert.NoError(t, err) + err = msg2.GetSchemaValue(&out2) + assert.NoError(t, err) + assert.Equal(t, []byte(`something else`), out2) + + defer consumer.Close() +} + func TestJsonSchema(t *testing.T) { client := createClient() defer client.Close() diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go index 45b9441169..2368e3d846 100644 --- a/pulsar/table_view_test.go +++ b/pulsar/table_view_test.go @@ -90,6 +90,13 @@ func TestTableViewSchemas(t *testing.T) { expValueOut interface{} valueCheck func(t *testing.T, got interface{}) // Overrides expValueOut for more 
complex checks }{ + { + name: "BytesSchema", + schema: NewBytesSchema(nil), + schemaType: []byte(`any`), + producerValue: []byte(`hello pulsar`), + expValueOut: []byte(`hello pulsar`), + }, { name: "StringSchema", schema: NewStringSchema(nil), From 3b9b1f8895d8924ec98db4612806b9871f1d135b Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 23 Feb 2024 15:49:08 +0800 Subject: [PATCH 14/18] [feat] Support partitioned topic reader (#1178) Master Issue: #1177 ### Motivation Currently, there is an issue with the reader implementation. If the reader is creating, it won't get the topic metadata from the topic. The reader can only read messages from a single topic. If the topic is a partitioned topic, the reader won't know that and will try to create a non-partition topic with the same name. And it will lead to this issue: https://github.com/apache/pulsar/issues/22032 ### Modifications - Support partitioned topic reader --- pulsar/consumer.go | 7 +++ pulsar/consumer_impl.go | 47 ++++++++++++++- pulsar/consumer_partition.go | 36 +++++++++++ pulsar/reader.go | 1 + pulsar/reader_impl.go | 112 +++++++++++++++-------------------- pulsar/reader_test.go | 100 +++++++++++++++++++++++++++---- 6 files changed, 227 insertions(+), 76 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 667bff66cd..fea94cf6a3 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -246,6 +246,13 @@ type ConsumerOptions struct { // SubscriptionMode specifies the subscription mode to be used when subscribing to a topic. // Default is `Durable` SubscriptionMode SubscriptionMode + + // StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included. + // Default is `false` and the consumer will start from the "next" message + StartMessageIDInclusive bool + + // startMessageID specifies the message id to start from. Currently, it's only used for the reader internally. 
+ startMessageID *trackingMessageID } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 75d839b412..0c31a1aafc 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -384,7 +384,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { metadata: metadata, subProperties: subProperties, replicateSubscriptionState: c.options.ReplicateSubscriptionState, - startMessageID: nil, + startMessageID: c.options.startMessageID, + startMessageIDInclusive: c.options.StartMessageIDInclusive, subscriptionMode: c.options.SubscriptionMode, readCompacted: c.options.ReadCompacted, interceptors: c.options.Interceptors, @@ -707,6 +708,50 @@ func (c *consumer) checkMsgIDPartition(msgID MessageID) error { return nil } +func (c *consumer) hasNext() bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Make sure all paths cancel the context to avoid context leak + + var wg sync.WaitGroup + wg.Add(len(c.consumers)) + + hasNext := make(chan bool) + for _, pc := range c.consumers { + pc := pc + go func() { + defer wg.Done() + if pc.hasNext() { + select { + case hasNext <- true: + case <-ctx.Done(): + } + } + }() + } + + go func() { + wg.Wait() + close(hasNext) // Close the channel after all goroutines have finished + }() + + // Wait for either a 'true' result or for all goroutines to finish + for hn := range hasNext { + if hn { + return true + } + } + + return false +} + +func (c *consumer) setLastDequeuedMsg(msgID MessageID) error { + if err := c.checkMsgIDPartition(msgID); err != nil { + return err + } + c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = toTrackingMessageID(msgID) + return nil +} + var r = &random{ R: rand.New(rand.NewSource(time.Now().UnixNano())), } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fd6441c1cb..95b5bc0946 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -174,6 +174,8 @@ type partitionConsumer struct { chunkedMsgCtxMap *chunkedMsgCtxMap unAckChunksTracker *unAckChunksTracker ackGroupingTracker ackGroupingTracker + + lastMessageInBroker *trackingMessageID } func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) { @@ -1970,6 +1972,40 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, pc.availablePermits.inc() } +func (pc *partitionConsumer) hasNext() bool { + if pc.lastMessageInBroker != nil && pc.hasMoreMessages() { + return true + } + + for { + lastMsgID, err := pc.getLastMessageID() + if err != nil { + pc.log.WithError(err).Error("Failed to get last message id from broker") + continue + } else { + pc.lastMessageInBroker = lastMsgID + break + } + } + + return pc.hasMoreMessages() +} + +func (pc *partitionConsumer) hasMoreMessages() bool { + if pc.lastDequeuedMsg != nil { + return pc.lastMessageInBroker.isEntryIDValid() && pc.lastMessageInBroker.greater(pc.lastDequeuedMsg.messageID) + } + + if pc.options.startMessageIDInclusive { + return pc.lastMessageInBroker.isEntryIDValid() && + pc.lastMessageInBroker.greaterEqual(pc.startMessageID.get().messageID) + } + + // Non-inclusive + return pc.lastMessageInBroker.isEntryIDValid() && + pc.lastMessageInBroker.greater(pc.startMessageID.get().messageID) +} + // _setConn sets the internal connection field of this partition consumer atomically. // Note: should only be called by this partition consumer when a new connection is available. 
func (pc *partitionConsumer) _setConn(conn internal.Connection) { diff --git a/pulsar/reader.go b/pulsar/reader.go index 5e1a73b988..1c5235d422 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -136,5 +136,6 @@ type Reader interface { SeekByTime(time time.Time) error // GetLastMessageID get the last message id available for consume. + // It only works for single topic reader. It will return an error when the reader is the multi-topic reader. GetLastMessageID() (MessageID, error) } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 7b260b88db..bf91c67fa5 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -34,12 +34,11 @@ const ( type reader struct { sync.Mutex - client *client - pc *partitionConsumer - messageCh chan ConsumerMessage - lastMessageInBroker *trackingMessageID - log log.Logger - metrics *internal.LeveledMetrics + client *client + messageCh chan ConsumerMessage + log log.Logger + metrics *internal.LeveledMetrics + c *consumer } func newReader(client *client, options ReaderOptions) (Reader, error) { @@ -98,25 +97,25 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { options.ExpireTimeOfIncompleteChunk = time.Minute } - consumerOptions := &partitionConsumerOpts{ - topic: options.Topic, - consumerName: options.Name, - subscription: subscriptionName, - subscriptionType: Exclusive, - receiverQueueSize: receiverQueueSize, + consumerOptions := &ConsumerOptions{ + Topic: options.Topic, + Name: options.Name, + SubscriptionName: subscriptionName, + Type: Exclusive, + ReceiverQueueSize: receiverQueueSize, + SubscriptionMode: NonDurable, + ReadCompacted: options.ReadCompacted, + Properties: options.Properties, + NackRedeliveryDelay: defaultNackRedeliveryDelay, + ReplicateSubscriptionState: false, + Decryption: options.Decryption, + Schema: options.Schema, + BackoffPolicy: options.BackoffPolicy, + MaxPendingChunkedMessage: options.MaxPendingChunkedMessage, + ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, + AutoAckIncompleteChunk: options.AutoAckIncompleteChunk, startMessageID: startMessageID, - startMessageIDInclusive: options.StartMessageIDInclusive, - subscriptionMode: NonDurable, - readCompacted: options.ReadCompacted, - metadata: options.Properties, - nackRedeliveryDelay: defaultNackRedeliveryDelay, - replicateSubscriptionState: false, - decryption: options.Decryption, - schema: options.Schema, - backoffPolicy: options.BackoffPolicy, - maxPendingChunkedMessage: options.MaxPendingChunkedMessage, - expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, - autoAckIncompleteChunk: options.AutoAckIncompleteChunk, + StartMessageIDInclusive: options.StartMessageIDInclusive, } reader := &reader{ @@ -131,20 +130,25 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { if err != nil { return nil, err } + // Provide dummy rlq router with not dlq policy + rlq, err := newRetryRouter(client, nil, false, client.log) + if err != nil { + return nil, err + } - pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq, reader.metrics) + c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false) if err != nil { close(reader.messageCh) return nil, err } + reader.c = c - reader.pc = pc reader.metrics.ReadersOpened.Inc() return reader, nil } func (r *reader) Topic() string { - return r.pc.topic + return r.c.topic } func (r *reader) Next(ctx context.Context) (Message, error) { @@ -158,9 +162,14 @@ func (r *reader) Next(ctx 
context.Context) (Message, error) { // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, // it will specify the subscription position anyway msgID := cm.Message.ID() - mid := toTrackingMessageID(msgID) - r.pc.lastDequeuedMsg = mid - r.pc.AckID(mid) + err := r.c.setLastDequeuedMsg(msgID) + if err != nil { + return nil, err + } + err = r.c.AckID(msgID) + if err != nil { + return nil, err + } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() @@ -169,41 +178,11 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } func (r *reader) HasNext() bool { - if r.lastMessageInBroker != nil && r.hasMoreMessages() { - return true - } - - for { - lastMsgID, err := r.pc.getLastMessageID() - if err != nil { - r.log.WithError(err).Error("Failed to get last message id from broker") - continue - } else { - r.lastMessageInBroker = lastMsgID - break - } - } - - return r.hasMoreMessages() -} - -func (r *reader) hasMoreMessages() bool { - if r.pc.lastDequeuedMsg != nil { - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID) - } - - if r.pc.options.startMessageIDInclusive { - return r.lastMessageInBroker.isEntryIDValid() && - r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.get().messageID) - } - - // Non-inclusive - return r.lastMessageInBroker.isEntryIDValid() && - r.lastMessageInBroker.greater(r.pc.startMessageID.get().messageID) + return r.c.hasNext() } func (r *reader) Close() { - r.pc.Close() + r.c.Close() r.client.handlers.Del(r) r.metrics.ReadersClosed.Inc() } @@ -235,16 +214,19 @@ func (r *reader) Seek(msgID MessageID) error { return nil } - return r.pc.Seek(mid) + return r.c.Seek(mid) } func (r *reader) SeekByTime(time time.Time) error { r.Lock() defer r.Unlock() - return r.pc.SeekByTime(time) + return r.c.SeekByTime(time) } func (r *reader) GetLastMessageID() (MessageID, error) { - return r.pc.getLastMessageID() + if len(r.c.consumers) > 1 { + return nil, fmt.Errorf("GetLastMessageID is not supported for multi-topics reader") + } + return r.c.consumers[0].getLastMessageID() } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index c8228a7ca9..ccf52875ba 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -24,6 +24,9 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/apache/pulsar-client-go/pulsaradmin" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/google/uuid" "github.com/stretchr/testify/assert" ) @@ -90,10 +93,10 @@ func TestReaderConfigChunk(t *testing.T) { defer r1.Close() // verify specified chunk options - pcOpts := r1.(*reader).pc.options - assert.Equal(t, 50, pcOpts.maxPendingChunkedMessage) - assert.Equal(t, 30*time.Second, pcOpts.expireTimeOfIncompleteChunk) - assert.True(t, pcOpts.autoAckIncompleteChunk) + pcOpts := r1.(*reader).c.options + assert.Equal(t, 50, pcOpts.MaxPendingChunkedMessage) + assert.Equal(t, 30*time.Second, pcOpts.ExpireTimeOfIncompleteChunk) + assert.True(t, pcOpts.AutoAckIncompleteChunk) r2, err := client.CreateReader(ReaderOptions{ Topic: "my-topic2", @@ -103,10 +106,10 @@ func TestReaderConfigChunk(t *testing.T) { defer r2.Close() // verify default chunk options - pcOpts = r2.(*reader).pc.options - assert.Equal(t, 100, pcOpts.maxPendingChunkedMessage) - assert.Equal(t, time.Minute, pcOpts.expireTimeOfIncompleteChunk) - assert.False(t, pcOpts.autoAckIncompleteChunk) + pcOpts 
= r2.(*reader).c.options + assert.Equal(t, 100, pcOpts.MaxPendingChunkedMessage) + assert.Equal(t, time.Minute, pcOpts.ExpireTimeOfIncompleteChunk) + assert.False(t, pcOpts.AutoAckIncompleteChunk) } func TestReader(t *testing.T) { @@ -153,6 +156,50 @@ func TestReader(t *testing.T) { } } +func TestReaderOnPartitionedTopic(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + assert.Nil(t, createPartitionedTopic(topic, 3)) + ctx := context.Background() + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + defer reader.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + expectMsg := fmt.Sprintf("hello-%d", i) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + } +} + func TestReaderConnectError(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar://invalid-hostname:6650", @@ -422,7 +469,6 @@ func TestReaderHasNext(t *testing.T) { assert.NotNil(t, msgID) } - // create reader on 5th message (not included) reader, err := client.CreateReader(ReaderOptions{ Topic: topic, StartMessageID: EarliestMessageID(), @@ -880,7 +926,7 @@ func TestReaderWithBackoffPolicy(t *testing.T) { assert.NotNil(t, _reader) assert.Nil(t, err) - partitionConsumerImp := _reader.(*reader).pc + partitionConsumerImp := _reader.(*reader).c.consumers[0] // 1 s startTime := time.Now() partitionConsumerImp.reconnectToBroker() @@ -943,3 +989,37 @@ func TestReaderGetLastMessageID(t *testing.T) { assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID()) assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) } + +func TestReaderGetLastMessageIDOnMultiTopics(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + assert.Nil(t, createPartitionedTopic(topic, 3)) + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + _, err = reader.GetLastMessageID() + assert.NotNil(t, err) +} + +func createPartitionedTopic(topic string, n int) error { + admin, err := pulsaradmin.NewClient(&config.Config{}) + if err != nil { + return err + } + + topicName, err := utils.GetTopicName(topic) + if err != nil { + return err + } + err = admin.Topics().Create(*topicName, n) + if err != nil { + return err + } + return nil +} From 5d258272cb83444fe156dcbb57cbf8f2d475a50b Mon Sep 17 00:00:00 2001 From: Jinjun Pan <75996911+panszobe@users.noreply.github.com> Date: Fri, 23 Feb 2024 21:47:37 +0800 Subject: [PATCH 15/18] [Fix] Fix available permits in MessageReceived (#1181) Fixes #1180 ### Motivation In the `MessageReceived`, the number of skipped messages should be increased to available permits to avoid skipped permits leading flow request not be sent. 
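To make the permit accounting concrete, here is a minimal sketch of the idea (the `permitTracker` type and the half-queue flow threshold are illustrative stand-ins, not the client's actual internals): a message that is filtered out before delivery, whether already acknowledged, discarded, or a duplicate, still consumed one of the permits granted by the broker, so it must be credited back; otherwise the available-permit counter stalls below the threshold that triggers the next flow request.

    package permits

    // permitTracker is a toy stand-in for the consumer's available-permits counter.
    type permitTracker struct {
        available int32 // permits accumulated since the last flow request
        queueSize int32 // receiver queue size, used to derive the flow threshold
    }

    // credit returns n permits (for delivered and skipped messages alike) and asks
    // the broker for more once at least half of the receiver queue has been consumed.
    func (p *permitTracker) credit(n int32) {
        p.available += n
        if p.available >= p.queueSize/2 {
            p.sendFlow(p.available) // hypothetical: issue a flow request for p.available permits
            p.available = 0
        }
    }

    func (p *permitTracker) sendFlow(n int32) { /* send a CommandFlow for n permits */ }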
--------- Co-authored-by: panjinjun <1619-panjinjun@users.noreply.git.sysop.bigo.sg> --- pulsar/consumer_partition.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 95b5bc0946..3572a52269 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1093,7 +1093,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.metrics.MessagesReceived.Add(float64(numMsgs)) pc.metrics.PrefetchedMessages.Add(float64(numMsgs)) - var bytesReceived int + var ( + bytesReceived int + skippedMessages int32 + ) for i := 0; i < numMsgs; i++ { smm, payload, err := reader.ReadMessage() if err != nil || payload == nil { @@ -1102,6 +1105,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if ackSet != nil && !ackSet.Test(uint(i)) { pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i) + skippedMessages++ continue } @@ -1120,6 +1124,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header if pc.messageShouldBeDiscarded(trackingMsgID) { pc.AckID(trackingMsgID) + skippedMessages++ continue } @@ -1144,6 +1149,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if pc.ackGroupingTracker.isDuplicate(msgID) { + skippedMessages++ continue } @@ -1218,6 +1224,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.markScaleIfNeed() } + if skippedMessages > 0 { + pc.availablePermits.add(skippedMessages) + } + // send messages to the dispatcher pc.queueCh <- messages return nil From a881240db862e48f1791575aecb10ecc149bfa8b Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Feb 2024 00:59:34 +0800 Subject: [PATCH 16/18] fix: make function state values `omitempty` (#1185) * make function state values omitempty * fix --- pulsaradmin/pkg/utils/function_state.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsaradmin/pkg/utils/function_state.go b/pulsaradmin/pkg/utils/function_state.go index 63fa15057c..c8a8c4b8cb 100644 --- a/pulsaradmin/pkg/utils/function_state.go +++ b/pulsaradmin/pkg/utils/function_state.go @@ -19,8 +19,8 @@ package utils type FunctionState struct { Key string `json:"key"` - StringValue string `json:"stringValue"` - ByteValue []byte `json:"byteValue"` - NumValue int64 `json:"numberValue"` - Version int64 `json:"version"` + StringValue string `json:"stringValue,omitempty"` + ByteValue []byte `json:"byteValue,omitempty"` + NumValue int64 `json:"numberValue,omitempty"` + Version int64 `json:"version,omitempty"` } From 88a8d85cf6d6a4f282a5b39a2140a7bb06ba0f3b Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 28 Feb 2024 18:39:00 +0800 Subject: [PATCH 17/18] [fix] Fix Infinite Loop in Reader's `HasNext` Function (#1182) Fixes #1171 ### Motivation If `getLastMessageId` continually fails, the reader.HasNext can get stuck in an infinite loop. Without any backoff, the reader would keep trying forever. ### Modifications - Implemented a backoff policy for `getLastMessageID`. - If HasNext fails, it now returns false. #### Should the reader.HasNext returned `false` in case of failure? Currently, the `HasNext` method doesn't report errors. However, failure is still possible. For instance, if `getLastMessageID` repeatedly fails and hits the retry limit. An option is to keep trying forever, but this would stall all user code. This isn't user-friendly, so I rejected this solution. 
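For callers this means a `false` from `HasNext` is now ambiguous between "no more messages available" and "the last-message-id lookup kept failing until the operation timeout was exhausted". A minimal usage sketch, assuming the usual `context` and `pulsar` imports (the `handle` callback is a placeholder, not part of this patch):

    // drainAvailable reads everything the reader currently reports as available.
    // A false from HasNext ends the loop, whether that means the reader has caught
    // up or getLastMessageID kept failing until the client's retries ran out.
    func drainAvailable(ctx context.Context, r pulsar.Reader, handle func(pulsar.Message)) error {
        for r.HasNext() {
            msg, err := r.Next(ctx)
            if err != nil {
                return err
            }
            handle(msg)
        }
        return nil
    }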
#### Couldn't utilize the BackOffPolicy in the Reader Options The `HasNext` retry mechanism requires to use of `IsMaxBackoffReached` for the backoff. But it isn't exposed in the `BackOffPolicy` interface. Introducing a new method to the `BackOffPolicy` would introduce breaking changes for the user backoff implementation. So, I choose not to implement it. Before we do it, we need to refine the BackOffPolicy. --- pulsar/client_impl.go | 24 +++++++------- pulsar/consumer_partition.go | 53 ++++++++++++++++++++--------- pulsar/reader.go | 1 + pulsar/reader_test.go | 64 ++++++++++++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 27 deletions(-) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 7daf6f62ab..65aed3b963 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -40,14 +40,15 @@ const ( ) type client struct { - cnxPool internal.ConnectionPool - rpcClient internal.RPCClient - handlers internal.ClientHandlers - lookupService internal.LookupService - metrics *internal.Metrics - tcClient *transactionCoordinatorClient - memLimit internal.MemoryLimitController - closeOnce sync.Once + cnxPool internal.ConnectionPool + rpcClient internal.RPCClient + handlers internal.ClientHandlers + lookupService internal.LookupService + metrics *internal.Metrics + tcClient *transactionCoordinatorClient + memLimit internal.MemoryLimitController + closeOnce sync.Once + operationTimeout time.Duration log log.Logger } @@ -161,9 +162,10 @@ func newClient(options ClientOptions) (Client, error) { c := &client{ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval, maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime), - log: logger, - metrics: metrics, - memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + log: logger, + metrics: metrics, + memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + operationTimeout: operationTimeout, } serviceNameResolver := internal.NewPulsarServiceNameResolver(url) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 3572a52269..162565b2a9 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -570,15 +570,41 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { - pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") - return nil, errors.New("failed to redeliver closing or closed consumer") + pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer") + return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } - req := &getLastMsgIDRequest{doneCh: make(chan struct{})} - pc.eventsCh <- req + remainTime := pc.client.operationTimeout + var backoff internal.BackoffPolicy + if pc.options.backoffPolicy != nil { + backoff = pc.options.backoffPolicy + } else { + backoff = &internal.DefaultBackoff{} + } + request := func() (*trackingMessageID, error) { + req := &getLastMsgIDRequest{doneCh: make(chan struct{})} + pc.eventsCh <- req - // wait for the request to complete - <-req.doneCh - return req.msgID, req.err + // wait for the request to complete + <-req.doneCh + return req.msgID, req.err + } + for { + msgID, err := request() + if err == nil { + return msgID, nil + } 
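+		// Only failures reach this point: stop retrying once the budget derived
+		// from the client's operation timeout has been exhausted.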
+ if remainTime <= 0 { + pc.log.WithError(err).Error("Failed to getLastMessageID") + return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) + } + nextDelay := backoff.Next() + if nextDelay > remainTime { + nextDelay = remainTime + } + remainTime -= nextDelay + pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay) + time.Sleep(nextDelay) + } } func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) { @@ -1987,16 +2013,11 @@ func (pc *partitionConsumer) hasNext() bool { return true } - for { - lastMsgID, err := pc.getLastMessageID() - if err != nil { - pc.log.WithError(err).Error("Failed to get last message id from broker") - continue - } else { - pc.lastMessageInBroker = lastMsgID - break - } + lastMsgID, err := pc.getLastMessageID() + if err != nil { + return false } + pc.lastMessageInBroker = lastMsgID return pc.hasMoreMessages() } diff --git a/pulsar/reader.go b/pulsar/reader.go index 1c5235d422..4daa889062 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -113,6 +113,7 @@ type Reader interface { Next(context.Context) (Message, error) // HasNext checks if there is any message available to read from the current position + // If there is any errors, it will return false HasNext() bool // Close the reader and stop the broker to push more messages diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ccf52875ba..78c222dac7 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -28,6 +28,7 @@ import ( "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/google/uuid" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -1023,3 +1024,66 @@ func createPartitionedTopic(topic string, n int) error { } return nil } + +func TestReaderHasNextFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + r.(*reader).c.consumers[0].state.Store(consumerClosing) + assert.False(t, r.HasNext()) +} + +func TestReaderHasNextRetryFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + OperationTimeout: 2 * time.Second, + }) + assert.Nil(t, err) + topic := newTopicName() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + + c := make(chan interface{}) + defer close(c) + + // Close the consumer events loop and assign a mock eventsCh + pc := r.(*reader).c.consumers[0] + pc.Close() + pc.state.Store(consumerReady) + pc.eventsCh = c + + go func() { + for e := range c { + req, ok := e.(*getLastMsgIDRequest) + assert.True(t, ok, "unexpected event type") + req.err = errors.New("expected error") + close(req.doneCh) + } + }() + minTimer := time.NewTimer(1 * time.Second) // Timer to check if r.HasNext() blocked for at least 1s + maxTimer := time.NewTimer(3 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 3s + done := make(chan bool) + go func() { + assert.False(t, r.HasNext()) + done <- true + }() + + select { + case <-maxTimer.C: + t.Fatal("r.HasNext() blocked for more than 3s") + case <-done: + assert.False(t, minTimer.Stop(), "r.HasNext() did not block for at least 1s") + assert.True(t, maxTimer.Stop()) + } + +} From c4f47abd7c82402e7d4e95b55b24ebfef4bdb029 Mon Sep 17 00:00:00 
2001 From: crossoverJie Date: Tue, 5 Mar 2024 18:40:40 +0800 Subject: [PATCH 18/18] [Improve] Add optional parameters for getPartitionedStats (#1193) ### Motivation To keep consistent with the Java client. Releted PR: https://github.com/apache/pulsar/pull/21611 ### Modifications Add `GetStatsOptions` params. --- Makefile | 2 +- pulsar/consumer_test.go | 12 +-- pulsaradmin/pkg/admin/topic.go | 34 ++++++ pulsaradmin/pkg/admin/topic_test.go | 160 ++++++++++++++++++++++++++++ pulsaradmin/pkg/utils/data.go | 8 ++ 5 files changed, 209 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 4eb590b06b..d044237221 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ # IMAGE_NAME = pulsar-client-go-test:latest -PULSAR_VERSION ?= 2.10.3 +PULSAR_VERSION ?= 3.2.0 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION) GO_VERSION ?= 1.18 GOLANG_IMAGE = golang:$(GO_VERSION) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index d66e23765d..4a3b532d05 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -2219,6 +2219,12 @@ func TestConsumerAddTopicPartitions(t *testing.T) { assert.Nil(t, err) defer producer.Close() + // Increase number of partitions to 10 + makeHTTPCall(t, http.MethodPost, testURL, "10") + + // Wait for the producer/consumers to pick up the change + time.Sleep(1 * time.Second) + consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub", @@ -2227,12 +2233,6 @@ func TestConsumerAddTopicPartitions(t *testing.T) { assert.Nil(t, err) defer consumer.Close() - // Increase number of partitions to 10 - makeHTTPCall(t, http.MethodPost, testURL, "10") - - // Wait for the producer/consumers to pick up the change - time.Sleep(1 * time.Second) - // Publish messages ensuring that they will go to all the partitions ctx := context.Background() for i := 0; i < 10; i++ { diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go index c888827bf8..e6057413d2 100644 --- a/pulsaradmin/pkg/admin/topic.go +++ b/pulsaradmin/pkg/admin/topic.go @@ -75,6 +75,9 @@ type Topics interface { // All the rates are computed over a 1 minute window and are relative the last completed 1 minute period GetStats(utils.TopicName) (utils.TopicStats, error) + // GetStatsWithOption returns the stats for the topic + GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) (utils.TopicStats, error) + // GetInternalStats returns the internal stats for the topic. 
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error) @@ -82,6 +85,9 @@ type Topics interface { // All the rates are computed over a 1 minute window and are relative the last completed 1 minute period GetPartitionedStats(utils.TopicName, bool) (utils.PartitionedTopicStats, error) + // GetPartitionedStatsWithOption returns the stats for the partitioned topic + GetPartitionedStatsWithOption(utils.TopicName, bool, utils.GetStatsOptions) (utils.PartitionedTopicStats, error) + // Terminate the topic and prevent any more messages being published on it Terminate(utils.TopicName) (utils.MessageID, error) @@ -395,6 +401,19 @@ func (t *topics) GetStats(topic utils.TopicName) (utils.TopicStats, error) { err := t.pulsar.Client.Get(endpoint, &stats) return stats, err } +func (t *topics) GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error) { + var stats utils.TopicStats + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats") + params := map[string]string{ + "getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog), + "subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize), + "getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog), + "excludePublishers": strconv.FormatBool(option.ExcludePublishers), + "excludeConsumers": strconv.FormatBool(option.ExcludeConsumers), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true) + return stats, err +} func (t *topics) GetInternalStats(topic utils.TopicName) (utils.PersistentTopicInternalStats, error) { var stats utils.PersistentTopicInternalStats @@ -412,6 +431,21 @@ func (t *topics) GetPartitionedStats(topic utils.TopicName, perPartition bool) ( _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true) return stats, err } +func (t *topics) GetPartitionedStatsWithOption(topic utils.TopicName, perPartition bool, + option utils.GetStatsOptions) (utils.PartitionedTopicStats, error) { + var stats utils.PartitionedTopicStats + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitioned-stats") + params := map[string]string{ + "perPartition": strconv.FormatBool(perPartition), + "getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog), + "subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize), + "getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog), + "excludePublishers": strconv.FormatBool(option.ExcludePublishers), + "excludeConsumers": strconv.FormatBool(option.ExcludeConsumers), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true) + return stats, err +} func (t *topics) Terminate(topic utils.TopicName) (utils.MessageID, error) { endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "terminate") diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go index 06c33f2ef5..a9b1f00283 100644 --- a/pulsaradmin/pkg/admin/topic_test.go +++ b/pulsaradmin/pkg/admin/topic_test.go @@ -18,12 +18,23 @@ package admin import ( + "context" + "fmt" + "log" "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/stretchr/testify/assert" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" ) +var ( + lookupURL = "pulsar://localhost:6650" +) + func TestCreateTopic(t *testing.T) { checkError := func(err error) { if err != nil { @@ -53,3 +64,152 @@ func TestCreateTopic(t 
*testing.T) { } t.Error("Couldn't find topic: " + topic) } + +func TestPartitionState(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + // Create partition topic + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 4) + assert.NoError(t, err) + + // Send message + ctx := context.Background() + + // create consumer + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: pulsar.Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }); err != nil { + log.Fatal(err) + } + } + + stats, err := admin.Topics().GetPartitionedStatsWithOption(*topicName, true, utils.GetStatsOptions{ + GetPreciseBacklog: false, + SubscriptionBacklogSize: false, + GetEarliestTimeInBacklog: false, + ExcludePublishers: true, + ExcludeConsumers: true, + }) + assert.Nil(t, err) + assert.Equal(t, len(stats.Publishers), 0) + + for _, topicStats := range stats.Partitions { + assert.Equal(t, len(topicStats.Publishers), 0) + for _, subscriptionStats := range topicStats.Subscriptions { + assert.Equal(t, len(subscriptionStats.Consumers), 0) + } + } + + for _, subscriptionStats := range stats.Subscriptions { + assert.Equal(t, len(subscriptionStats.Consumers), 0) + } + +} +func TestNonPartitionState(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + // Create non-partition topic + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 0) + assert.NoError(t, err) + + // Send message + ctx := context.Background() + + // create consumer + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: pulsar.Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }); err != nil { + log.Fatal(err) + } + } + + stats, err := admin.Topics().GetStatsWithOption(*topicName, utils.GetStatsOptions{ + GetPreciseBacklog: false, + SubscriptionBacklogSize: false, + GetEarliestTimeInBacklog: false, + ExcludePublishers: true, + ExcludeConsumers: true, + }) + assert.Nil(t, err) + assert.Equal(t, len(stats.Publishers), 0) + for _, 
subscriptionStats := range stats.Subscriptions { + assert.Equal(t, len(subscriptionStats.Consumers), 0) + } + +} + +func newTopicName() string { + return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond()) +} diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go index 55888aab2b..cc797d1892 100644 --- a/pulsaradmin/pkg/utils/data.go +++ b/pulsaradmin/pkg/utils/data.go @@ -465,3 +465,11 @@ type CompactedLedger struct { Offloaded bool `json:"offloaded"` UnderReplicated bool `json:"underReplicated"` } + +type GetStatsOptions struct { + GetPreciseBacklog bool `json:"get_precise_backlog"` + SubscriptionBacklogSize bool `json:"subscription_backlog_size"` + GetEarliestTimeInBacklog bool `json:"get_earliest_time_in_backlog"` + ExcludePublishers bool `json:"exclude_publishers"` + ExcludeConsumers bool `json:"exclude_consumers"` +}
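A minimal usage sketch of the new options from application code (the topic name is illustrative, and this assumes a broker reachable through the default pulsaradmin config):

    package main

    import (
        "fmt"

        "github.com/apache/pulsar-client-go/pulsaradmin"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
    )

    func main() {
        admin, err := pulsaradmin.NewClient(&config.Config{})
        if err != nil {
            panic(err)
        }

        topicName, err := utils.GetTopicName("persistent://public/default/my-topic")
        if err != nil {
            panic(err)
        }

        // Skip publisher and consumer details to keep the broker-side work cheap,
        // mirroring what the tests above exercise.
        opts := utils.GetStatsOptions{
            ExcludePublishers: true,
            ExcludeConsumers:  true,
        }

        stats, err := admin.Topics().GetPartitionedStatsWithOption(*topicName, true, opts)
        if err != nil {
            panic(err)
        }
        fmt.Println("partitions:", len(stats.Partitions), "subscriptions:", len(stats.Subscriptions))
    }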