diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f0c9b63999..9ad0b3f255 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [1.16, 1.17, 1.18, 1.19] + go-version: ['1.18', '1.19', '1.20', '1.21'] steps: - uses: actions/checkout@v3 - name: clean docker cache diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 0000000000..869a19ad80 --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "CodeQL" + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + schedule: + - cron: '43 13 * * 0' + +jobs: + analyze: + name: Analyze + # Runner size impacts CodeQL analysis time. To learn more, please see: + # - https://gh.io/recommended-hardware-resources-for-running-codeql + # - https://gh.io/supported-runners-and-hardware-resources + # - https://gh.io/using-larger-runners + # Consider using larger runners for possible analysis time improvements. + runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }} + timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }} + permissions: + # required for all workflows + security-events: write + + # only required for workflows in private repositories + actions: read + contents: read + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + # CodeQL supports [ 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' ] + # Use only 'java-kotlin' to analyze code written in Java, Kotlin or both + # Use only 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both + # Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v3 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + + # For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs + # queries: security-extended,security-and-quality + + + # Autobuild attempts to build any compiled languages (C/C++, C#, Go, Java, or Swift). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v3 + + # ℹ️ Command-line programs to run using the OS shell. + # 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun + + # If the Autobuild fails above, remove it and uncomment the following three lines. + # modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance. + + # - run: | + # echo "Run, Build Application using script" + # ./location_of_script_within_repo/buildscript.sh + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v3 + with: + category: "/language:${{matrix.language}}" diff --git a/CHANGELOG.md b/CHANGELOG.md index d7afe2c3a0..7a4c81cb74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,85 @@ All notable changes to this project will be documented in this file. +[0.12.0] 2024-01-10 + +## What's Changed +* Improved the performance of schema and schema cache by @gunli in https://github.com/apache/pulsar-client-go/pull/1033 +* Fixed return when registerSendOrAckOp() failed by @gunli in https://github.com/apache/pulsar-client-go/pull/1045 +* Fixed the incorrect link in the release process by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1050 +* Fixed Producer by checking if message is nil by @gunli in https://github.com/apache/pulsar-client-go/pull/1047 +* Added 0.11.0 change log by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1048 +* Fixed 0.11.0 change log by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1054 +* Fixed issue 877 where ctx in partitionProducer.Send() was not performing as expected by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1053 +* Fixed Producer by stopping block request even if Value and Payload are both set by @gunli in https://github.com/apache/pulsar-client-go/pull/1052 +* Improved Producer by simplifying the flush logic by @gunli in https://github.com/apache/pulsar-client-go/pull/1049 +* Fixed issue 1051: inaccurate producer memory limit in chunking and schema by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1055 +* Fixed issue by sending Close Command on Producer/Consumer create timeout by @michaeljmarshall in https://github.com/apache/pulsar-client-go/pull/1061 +* Fixed issue 1057: producer flush operation is not guaranteed to flush all messages by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1058 +* Fixed issue 1064: panic when trying to flush in DisableBatching=true by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1065 +* Fixed transaction acknowledgement and send logic for chunk message by @liangyepianzhou in https://github.com/apache/pulsar-client-go/pull/1069 +* Fixed issue by closing consumer resources if creation fails by @michaeljmarshall in https://github.com/apache/pulsar-client-go/pull/1070 +* Fixed issue where client reconnected every authenticationRefreshCheckSeconds when using TLS authentication by @jffp113 in https://github.com/apache/pulsar-client-go/pull/1062 +* Corrected the SendAsync() description by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1066 +* CI: replaced license header checker and formatter by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1077 +* Chore: allowed rebase and merge by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1080 +* Adopted pulsar-admin-go sources by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1079 +* Reverted: allowed rebase and merge by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1081 +* Fixed producer by failing all messages that are pending requests when closing like Java by @graysonzeng in https://github.com/apache/pulsar-client-go/pull/1059 +* Supported load config from env by @tuteng in https://github.com/apache/pulsar-client-go/pull/1089 +* Fixed issue where multiple calls to client.Close causes panic by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1046 +* Improved client by implementing GetLastMSgID for Reader by @liangyepianzhou in https://github.com/apache/pulsar-client-go/pull/1087 +* Fixed comment for ConnectionMaxIdleTime by @massakam in https://github.com/apache/pulsar-client-go/pull/1091 +* Issue 1094: connectionTimeout respects net.Dialer default timeout by @zzzming in https://github.com/apache/pulsar-client-go/pull/1095 +* Supported OAuth2 with scope field by @labuladong in https://github.com/apache/pulsar-client-go/pull/1097 +* Fixed issue where DisableReplication flag does not work by @massakam in https://github.com/apache/pulsar-client-go/pull/1100 +* Double-checked before consumer reconnect by @zccold in https://github.com/apache/pulsar-client-go/pull/1084 +* Fixed schema error by @leizhiyuan in https://github.com/apache/pulsar-client-go/pull/823 +* PR-1071-1: renamed pendingItem.Complete() to pendingItem.done() by @gunli in https://github.com/apache/pulsar-client-go/pull/1109 +* PR-1071-2: added sendRequest.done() to release resource together by @gunli in https://github.com/apache/pulsar-client-go/pull/1110 +* Refactor: factored out validateMsg by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1117 +* Refactor: factored out prepareTransaction by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1118 +* Completed comment on ProducerInterceptor interface BeforeSend method by @ojcm in https://github.com/apache/pulsar-client-go/pull/1119 +* Refactor: prepared sendrequest and moved to internalSendAsync by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1120 +* Fix: normalized all send request resource release into sr.done by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1121 +* Improvement: added func blockIfQueueFull() to encapsulate DisableBlockIfQue… by @gunli in https://github.com/apache/pulsar-client-go/pull/1122 +* Improved debug log clarity in ReceivedSendReceipt() by @gunli in https://github.com/apache/pulsar-client-go/pull/1123 +* Fixed issue 1098 by checking batchBuilder in case batch is disabled by @zzzming in https://github.com/apache/pulsar-client-go/pull/1099 +* Fixed Producer by fixing reconnection backoff logic by @gunli in https://github.com/apache/pulsar-client-go/pull/1125 +* Added 0.11.1 change log by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1092 +* Fixed dead link to the KEYS file in the release process by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1127 +* Improved performance by pooling sendRequest by @gunli in https://github.com/apache/pulsar-client-go/pull/1126 +* Fixed argument order to Errorf in TableView message handling by @ojcm in https://github.com/apache/pulsar-client-go/pull/1130 +* Fixed Producer by double-checking before reconnect by @gunli in https://github.com/apache/pulsar-client-go/pull/1131 +* Fixed issue where client must not retry connecting to broker when topic is terminated by @pkumar-singh in https://github.com/apache/pulsar-client-go/pull/1128 +* Issue 1132: Fixed JSONSchema unmarshalling in TableView by @ojcm in https://github.com/apache/pulsar-client-go/pull/1133 +* Improved by setting dlq producerName by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1137 +* Fixed channel deadlock in regexp consumer by @goncalo-rodrigues in https://github.com/apache/pulsar-client-go/pull/1141 +* Fixed Producer: handled TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced when reconnecting by @gunli in https://github.com/apache/pulsar-client-go/pull/1134 +* Transaction: Avoided a panic when using transaction by @Gilthoniel in https://github.com/apache/pulsar-client-go/pull/1144 +* Improved by updating connection.lastDataReceivedTime when connection is ready by @gunli in https://github.com/apache/pulsar-client-go/pull/1145 +* Improved Producer by normalizing and exporting the errors by @gunli in https://github.com/apache/pulsar-client-go/pull/1143 +* Updated Unsubscribe() interface comment by @geniusjoe in https://github.com/apache/pulsar-client-go/pull/1146 +* Issue 1105: Fixed AutoTopicCreation for type non-partitioned by @tomjo in https://github.com/apache/pulsar-client-go/pull/1107 +* Added test for admin topic creation by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1152 +* Implemented GetTopicAutoCreation by @jiangpengcheng in https://github.com/apache/pulsar-client-go/pull/1151 +* Bumped github.com/dvsekhvalnov/jose2go from 1.5.0 to 1.6.0 by @dependabot in https://github.com/apache/pulsar-client-go/pull/1150 +* Bump golang.org/x/net from 0.0.0-20220225172249-27dd8689420f to 0.17.0 by @BewareMyPower in https://github.com/apache/pulsar-client-go/pull/1155 +* Fix DLQ producer name conflicts when multiples consumers send messages to DLQ by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1156 + +## New Contributors +* @jffp113 made their first contribution in https://github.com/apache/pulsar-client-go/pull/1062 +* @tuteng made their first contribution in https://github.com/apache/pulsar-client-go/pull/1089 +* @zccold made their first contribution in https://github.com/apache/pulsar-client-go/pull/1084 +* @ojcm made their first contribution in https://github.com/apache/pulsar-client-go/pull/1119 +* @pkumar-singh made their first contribution in https://github.com/apache/pulsar-client-go/pull/1128 +* @goncalo-rodrigues made their first contribution in https://github.com/apache/pulsar-client-go/pull/1141 +* @Gilthoniel made their first contribution in https://github.com/apache/pulsar-client-go/pull/1144 +* @geniusjoe made their first contribution in https://github.com/apache/pulsar-client-go/pull/1146 +* @tomjo made their first contribution in https://github.com/apache/pulsar-client-go/pull/1107 +* @jiangpengcheng made their first contribution in https://github.com/apache/pulsar-client-go/pull/1151 +* @dependabot made their first contribution in https://github.com/apache/pulsar-client-go/pull/1150 + [0.11.1] 2023-09-11 - Close consumer resources if the creation fails by @michaeljmarshall in [#1070](https://github.com/apache/pulsar-client-go/pull/1070) diff --git a/Makefile b/Makefile index 4eb590b06b..d044237221 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ # IMAGE_NAME = pulsar-client-go-test:latest -PULSAR_VERSION ?= 2.10.3 +PULSAR_VERSION ?= 3.2.0 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION) GO_VERSION ?= 1.18 GOLANG_IMAGE = golang:$(GO_VERSION) diff --git a/VERSION b/VERSION index dbf0637bab..725659e7c0 100644 --- a/VERSION +++ b/VERSION @@ -1,3 +1,3 @@ // This version number refers to the currently released version number // Please fix the version when release. -v0.11.1 +v0.12.0 diff --git a/go.mod b/go.mod index 52bfe43a96..3257d51047 100644 --- a/go.mod +++ b/go.mod @@ -24,13 +24,16 @@ require ( github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.0 go.uber.org/atomic v1.7.0 - golang.org/x/mod v0.5.1 + golang.org/x/mod v0.8.0 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 google.golang.org/protobuf v1.30.0 ) -require github.com/golang/protobuf v1.5.2 +require ( + github.com/golang/protobuf v1.5.2 + github.com/hashicorp/go-multierror v1.1.1 +) require ( github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect @@ -38,13 +41,12 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/danieljoos/wincred v1.1.2 // indirect - github.com/dvsekhvalnov/jose2go v1.5.0 // indirect + github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/golang/snappy v0.0.1 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.0.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect @@ -56,10 +58,10 @@ require ( github.com/prometheus/procfs v0.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.4.0 // indirect - golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect - golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect - golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index e8a9e76be7..50a1ba3ebb 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= -github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= -github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= +github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= +github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -323,6 +323,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -357,6 +359,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -418,9 +422,13 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a h1:ppl5mZgokTT8uPkmYOyEUmPTr3ypaKkg5eFOGrAmxxE= golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -429,6 +437,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 7daf6f62ab..65aed3b963 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -40,14 +40,15 @@ const ( ) type client struct { - cnxPool internal.ConnectionPool - rpcClient internal.RPCClient - handlers internal.ClientHandlers - lookupService internal.LookupService - metrics *internal.Metrics - tcClient *transactionCoordinatorClient - memLimit internal.MemoryLimitController - closeOnce sync.Once + cnxPool internal.ConnectionPool + rpcClient internal.RPCClient + handlers internal.ClientHandlers + lookupService internal.LookupService + metrics *internal.Metrics + tcClient *transactionCoordinatorClient + memLimit internal.MemoryLimitController + closeOnce sync.Once + operationTimeout time.Duration log log.Logger } @@ -161,9 +162,10 @@ func newClient(options ClientOptions) (Client, error) { c := &client{ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval, maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime), - log: logger, - metrics: metrics, - memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + log: logger, + metrics: metrics, + memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + operationTimeout: operationTimeout, } serviceNameResolver := internal.NewPulsarServiceNameResolver(url) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 667bff66cd..fea94cf6a3 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -246,6 +246,13 @@ type ConsumerOptions struct { // SubscriptionMode specifies the subscription mode to be used when subscribing to a topic. // Default is `Durable` SubscriptionMode SubscriptionMode + + // StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included. + // Default is `false` and the consumer will start from the "next" message + StartMessageIDInclusive bool + + // startMessageID specifies the message id to start from. Currently, it's only used for the reader internally. + startMessageID *trackingMessageID } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index d701ab16d6..0c31a1aafc 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } - dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, client.log) + dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log) if err != nil { return nil, err } @@ -384,7 +384,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { metadata: metadata, subProperties: subProperties, replicateSubscriptionState: c.options.ReplicateSubscriptionState, - startMessageID: nil, + startMessageID: c.options.startMessageID, + startMessageIDInclusive: c.options.StartMessageIDInclusive, subscriptionMode: c.options.SubscriptionMode, readCompacted: c.options.ReadCompacted, interceptors: c.options.Interceptors, @@ -707,6 +708,50 @@ func (c *consumer) checkMsgIDPartition(msgID MessageID) error { return nil } +func (c *consumer) hasNext() bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Make sure all paths cancel the context to avoid context leak + + var wg sync.WaitGroup + wg.Add(len(c.consumers)) + + hasNext := make(chan bool) + for _, pc := range c.consumers { + pc := pc + go func() { + defer wg.Done() + if pc.hasNext() { + select { + case hasNext <- true: + case <-ctx.Done(): + } + } + }() + } + + go func() { + wg.Wait() + close(hasNext) // Close the channel after all goroutines have finished + }() + + // Wait for either a 'true' result or for all goroutines to finish + for hn := range hasNext { + if hn { + return true + } + } + + return false +} + +func (c *consumer) setLastDequeuedMsg(msgID MessageID) error { + if err := c.checkMsgIDPartition(msgID); err != nil { + return err + } + c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = toTrackingMessageID(msgID) + return nil +} + var r = &random{ R: rand.New(rand.NewSource(time.Now().UnixNano())), } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fd6441c1cb..162565b2a9 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -174,6 +174,8 @@ type partitionConsumer struct { chunkedMsgCtxMap *chunkedMsgCtxMap unAckChunksTracker *unAckChunksTracker ackGroupingTracker ackGroupingTracker + + lastMessageInBroker *trackingMessageID } func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) { @@ -568,15 +570,41 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { - pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") - return nil, errors.New("failed to redeliver closing or closed consumer") + pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer") + return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } - req := &getLastMsgIDRequest{doneCh: make(chan struct{})} - pc.eventsCh <- req + remainTime := pc.client.operationTimeout + var backoff internal.BackoffPolicy + if pc.options.backoffPolicy != nil { + backoff = pc.options.backoffPolicy + } else { + backoff = &internal.DefaultBackoff{} + } + request := func() (*trackingMessageID, error) { + req := &getLastMsgIDRequest{doneCh: make(chan struct{})} + pc.eventsCh <- req - // wait for the request to complete - <-req.doneCh - return req.msgID, req.err + // wait for the request to complete + <-req.doneCh + return req.msgID, req.err + } + for { + msgID, err := request() + if err == nil { + return msgID, nil + } + if remainTime <= 0 { + pc.log.WithError(err).Error("Failed to getLastMessageID") + return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) + } + nextDelay := backoff.Next() + if nextDelay > remainTime { + nextDelay = remainTime + } + remainTime -= nextDelay + pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay) + time.Sleep(nextDelay) + } } func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) { @@ -1091,7 +1119,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.metrics.MessagesReceived.Add(float64(numMsgs)) pc.metrics.PrefetchedMessages.Add(float64(numMsgs)) - var bytesReceived int + var ( + bytesReceived int + skippedMessages int32 + ) for i := 0; i < numMsgs; i++ { smm, payload, err := reader.ReadMessage() if err != nil || payload == nil { @@ -1100,6 +1131,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if ackSet != nil && !ackSet.Test(uint(i)) { pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i) + skippedMessages++ continue } @@ -1118,6 +1150,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header if pc.messageShouldBeDiscarded(trackingMsgID) { pc.AckID(trackingMsgID) + skippedMessages++ continue } @@ -1142,6 +1175,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if pc.ackGroupingTracker.isDuplicate(msgID) { + skippedMessages++ continue } @@ -1216,6 +1250,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.markScaleIfNeed() } + if skippedMessages > 0 { + pc.availablePermits.add(skippedMessages) + } + // send messages to the dispatcher pc.queueCh <- messages return nil @@ -1970,6 +2008,35 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, pc.availablePermits.inc() } +func (pc *partitionConsumer) hasNext() bool { + if pc.lastMessageInBroker != nil && pc.hasMoreMessages() { + return true + } + + lastMsgID, err := pc.getLastMessageID() + if err != nil { + return false + } + pc.lastMessageInBroker = lastMsgID + + return pc.hasMoreMessages() +} + +func (pc *partitionConsumer) hasMoreMessages() bool { + if pc.lastDequeuedMsg != nil { + return pc.lastMessageInBroker.isEntryIDValid() && pc.lastMessageInBroker.greater(pc.lastDequeuedMsg.messageID) + } + + if pc.options.startMessageIDInclusive { + return pc.lastMessageInBroker.isEntryIDValid() && + pc.lastMessageInBroker.greaterEqual(pc.startMessageID.get().messageID) + } + + // Non-inclusive + return pc.lastMessageInBroker.isEntryIDValid() && + pc.lastMessageInBroker.greater(pc.startMessageID.get().messageID) +} + // _setConn sets the internal connection field of this partition consumer atomically. // Note: should only be called by this partition consumer when a new connection is available. func (pc *partitionConsumer) _setConn(conn internal.Connection) { diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 3e5f1d61db..ebd7e4e196 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -152,9 +152,10 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string opts := ConsumerOptions{ SubscriptionName: "regex-sub", AutoDiscoveryPeriod: 5 * time.Minute, + Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { @@ -190,9 +191,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string opts := ConsumerOptions{ SubscriptionName: "regex-sub", AutoDiscoveryPeriod: 5 * time.Minute, + Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 8b983d0d19..4a3b532d05 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { } wg.Wait() - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // send another batch @@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { }, nil) } - producer.Flush() + producer.FlushWithCtx(context.Background()) for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) @@ -1449,13 +1449,15 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { if prodOpt != nil { dlqPolicy.ProducerOptions = *prodOpt } - sub := "my-sub" + sub, consumerName := "my-sub", "my-consumer" + consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: sub, NackRedeliveryDelay: 1 * time.Second, Type: Shared, DLQ: &dlqPolicy, + Name: consumerName, }) assert.Nil(t, err) defer consumer.Close() @@ -1508,7 +1510,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { assert.Equal(t, []byte(expectMsg), msg.Payload()) // check dql produceName - assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ", topic, sub)) + assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ", topic, sub, consumerName)) // check original messageId assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID]) @@ -2217,6 +2219,12 @@ func TestConsumerAddTopicPartitions(t *testing.T) { assert.Nil(t, err) defer producer.Close() + // Increase number of partitions to 10 + makeHTTPCall(t, http.MethodPost, testURL, "10") + + // Wait for the producer/consumers to pick up the change + time.Sleep(1 * time.Second) + consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub", @@ -2225,12 +2233,6 @@ func TestConsumerAddTopicPartitions(t *testing.T) { assert.Nil(t, err) defer consumer.Close() - // Increase number of partitions to 10 - makeHTTPCall(t, http.MethodPost, testURL, "10") - - // Wait for the producer/consumers to pick up the change - time.Sleep(1 * time.Second) - // Publish messages ensuring that they will go to all the partitions ctx := context.Background() for i := 0; i < 10; i++ { @@ -3930,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize()) }) } - assert.Nil(t, producer.Flush()) + assert.Nil(t, producer.FlushWithCtx(context.Background())) msgIds := make([]MessageID, BatchingMaxSize) for i := 0; i < BatchingMaxSize; i++ { diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 5b9314bddc..6be35d7485 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -34,16 +34,18 @@ type dlqRouter struct { closeCh chan interface{} topicName string subscriptionName string + consumerName string log log.Logger } -func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, +func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName, consumerName string, logger log.Logger) (*dlqRouter, error) { r := &dlqRouter{ client: client, policy: policy, topicName: topicName, subscriptionName: subscriptionName, + consumerName: consumerName, log: logger, } @@ -159,7 +161,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { opt.Topic = r.policy.DeadLetterTopic opt.Schema = schema if opt.Name == "" { - opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName, r.subscriptionName) + opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName) } // the origin code sets to LZ4 compression with no options diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index 25429e2512..dde54ae29e 100644 --- a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -25,19 +25,23 @@ package compression import ( + "sync" + "github.com/DataDog/zstd" log "github.com/sirupsen/logrus" ) type zstdCGoProvider struct { - ctx zstd.Ctx + ctxPool sync.Pool level Level zstdLevel int } func newCGoZStdProvider(level Level) Provider { z := &zstdCGoProvider{ - ctx: zstd.NewCtx(), + ctxPool: sync.Pool{New: func() any { + return zstd.NewCtx() + }}, } switch level { @@ -61,7 +65,9 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int { } func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { - out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel) + ctx := z.ctxPool.Get().(zstd.Ctx) + defer z.ctxPool.Put(ctx) + out, err := ctx.CompressLevel(dst, src, z.zstdLevel) if err != nil { log.WithError(err).Fatal("Failed to compress") } @@ -70,7 +76,9 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { } func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { - return z.ctx.Decompress(dst, src) + ctx := z.ctxPool.Get().(zstd.Ctx) + defer z.ctxPool.Put(ctx) + return ctx.Decompress(dst, src) } func (z *zstdCGoProvider) Close() error { diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go index 8d8e6965b8..1c2c712fcf 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error { return nil } +func (p *mockProducer) FlushWithCtx(ctx context.Context) error { + return nil +} + func (p *mockProducer) Close() {} diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go index 9378d9dcb6..2dc8210147 100644 --- a/pulsar/internal/utils.go +++ b/pulsar/internal/utils.go @@ -40,7 +40,7 @@ func TimestampMillis(t time.Time) uint64 { // GetAndAdd perform atomic read and update func GetAndAdd(n *uint64, diff uint64) uint64 { for { - v := *n + v := atomic.LoadUint64(n) if atomic.CompareAndSwapUint64(n, v, v+diff) { return v } diff --git a/pulsar/producer.go b/pulsar/producer.go index 70d152c78b..f8013a16ff 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -237,10 +237,13 @@ type Producer interface { // return the last sequence id published by this producer. LastSequenceID() int64 - // Flush all the messages buffered in the client and wait until all messages have been successfully - // persisted. + // Deprecated: Use `FlushWithCtx()` instead. Flush() error + // Flush all the messages buffered in the client and wait until all messageshave been successfully + // persisted. + FlushWithCtx(ctx context.Context) error + // Close the producer and releases resources allocated // No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case // of errors, pending writes will not be retried. diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 3c45b597d0..ca923108fe 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 { } func (p *producer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *producer) FlushWithCtx(ctx context.Context) error { p.RLock() defer p.RUnlock() for _, pp := range p.producers { - if err := pp.Flush(); err != nil { + if err := pp.FlushWithCtx(ctx); err != nil { return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 6cb1885305..268da5f94c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1426,15 +1426,27 @@ func (p *partitionProducer) LastSequenceID() int64 { } func (p *partitionProducer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *partitionProducer) FlushWithCtx(ctx context.Context) error { flushReq := &flushRequest{ doneCh: make(chan struct{}), err: nil, } - p.cmdChan <- flushReq + select { + case <-ctx.Done(): + return ctx.Err() + case p.cmdChan <- flushReq: + } // wait for the flush request to complete - <-flushReq.doneCh - return flushReq.err + select { + case <-ctx.Done(): + return ctx.Err() + case <-flushReq.doneCh: + return flushReq.err + } } func (p *partitionProducer) getProducerState() producerState { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 9bea99ea59..9eea9abec2 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -159,7 +159,7 @@ func TestProducerAsyncSend(t *testing.T) { assert.NoError(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -220,7 +220,7 @@ func TestProducerFlushDisableBatching(t *testing.T) { assert.NoError(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -387,7 +387,7 @@ func TestFlushInProducer(t *testing.T) { }) assert.Nil(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -429,7 +429,7 @@ func TestFlushInProducer(t *testing.T) { assert.Nil(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -500,7 +500,7 @@ func TestFlushInPartitionedProducer(t *testing.T) { } // After flush, should be able to consume. - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -1717,7 +1717,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) { } } - producer.Flush() + producer.FlushWithCtx(context.Background()) //// create consumer consumer, err := client.Subscribe(ConsumerOptions{ @@ -1808,7 +1808,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) { assert.NotNil(t, id) }) } - producer.Flush() + producer.FlushWithCtx(context.Background()) //// create consumer consumer, err := client.Subscribe(ConsumerOptions{ @@ -2027,9 +2027,9 @@ func TestMemLimitRejectProducerMessages(t *testing.T) { assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) // flush pending msg - err = producer1.Flush() + err = producer1.FlushWithCtx(context.Background()) assert.NoError(t, err) - err = producer2.Flush() + err = producer2.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) @@ -2118,9 +2118,9 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) { assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) // flush pending msg - err = producer1.Flush() + err = producer1.FlushWithCtx(context.Background()) assert.NoError(t, err) - err = producer2.Flush() + err = producer2.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) @@ -2244,7 +2244,7 @@ func TestMemLimitContextCancel(t *testing.T) { cancel() wg.Wait() - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) @@ -2357,6 +2357,34 @@ func TestFailPendingMessageWithClose(t *testing.T) { assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size()) } +func TestSendConcurrently(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + testProducer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + CompressionType: ZSTD, + CompressionLevel: Better, + DisableBatching: true, + }) + assert.NoError(t, err) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + _, err := testProducer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 100), + }) + assert.NoError(t, err) + wg.Done() + }() + } + wg.Wait() +} + func TestProducerSendDuplicatedMessages(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/pulsar/reader.go b/pulsar/reader.go index 5e1a73b988..4daa889062 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -113,6 +113,7 @@ type Reader interface { Next(context.Context) (Message, error) // HasNext checks if there is any message available to read from the current position + // If there is any errors, it will return false HasNext() bool // Close the reader and stop the broker to push more messages @@ -136,5 +137,6 @@ type Reader interface { SeekByTime(time time.Time) error // GetLastMessageID get the last message id available for consume. + // It only works for single topic reader. It will return an error when the reader is the multi-topic reader. GetLastMessageID() (MessageID, error) } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 0999e88fee..bf91c67fa5 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -34,12 +34,11 @@ const ( type reader struct { sync.Mutex - client *client - pc *partitionConsumer - messageCh chan ConsumerMessage - lastMessageInBroker *trackingMessageID - log log.Logger - metrics *internal.LeveledMetrics + client *client + messageCh chan ConsumerMessage + log log.Logger + metrics *internal.LeveledMetrics + c *consumer } func newReader(client *client, options ReaderOptions) (Reader, error) { @@ -98,25 +97,25 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { options.ExpireTimeOfIncompleteChunk = time.Minute } - consumerOptions := &partitionConsumerOpts{ - topic: options.Topic, - consumerName: options.Name, - subscription: subscriptionName, - subscriptionType: Exclusive, - receiverQueueSize: receiverQueueSize, + consumerOptions := &ConsumerOptions{ + Topic: options.Topic, + Name: options.Name, + SubscriptionName: subscriptionName, + Type: Exclusive, + ReceiverQueueSize: receiverQueueSize, + SubscriptionMode: NonDurable, + ReadCompacted: options.ReadCompacted, + Properties: options.Properties, + NackRedeliveryDelay: defaultNackRedeliveryDelay, + ReplicateSubscriptionState: false, + Decryption: options.Decryption, + Schema: options.Schema, + BackoffPolicy: options.BackoffPolicy, + MaxPendingChunkedMessage: options.MaxPendingChunkedMessage, + ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, + AutoAckIncompleteChunk: options.AutoAckIncompleteChunk, startMessageID: startMessageID, - startMessageIDInclusive: options.StartMessageIDInclusive, - subscriptionMode: NonDurable, - readCompacted: options.ReadCompacted, - metadata: options.Properties, - nackRedeliveryDelay: defaultNackRedeliveryDelay, - replicateSubscriptionState: false, - decryption: options.Decryption, - schema: options.Schema, - backoffPolicy: options.BackoffPolicy, - maxPendingChunkedMessage: options.MaxPendingChunkedMessage, - expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, - autoAckIncompleteChunk: options.AutoAckIncompleteChunk, + StartMessageIDInclusive: options.StartMessageIDInclusive, } reader := &reader{ @@ -127,24 +126,29 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } // Provide dummy dlq router with not dlq policy - dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, client.log) + dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log) + if err != nil { + return nil, err + } + // Provide dummy rlq router with not dlq policy + rlq, err := newRetryRouter(client, nil, false, client.log) if err != nil { return nil, err } - pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq, reader.metrics) + c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false) if err != nil { close(reader.messageCh) return nil, err } + reader.c = c - reader.pc = pc reader.metrics.ReadersOpened.Inc() return reader, nil } func (r *reader) Topic() string { - return r.pc.topic + return r.c.topic } func (r *reader) Next(ctx context.Context) (Message, error) { @@ -158,9 +162,14 @@ func (r *reader) Next(ctx context.Context) (Message, error) { // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, // it will specify the subscription position anyway msgID := cm.Message.ID() - mid := toTrackingMessageID(msgID) - r.pc.lastDequeuedMsg = mid - r.pc.AckID(mid) + err := r.c.setLastDequeuedMsg(msgID) + if err != nil { + return nil, err + } + err = r.c.AckID(msgID) + if err != nil { + return nil, err + } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() @@ -169,41 +178,11 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } func (r *reader) HasNext() bool { - if r.lastMessageInBroker != nil && r.hasMoreMessages() { - return true - } - - for { - lastMsgID, err := r.pc.getLastMessageID() - if err != nil { - r.log.WithError(err).Error("Failed to get last message id from broker") - continue - } else { - r.lastMessageInBroker = lastMsgID - break - } - } - - return r.hasMoreMessages() -} - -func (r *reader) hasMoreMessages() bool { - if r.pc.lastDequeuedMsg != nil { - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID) - } - - if r.pc.options.startMessageIDInclusive { - return r.lastMessageInBroker.isEntryIDValid() && - r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.get().messageID) - } - - // Non-inclusive - return r.lastMessageInBroker.isEntryIDValid() && - r.lastMessageInBroker.greater(r.pc.startMessageID.get().messageID) + return r.c.hasNext() } func (r *reader) Close() { - r.pc.Close() + r.c.Close() r.client.handlers.Del(r) r.metrics.ReadersClosed.Inc() } @@ -235,16 +214,19 @@ func (r *reader) Seek(msgID MessageID) error { return nil } - return r.pc.Seek(mid) + return r.c.Seek(mid) } func (r *reader) SeekByTime(time time.Time) error { r.Lock() defer r.Unlock() - return r.pc.SeekByTime(time) + return r.c.SeekByTime(time) } func (r *reader) GetLastMessageID() (MessageID, error) { - return r.pc.getLastMessageID() + if len(r.c.consumers) > 1 { + return nil, fmt.Errorf("GetLastMessageID is not supported for multi-topics reader") + } + return r.c.consumers[0].getLastMessageID() } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ec10f8f162..78c222dac7 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -24,7 +24,11 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/apache/pulsar-client-go/pulsaradmin" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/google/uuid" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -90,10 +94,10 @@ func TestReaderConfigChunk(t *testing.T) { defer r1.Close() // verify specified chunk options - pcOpts := r1.(*reader).pc.options - assert.Equal(t, 50, pcOpts.maxPendingChunkedMessage) - assert.Equal(t, 30*time.Second, pcOpts.expireTimeOfIncompleteChunk) - assert.True(t, pcOpts.autoAckIncompleteChunk) + pcOpts := r1.(*reader).c.options + assert.Equal(t, 50, pcOpts.MaxPendingChunkedMessage) + assert.Equal(t, 30*time.Second, pcOpts.ExpireTimeOfIncompleteChunk) + assert.True(t, pcOpts.AutoAckIncompleteChunk) r2, err := client.CreateReader(ReaderOptions{ Topic: "my-topic2", @@ -103,10 +107,10 @@ func TestReaderConfigChunk(t *testing.T) { defer r2.Close() // verify default chunk options - pcOpts = r2.(*reader).pc.options - assert.Equal(t, 100, pcOpts.maxPendingChunkedMessage) - assert.Equal(t, time.Minute, pcOpts.expireTimeOfIncompleteChunk) - assert.False(t, pcOpts.autoAckIncompleteChunk) + pcOpts = r2.(*reader).c.options + assert.Equal(t, 100, pcOpts.MaxPendingChunkedMessage) + assert.Equal(t, time.Minute, pcOpts.ExpireTimeOfIncompleteChunk) + assert.False(t, pcOpts.AutoAckIncompleteChunk) } func TestReader(t *testing.T) { @@ -153,6 +157,50 @@ func TestReader(t *testing.T) { } } +func TestReaderOnPartitionedTopic(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + assert.Nil(t, createPartitionedTopic(topic, 3)) + ctx := context.Background() + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + defer reader.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + expectMsg := fmt.Sprintf("hello-%d", i) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + } +} + func TestReaderConnectError(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar://invalid-hostname:6650", @@ -277,7 +325,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) { }) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // create reader on 5th message (not included) @@ -353,7 +401,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) { }) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // create reader on 5th message (not included) @@ -422,7 +470,6 @@ func TestReaderHasNext(t *testing.T) { assert.NotNil(t, msgID) } - // create reader on 5th message (not included) reader, err := client.CreateReader(ReaderOptions{ Topic: topic, StartMessageID: EarliestMessageID(), @@ -592,7 +639,7 @@ func TestReaderSeek(t *testing.T) { seekID = id } } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) for i := 0; i < N; i++ { @@ -880,7 +927,7 @@ func TestReaderWithBackoffPolicy(t *testing.T) { assert.NotNil(t, _reader) assert.Nil(t, err) - partitionConsumerImp := _reader.(*reader).pc + partitionConsumerImp := _reader.(*reader).c.consumers[0] // 1 s startTime := time.Now() partitionConsumerImp.reconnectToBroker() @@ -943,3 +990,100 @@ func TestReaderGetLastMessageID(t *testing.T) { assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID()) assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) } + +func TestReaderGetLastMessageIDOnMultiTopics(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + assert.Nil(t, createPartitionedTopic(topic, 3)) + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + _, err = reader.GetLastMessageID() + assert.NotNil(t, err) +} + +func createPartitionedTopic(topic string, n int) error { + admin, err := pulsaradmin.NewClient(&config.Config{}) + if err != nil { + return err + } + + topicName, err := utils.GetTopicName(topic) + if err != nil { + return err + } + err = admin.Topics().Create(*topicName, n) + if err != nil { + return err + } + return nil +} + +func TestReaderHasNextFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + r.(*reader).c.consumers[0].state.Store(consumerClosing) + assert.False(t, r.HasNext()) +} + +func TestReaderHasNextRetryFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + OperationTimeout: 2 * time.Second, + }) + assert.Nil(t, err) + topic := newTopicName() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + + c := make(chan interface{}) + defer close(c) + + // Close the consumer events loop and assign a mock eventsCh + pc := r.(*reader).c.consumers[0] + pc.Close() + pc.state.Store(consumerReady) + pc.eventsCh = c + + go func() { + for e := range c { + req, ok := e.(*getLastMsgIDRequest) + assert.True(t, ok, "unexpected event type") + req.err = errors.New("expected error") + close(req.doneCh) + } + }() + minTimer := time.NewTimer(1 * time.Second) // Timer to check if r.HasNext() blocked for at least 1s + maxTimer := time.NewTimer(3 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 3s + done := make(chan bool) + go func() { + assert.False(t, r.HasNext()) + done <- true + }() + + select { + case <-maxTimer.C: + t.Fatal("r.HasNext() blocked for more than 3s") + case <-done: + assert.False(t, minTimer.Stop(), "r.HasNext() did not block for at least 1s") + assert.True(t, maxTimer.Stop()) + } + +} diff --git a/pulsar/schema.go b/pulsar/schema.go index fd9d412dc6..3427fb263a 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -93,6 +93,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]s var schemaDef = string(schemaData) var s Schema switch schemaType { + case BYTES: + s = NewBytesSchema(properties) case STRING: s = NewStringSchema(properties) case JSON: diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go index c2008f6de9..34216c4779 100644 --- a/pulsar/schema_test.go +++ b/pulsar/schema_test.go @@ -19,11 +19,14 @@ package pulsar import ( "context" + "fmt" "log" "testing" + "time" pb "github.com/apache/pulsar-client-go/integration-tests/pb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type testJSON struct { @@ -55,6 +58,67 @@ func createClient() Client { return client } +func TestBytesSchema(t *testing.T) { + client := createClient() + defer client.Close() + + topic := newTopicName() + + properties := make(map[string]string) + properties["pulsar"] = "hello" + producerSchemaBytes := NewBytesSchema(properties) + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: producerSchemaBytes, + }) + assert.NoError(t, err) + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`{"key": "value"}`), + }) + require.NoError(t, err) + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`something else`), + }) + require.NoError(t, err) + producer.Close() + + // Create consumer + consumerSchemaBytes := NewBytesSchema(nil) + assert.NotNil(t, consumerSchemaBytes) + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub-1", + Schema: consumerSchemaBytes, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // Receive first message + var out1 []byte + msg1, err := consumer.Receive(ctx) + assert.NoError(t, err) + err = msg1.GetSchemaValue(&out1) + assert.NoError(t, err) + assert.Equal(t, []byte(`{"key": "value"}`), out1) + consumer.Ack(msg1) + require.NoError(t, err) + + // Receive second message + var out2 []byte + msg2, err := consumer.Receive(ctx) + fmt.Println(string(msg2.Payload())) + assert.NoError(t, err) + err = msg2.GetSchemaValue(&out2) + assert.NoError(t, err) + assert.Equal(t, []byte(`something else`), out2) + + defer consumer.Close() +} + func TestJsonSchema(t *testing.T) { client := createClient() defer client.Close() diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go index 45b9441169..2368e3d846 100644 --- a/pulsar/table_view_test.go +++ b/pulsar/table_view_test.go @@ -90,6 +90,13 @@ func TestTableViewSchemas(t *testing.T) { expValueOut interface{} valueCheck func(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks }{ + { + name: "BytesSchema", + schema: NewBytesSchema(nil), + schemaType: []byte(`any`), + producerValue: []byte(`hello pulsar`), + expValueOut: []byte(`hello pulsar`), + }, { name: "StringSchema", schema: NewStringSchema(nil), diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index 732441e8c2..782ae3ae25 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -75,6 +75,9 @@ type Namespaces interface { // RemoveBacklogQuota removes a backlog quota policy from a namespace RemoveBacklogQuota(namespace string) error + // GetTopicAutoCreation returns the topic auto-creation config for a namespace + GetTopicAutoCreation(namespace utils.NameSpaceName) (*utils.TopicAutoCreationConfig, error) + // SetTopicAutoCreation sets topic auto-creation config for a namespace, overriding broker settings SetTopicAutoCreation(namespace utils.NameSpaceName, config utils.TopicAutoCreationConfig) error @@ -445,6 +448,13 @@ func (n *namespaces) RemoveBacklogQuota(namespace string) error { return n.pulsar.Client.DeleteWithQueryParams(endpoint, params) } +func (n *namespaces) GetTopicAutoCreation(namespace utils.NameSpaceName) (*utils.TopicAutoCreationConfig, error) { + var topicAutoCreation utils.TopicAutoCreationConfig + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation") + err := n.pulsar.Client.Get(endpoint, &topicAutoCreation) + return &topicAutoCreation, err +} + func (n *namespaces) SetTopicAutoCreation(namespace utils.NameSpaceName, config utils.TopicAutoCreationConfig) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation") return n.pulsar.Client.Post(endpoint, &config) diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go new file mode 100644 index 0000000000..f934a96865 --- /dev/null +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func ptr(n int) *int { + return &n +} + +func TestSetTopicAutoCreation(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + tests := []struct { + name string + namespace string + config utils.TopicAutoCreationConfig + errReason string + }{ + { + name: "Set partitioned type topic auto creation", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.Partitioned, + Partitions: ptr(3), + }, + errReason: "", + }, + { + name: "Set partitioned type topic auto creation without partitions", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.Partitioned, + }, + errReason: "Invalid configuration for autoTopicCreationOverride. the detail is [defaultNumPartitions] " + + "cannot be null when the type is partitioned.", + }, + { + name: "Set partitioned type topic auto creation with partitions < 1", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.Partitioned, + Partitions: ptr(-1), + }, + errReason: "Invalid configuration for autoTopicCreationOverride. the detail is [defaultNumPartitions] " + + "cannot be less than 1 for partition type.", + }, + { + name: "Set non-partitioned type topic auto creation", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + }, + errReason: "", + }, + { + name: "Set non-partitioned type topic auto creation with partitions", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + Partitions: ptr(3), + }, + errReason: "Invalid configuration for autoTopicCreationOverride. the detail is [defaultNumPartitions] is " + + "not allowed to be set when the type is non-partition.", + }, + { + name: "Disable topic auto creation", + namespace: "public/default", + config: utils.TopicAutoCreationConfig{ + Allow: false, + }, + errReason: "", + }, + { + name: "Set topic auto creation on a non-exist namespace", + namespace: "public/nonexist", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + }, + errReason: "Namespace does not exist", + }, + { + name: "Set topic auto creation on a non-exist tenant", + namespace: "non-exist/default", + config: utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + }, + errReason: "Tenant does not exist", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + namespace, _ := utils.GetNamespaceName(tt.namespace) + err := admin.Namespaces().SetTopicAutoCreation(*namespace, tt.config) + if tt.errReason == "" { + assert.Equal(t, nil, err) + + err = admin.Namespaces().RemoveTopicAutoCreation(*namespace) + assert.Equal(t, nil, err) + } + if err != nil { + restError := err.(rest.Error) + assert.Equal(t, tt.errReason, restError.Reason) + } + }) + } +} + +func TestGetTopicAutoCreation(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + // set the topic auto creation config and get it + err = admin.Namespaces().SetTopicAutoCreation(*namespace, utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + }) + assert.Equal(t, nil, err) + topicAutoCreation, err := admin.Namespaces().GetTopicAutoCreation(*namespace) + assert.Equal(t, nil, err) + expected := utils.TopicAutoCreationConfig{ + Allow: true, + Type: utils.NonPartitioned, + } + assert.Equal(t, expected, *topicAutoCreation) + + // remove the topic auto creation config and get it + err = admin.Namespaces().RemoveTopicAutoCreation(*namespace) + assert.Equal(t, nil, err) + + topicAutoCreation, err = admin.Namespaces().GetTopicAutoCreation(*namespace) + assert.Equal(t, nil, err) + expected = utils.TopicAutoCreationConfig{ + Allow: false, + Type: "", + } + assert.Equal(t, expected, *topicAutoCreation) +} diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go index c888827bf8..e6057413d2 100644 --- a/pulsaradmin/pkg/admin/topic.go +++ b/pulsaradmin/pkg/admin/topic.go @@ -75,6 +75,9 @@ type Topics interface { // All the rates are computed over a 1 minute window and are relative the last completed 1 minute period GetStats(utils.TopicName) (utils.TopicStats, error) + // GetStatsWithOption returns the stats for the topic + GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) (utils.TopicStats, error) + // GetInternalStats returns the internal stats for the topic. GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error) @@ -82,6 +85,9 @@ type Topics interface { // All the rates are computed over a 1 minute window and are relative the last completed 1 minute period GetPartitionedStats(utils.TopicName, bool) (utils.PartitionedTopicStats, error) + // GetPartitionedStatsWithOption returns the stats for the partitioned topic + GetPartitionedStatsWithOption(utils.TopicName, bool, utils.GetStatsOptions) (utils.PartitionedTopicStats, error) + // Terminate the topic and prevent any more messages being published on it Terminate(utils.TopicName) (utils.MessageID, error) @@ -395,6 +401,19 @@ func (t *topics) GetStats(topic utils.TopicName) (utils.TopicStats, error) { err := t.pulsar.Client.Get(endpoint, &stats) return stats, err } +func (t *topics) GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error) { + var stats utils.TopicStats + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats") + params := map[string]string{ + "getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog), + "subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize), + "getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog), + "excludePublishers": strconv.FormatBool(option.ExcludePublishers), + "excludeConsumers": strconv.FormatBool(option.ExcludeConsumers), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true) + return stats, err +} func (t *topics) GetInternalStats(topic utils.TopicName) (utils.PersistentTopicInternalStats, error) { var stats utils.PersistentTopicInternalStats @@ -412,6 +431,21 @@ func (t *topics) GetPartitionedStats(topic utils.TopicName, perPartition bool) ( _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true) return stats, err } +func (t *topics) GetPartitionedStatsWithOption(topic utils.TopicName, perPartition bool, + option utils.GetStatsOptions) (utils.PartitionedTopicStats, error) { + var stats utils.PartitionedTopicStats + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitioned-stats") + params := map[string]string{ + "perPartition": strconv.FormatBool(perPartition), + "getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog), + "subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize), + "getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog), + "excludePublishers": strconv.FormatBool(option.ExcludePublishers), + "excludeConsumers": strconv.FormatBool(option.ExcludeConsumers), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true) + return stats, err +} func (t *topics) Terminate(topic utils.TopicName) (utils.MessageID, error) { endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "terminate") diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go new file mode 100644 index 0000000000..a9b1f00283 --- /dev/null +++ b/pulsaradmin/pkg/admin/topic_test.go @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "context" + "fmt" + "log" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/stretchr/testify/assert" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" +) + +var ( + lookupURL = "pulsar://localhost:6650" +) + +func TestCreateTopic(t *testing.T) { + checkError := func(err error) { + if err != nil { + t.Error(err) + } + } + + cfg := &config.Config{} + admin, err := New(cfg) + checkError(err) + + topic := "persistent://public/default/testCreateTopic" + + topicName, err := utils.GetTopicName(topic) + checkError(err) + + err = admin.Topics().Create(*topicName, 0) + checkError(err) + + topicLists, err := admin.Namespaces().GetTopics("public/default") + checkError(err) + + for _, t := range topicLists { + if t == topic { + return + } + } + t.Error("Couldn't find topic: " + topic) +} + +func TestPartitionState(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + // Create partition topic + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 4) + assert.NoError(t, err) + + // Send message + ctx := context.Background() + + // create consumer + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: pulsar.Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }); err != nil { + log.Fatal(err) + } + } + + stats, err := admin.Topics().GetPartitionedStatsWithOption(*topicName, true, utils.GetStatsOptions{ + GetPreciseBacklog: false, + SubscriptionBacklogSize: false, + GetEarliestTimeInBacklog: false, + ExcludePublishers: true, + ExcludeConsumers: true, + }) + assert.Nil(t, err) + assert.Equal(t, len(stats.Publishers), 0) + + for _, topicStats := range stats.Partitions { + assert.Equal(t, len(topicStats.Publishers), 0) + for _, subscriptionStats := range topicStats.Subscriptions { + assert.Equal(t, len(subscriptionStats.Consumers), 0) + } + } + + for _, subscriptionStats := range stats.Subscriptions { + assert.Equal(t, len(subscriptionStats.Consumers), 0) + } + +} +func TestNonPartitionState(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + // Create non-partition topic + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 0) + assert.NoError(t, err) + + // Send message + ctx := context.Background() + + // create consumer + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: pulsar.Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }); err != nil { + log.Fatal(err) + } + } + + stats, err := admin.Topics().GetStatsWithOption(*topicName, utils.GetStatsOptions{ + GetPreciseBacklog: false, + SubscriptionBacklogSize: false, + GetEarliestTimeInBacklog: false, + ExcludePublishers: true, + ExcludeConsumers: true, + }) + assert.Nil(t, err) + assert.Equal(t, len(stats.Publishers), 0) + for _, subscriptionStats := range stats.Subscriptions { + assert.Equal(t, len(subscriptionStats.Consumers), 0) + } + +} + +func newTopicName() string { + return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond()) +} diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go index 55888aab2b..cc797d1892 100644 --- a/pulsaradmin/pkg/utils/data.go +++ b/pulsaradmin/pkg/utils/data.go @@ -465,3 +465,11 @@ type CompactedLedger struct { Offloaded bool `json:"offloaded"` UnderReplicated bool `json:"underReplicated"` } + +type GetStatsOptions struct { + GetPreciseBacklog bool `json:"get_precise_backlog"` + SubscriptionBacklogSize bool `json:"subscription_backlog_size"` + GetEarliestTimeInBacklog bool `json:"get_earliest_time_in_backlog"` + ExcludePublishers bool `json:"exclude_publishers"` + ExcludeConsumers bool `json:"exclude_consumers"` +} diff --git a/pulsaradmin/pkg/utils/function_state.go b/pulsaradmin/pkg/utils/function_state.go index 63fa15057c..c8a8c4b8cb 100644 --- a/pulsaradmin/pkg/utils/function_state.go +++ b/pulsaradmin/pkg/utils/function_state.go @@ -19,8 +19,8 @@ package utils type FunctionState struct { Key string `json:"key"` - StringValue string `json:"stringValue"` - ByteValue []byte `json:"byteValue"` - NumValue int64 `json:"numberValue"` - Version int64 `json:"version"` + StringValue string `json:"stringValue,omitempty"` + ByteValue []byte `json:"byteValue,omitempty"` + NumValue int64 `json:"numberValue,omitempty"` + Version int64 `json:"version,omitempty"` } diff --git a/pulsaradmin/pkg/utils/topic_auto_creation_config.go b/pulsaradmin/pkg/utils/topic_auto_creation_config.go index 6664655974..8444514250 100644 --- a/pulsaradmin/pkg/utils/topic_auto_creation_config.go +++ b/pulsaradmin/pkg/utils/topic_auto_creation_config.go @@ -20,5 +20,5 @@ package utils type TopicAutoCreationConfig struct { Allow bool `json:"allowAutoTopicCreation"` Type TopicType `json:"topicType"` - Partitions int `json:"defaultNumPartitions"` + Partitions *int `json:"defaultNumPartitions"` } diff --git a/stable.txt b/stable.txt index 7e38472ca7..8a5f2e8dfe 100644 --- a/stable.txt +++ b/stable.txt @@ -1,3 +1,3 @@ // This version number refers to the current stable version, generally is `VERSION - 1`. // Please fix the version when release. -v0.11.1 +v0.12.0 \ No newline at end of file