From 310fb94d29624e380b88968f3c2d030f12a1c4af Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 26 Oct 2024 09:34:35 +0800 Subject: [PATCH] Bump the minimum Go version to 1.22 (#1300) --- .github/workflows/ci.yml | 8 +- Dockerfile | 2 +- Makefile | 2 +- go.mod | 2 +- oauth2/auth_suite_test.go | 10 +- oauth2/authorization_tokenretriever_test.go | 170 +++++++++--------- oauth2/client_credentials_flow_test.go | 55 +++--- oauth2/clock/testing/fake_clock.go | 8 +- oauth2/config_tokenprovider_test.go | 28 +-- oauth2/device_code_flow_test.go | 70 ++++---- oauth2/oidc_endpoint_provider_test.go | 42 ++--- perf/perf-consumer.go | 2 +- perf/perf-producer.go | 4 +- perf/pulsar-perf-go.go | 2 +- pulsar/ack_grouping_tracker.go | 2 +- pulsar/ack_grouping_tracker_test.go | 4 +- pulsar/auth/athenz_test.go | 8 +- pulsar/auth/disabled.go | 4 +- pulsar/auth/oauth2_test.go | 6 +- pulsar/auth/token.go | 3 +- pulsar/client_impl_test.go | 6 +- pulsar/consumer_impl.go | 3 +- pulsar/consumer_multitopic.go | 4 +- pulsar/consumer_partition.go | 31 ++-- pulsar/consumer_test.go | 70 ++++---- pulsar/crypto/default_message_crypto.go | 2 +- pulsar/dlq_router.go | 7 +- pulsar/helper_for_test.go | 4 +- pulsar/internal/channel_cond_test.go | 4 +- pulsar/internal/compression/noop.go | 2 +- pulsar/internal/compression/zstd_cgo.go | 2 +- pulsar/internal/compression/zstd_go.go | 2 +- pulsar/internal/crypto/consumer_decryptor.go | 2 +- pulsar/internal/crypto/noop_decryptor.go | 2 +- pulsar/internal/crypto/noop_encryptor.go | 2 +- pulsar/internal/http_client.go | 4 +- pulsar/internal/lookup_service.go | 2 +- pulsar/internal/lookup_service_test.go | 36 ++-- .../pulsartracing/consumer_interceptor.go | 4 +- .../consumer_interceptor_test.go | 26 +-- .../pulsartracing/message_carrier_adaptors.go | 8 +- .../message_carrier_util_test.go | 2 +- .../pulsartracing/producer_interceptor.go | 6 +- .../producer_interceptor_test.go | 2 +- pulsar/log/log.go | 44 ++--- pulsar/negative_acks_tracker.go | 10 +- pulsar/negative_acks_tracker_test.go | 64 +++---- pulsar/producer_partition.go | 95 +++++----- pulsar/producer_test.go | 56 +++--- pulsar/reader_test.go | 4 +- pulsar/retry_router.go | 9 +- pulsar/table_view_test.go | 6 +- pulsar/transaction_impl.go | 4 +- pulsaradmin/pkg/admin/admin_test.go | 2 +- pulsaradmin/pkg/admin/auth/oauth2.go | 4 +- pulsaradmin/pkg/admin/auth/oauth2_test.go | 6 +- pulsaradmin/pkg/admin/namespace.go | 6 +- pulsaradmin/pkg/admin/subscription_test.go | 2 +- pulsaradmin/pkg/utils/data.go | 2 +- pulsaradmin/pkg/utils/schema_util.go | 6 +- 60 files changed, 488 insertions(+), 497 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c0b77c6097..52781048ea 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.21', '1.22' ] + go-version: [ '1.22', '1.23' ] steps: - uses: actions/checkout@v3 - uses: actions/setup-go@v5 @@ -36,19 +36,19 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v5 with: - go-version: '1.20' + go-version: '1.22' - name: Check license header run: docker run --rm -v $(pwd):/github/workspace ghcr.io/korandoru/hawkeye-native:v3 check - name: Run golangci-lint uses: golangci/golangci-lint-action@v6 with: - version: v1.51.2 + version: v1.61.0 integration-tests: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.21', '1.22' ] + go-version: [ '1.22', '1.23' ] steps: - uses: actions/checkout@v3 - name: clean docker cache diff --git a/Dockerfile b/Dockerfile index fecfa98f8a..8895c07619 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,7 +19,7 @@ # set via the Makefile or CLI ARG PULSAR_IMAGE=apachepulsar/pulsar:latest -ARG GO_VERSION=1.20 +ARG GO_VERSION=1.22 FROM golang:$GO_VERSION as golang FROM $PULSAR_IMAGE diff --git a/Makefile b/Makefile index 23844c41c9..5c38303eaf 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ IMAGE_NAME = pulsar-client-go-test:latest PULSAR_VERSION ?= 3.2.2 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION) -GO_VERSION ?= 1.21 +GO_VERSION ?= 1.22 CONTAINER_ARCH ?= $(shell uname -m | sed s/x86_64/amd64/) # Golang standard bin directory. diff --git a/go.mod b/go.mod index 1a7025fd1d..c2f3457ac0 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/apache/pulsar-client-go -go 1.21 +go 1.22 require ( github.com/99designs/keyring v1.2.1 diff --git a/oauth2/auth_suite_test.go b/oauth2/auth_suite_test.go index 95accff2fa..54b24299a9 100644 --- a/oauth2/auth_suite_test.go +++ b/oauth2/auth_suite_test.go @@ -21,13 +21,13 @@ import ( "context" "testing" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" ) func TestAuth(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "cloud-cli Auth Suite") + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "cloud-cli Auth Suite") } type MockTokenExchanger struct { @@ -52,7 +52,7 @@ func (te *MockTokenExchanger) ExchangeClientCredentials(req ClientCredentialsExc return te.ReturnsTokens, te.ReturnsError } -func (te *MockTokenExchanger) ExchangeDeviceCode(ctx context.Context, +func (te *MockTokenExchanger) ExchangeDeviceCode(_ context.Context, req DeviceCodeExchangeRequest) (*TokenResult, error) { te.CalledWithRequest = &req return te.ReturnsTokens, te.ReturnsError diff --git a/oauth2/authorization_tokenretriever_test.go b/oauth2/authorization_tokenretriever_test.go index 6ae55d8fbd..d57f9dc336 100644 --- a/oauth2/authorization_tokenretriever_test.go +++ b/oauth2/authorization_tokenretriever_test.go @@ -26,8 +26,8 @@ import ( "strings" "time" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" ) type MockTransport struct { @@ -37,7 +37,7 @@ type MockTransport struct { var _ HTTPAuthTransport = &MockTransport{} -func (t *MockTransport) Do(req *http.Request) (*http.Response, error) { +func (t *MockTransport) Do(_ *http.Request) (*http.Response, error) { if len(t.Responses) > 0 { r := t.Responses[0] t.Responses = t.Responses[1:] @@ -46,9 +46,9 @@ func (t *MockTransport) Do(req *http.Request) (*http.Response, error) { return nil, t.ReturnError } -var _ = Describe("CodetokenExchanger", func() { - Describe("newExchangeCodeRequest", func() { - It("creates the request", func() { +var _ = ginkgo.Describe("CodetokenExchanger", func() { + ginkgo.Describe("newExchangeCodeRequest", func() { + ginkgo.It("creates the request", func() { tokenRetriever := TokenRetriever{} exchangeRequest := AuthorizationCodeExchangeRequest{ TokenEndpoint: "https://issuer/oauth/token", @@ -62,32 +62,32 @@ var _ = Describe("CodetokenExchanger", func() { result.ParseForm() - Expect(err).To(BeNil()) - Expect(result.FormValue("grant_type")).To(Equal("authorization_code")) - Expect(result.FormValue("client_id")).To(Equal("clientID")) - Expect(result.FormValue("code_verifier")).To(Equal("Verifier")) - Expect(result.FormValue("code")).To(Equal("code")) - Expect(result.FormValue("redirect_uri")).To(Equal("https://redirect")) - Expect(result.URL.String()).To(Equal("https://issuer/oauth/token")) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("authorization_code")) + gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID")) + gomega.Expect(result.FormValue("code_verifier")).To(gomega.Equal("Verifier")) + gomega.Expect(result.FormValue("code")).To(gomega.Equal("code")) + gomega.Expect(result.FormValue("redirect_uri")).To(gomega.Equal("https://redirect")) + gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token")) - Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded")) - Expect(result.Header.Get("Content-Length")).To(Equal("117")) + gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded")) + gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("117")) }) - It("returns an error when NewRequest returns an error", func() { + ginkgo.It("returns an error when NewRequest returns an error", func() { tokenRetriever := TokenRetriever{} result, err := tokenRetriever.newExchangeCodeRequest(AuthorizationCodeExchangeRequest{ TokenEndpoint: "://issuer/oauth/token", }) - Expect(result).To(BeNil()) - Expect(err.Error()).To(Equal("parse \"://issuer/oauth/token\": missing protocol scheme")) + gomega.Expect(result).To(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Equal("parse \"://issuer/oauth/token\": missing protocol scheme")) }) }) - Describe("handleAuthTokensResponse", func() { - It("handles the response", func() { + ginkgo.Describe("handleAuthTokensResponse", func() { + ginkgo.It("handles the response", func() { tokenRetriever := TokenRetriever{} response := buildResponse(200, AuthorizationTokenResponse{ ExpiresIn: 1, @@ -97,49 +97,49 @@ var _ = Describe("CodetokenExchanger", func() { result, err := tokenRetriever.handleAuthTokensResponse(response) - Expect(err).To(BeNil()) - Expect(result).To(Equal(&TokenResult{ + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(result).To(gomega.Equal(&TokenResult{ ExpiresIn: 1, AccessToken: "myAccessToken", RefreshToken: "myRefreshToken", })) }) - It("returns error when status code is not successful", func() { + ginkgo.It("returns error when status code is not successful", func() { tokenRetriever := TokenRetriever{} response := buildResponse(500, nil) result, err := tokenRetriever.handleAuthTokensResponse(response) - Expect(result).To(BeNil()) - Expect(err.Error()).To(Not(BeNil())) + gomega.Expect(result).To(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Not(gomega.BeNil())) }) - It("returns typed error when response body contains error information", func() { + ginkgo.It("returns typed error when response body contains error information", func() { errorBody := TokenErrorResponse{Error: "test", ErrorDescription: "test description"} tokenRetriever := TokenRetriever{} response := buildResponse(400, errorBody) result, err := tokenRetriever.handleAuthTokensResponse(response) - Expect(result).To(BeNil()) - Expect(err).To(Equal(&TokenError{ErrorCode: "test", ErrorDescription: "test description"})) - Expect(err.Error()).To(Equal("test description (test)")) + gomega.Expect(result).To(gomega.BeNil()) + gomega.Expect(err).To(gomega.Equal(&TokenError{ErrorCode: "test", ErrorDescription: "test description"})) + gomega.Expect(err.Error()).To(gomega.Equal("test description (test)")) }) - It("returns error when deserialization fails", func() { + ginkgo.It("returns error when deserialization fails", func() { tokenRetriever := TokenRetriever{} response := buildResponse(200, "") result, err := tokenRetriever.handleAuthTokensResponse(response) - Expect(result).To(BeNil()) - Expect(err.Error()).To(Equal( + gomega.Expect(result).To(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Equal( "json: cannot unmarshal string into Go value of type oauth2.AuthorizationTokenResponse")) }) }) - Describe("newRefreshTokenRequest", func() { - It("creates the request", func() { + ginkgo.Describe("newRefreshTokenRequest", func() { + ginkgo.It("creates the request", func() { tokenRetriever := TokenRetriever{} exchangeRequest := RefreshTokenExchangeRequest{ TokenEndpoint: "https://issuer/oauth/token", @@ -151,30 +151,30 @@ var _ = Describe("CodetokenExchanger", func() { result.ParseForm() - Expect(err).To(BeNil()) - Expect(result.FormValue("grant_type")).To(Equal("refresh_token")) - Expect(result.FormValue("client_id")).To(Equal("clientID")) - Expect(result.FormValue("refresh_token")).To(Equal("refreshToken")) - Expect(result.URL.String()).To(Equal("https://issuer/oauth/token")) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("refresh_token")) + gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID")) + gomega.Expect(result.FormValue("refresh_token")).To(gomega.Equal("refreshToken")) + gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token")) - Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded")) - Expect(result.Header.Get("Content-Length")).To(Equal("70")) + gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded")) + gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("70")) }) - It("returns an error when NewRequest returns an error", func() { + ginkgo.It("returns an error when NewRequest returns an error", func() { tokenRetriever := TokenRetriever{} result, err := tokenRetriever.newRefreshTokenRequest(RefreshTokenExchangeRequest{ TokenEndpoint: "://issuer/oauth/token", }) - Expect(result).To(BeNil()) - Expect(err.Error()).To(Equal("parse \"://issuer/oauth/token\": missing protocol scheme")) + gomega.Expect(result).To(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Equal("parse \"://issuer/oauth/token\": missing protocol scheme")) }) }) - Describe("newClientCredentialsRequest", func() { - It("creates the request", func() { + ginkgo.Describe("newClientCredentialsRequest", func() { + ginkgo.It("creates the request", func() { tokenRetriever := TokenRetriever{} exchangeRequest := ClientCredentialsExchangeRequest{ TokenEndpoint: "https://issuer/oauth/token", @@ -187,31 +187,31 @@ var _ = Describe("CodetokenExchanger", func() { result.ParseForm() - Expect(err).To(BeNil()) - Expect(result.FormValue("grant_type")).To(Equal("client_credentials")) - Expect(result.FormValue("client_id")).To(Equal("clientID")) - Expect(result.FormValue("client_secret")).To(Equal("clientSecret")) - Expect(result.FormValue("audience")).To(Equal("audience")) - Expect(result.URL.String()).To(Equal("https://issuer/oauth/token")) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("client_credentials")) + gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID")) + gomega.Expect(result.FormValue("client_secret")).To(gomega.Equal("clientSecret")) + gomega.Expect(result.FormValue("audience")).To(gomega.Equal("audience")) + gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token")) - Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded")) - Expect(result.Header.Get("Content-Length")).To(Equal("93")) + gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded")) + gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("93")) }) - It("returns an error when NewRequest returns an error", func() { + ginkgo.It("returns an error when NewRequest returns an error", func() { tokenRetriever := TokenRetriever{} result, err := tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{ TokenEndpoint: "://issuer/oauth/token", }) - Expect(result).To(BeNil()) - Expect(err.Error()).To(Equal("parse \"://issuer/oauth/token\": missing protocol scheme")) + gomega.Expect(result).To(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Equal("parse \"://issuer/oauth/token\": missing protocol scheme")) }) }) - Describe("newDeviceCodeExchangeRequest", func() { - It("creates the request", func() { + ginkgo.Describe("newDeviceCodeExchangeRequest", func() { + ginkgo.It("creates the request", func() { tokenRetriever := TokenRetriever{} exchangeRequest := DeviceCodeExchangeRequest{ TokenEndpoint: "https://issuer/oauth/token", @@ -224,35 +224,35 @@ var _ = Describe("CodetokenExchanger", func() { result.ParseForm() - Expect(err).To(BeNil()) - Expect(result.FormValue("grant_type")).To(Equal("urn:ietf:params:oauth:grant-type:device_code")) - Expect(result.FormValue("client_id")).To(Equal("clientID")) - Expect(result.FormValue("device_code")).To(Equal("deviceCode")) - Expect(result.URL.String()).To(Equal("https://issuer/oauth/token")) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("urn:ietf:params:oauth:grant-type:device_code")) + gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID")) + gomega.Expect(result.FormValue("device_code")).To(gomega.Equal("deviceCode")) + gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token")) - Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded")) - Expect(result.Header.Get("Content-Length")).To(Equal("107")) + gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded")) + gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("107")) }) - It("returns an error when NewRequest returns an error", func() { + ginkgo.It("returns an error when NewRequest returns an error", func() { tokenRetriever := TokenRetriever{} result, err := tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{ TokenEndpoint: "://issuer/oauth/token", }) - Expect(result).To(BeNil()) - Expect(err.Error()).To(Equal("parse \"://issuer/oauth/token\": missing protocol scheme")) + gomega.Expect(result).To(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Equal("parse \"://issuer/oauth/token\": missing protocol scheme")) }) }) - Describe("ExchangeDeviceCode", func() { + ginkgo.Describe("ExchangeDeviceCode", func() { var mockTransport *MockTransport var tokenRetriever *TokenRetriever var exchangeRequest DeviceCodeExchangeRequest var tokenResult TokenResult - BeforeEach(func() { + ginkgo.BeforeEach(func() { mockTransport = &MockTransport{} tokenRetriever = &TokenRetriever{ transport: mockTransport, @@ -270,21 +270,21 @@ var _ = Describe("CodetokenExchanger", func() { } }) - It("returns a token", func() { + ginkgo.It("returns a token", func() { }) - It("supports cancellation", func() { + ginkgo.It("supports cancellation", func() { mockTransport.Responses = []*http.Response{ buildResponse(400, &TokenErrorResponse{"authorization_pending", ""}), } ctx, cancel := context.WithCancel(context.Background()) cancel() _, err := tokenRetriever.ExchangeDeviceCode(ctx, exchangeRequest) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("cancelled")) + gomega.Expect(err).ToNot(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Equal("cancelled")) }) - It("implements authorization_pending and slow_down", func() { + ginkgo.It("implements authorization_pending and slow_down", func() { startTime := time.Now() mockTransport.Responses = []*http.Response{ buildResponse(400, &TokenErrorResponse{"authorization_pending", ""}), @@ -293,28 +293,28 @@ var _ = Describe("CodetokenExchanger", func() { buildResponse(200, &tokenResult), } token, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest) - Expect(err).To(BeNil()) - Expect(token).To(Equal(&tokenResult)) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(token).To(gomega.Equal(&tokenResult)) endTime := time.Now() - Expect(endTime.Sub(startTime)).To(BeNumerically(">", exchangeRequest.PollInterval*3)) + gomega.Expect(endTime.Sub(startTime)).To(gomega.BeNumerically(">", exchangeRequest.PollInterval*3)) }) - It("implements expired_token", func() { + ginkgo.It("implements expired_token", func() { mockTransport.Responses = []*http.Response{ buildResponse(400, &TokenErrorResponse{"expired_token", ""}), } _, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("the device code has expired")) + gomega.Expect(err).ToNot(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Equal("the device code has expired")) }) - It("implements access_denied", func() { + ginkgo.It("implements access_denied", func() { mockTransport.Responses = []*http.Response{ buildResponse(400, &TokenErrorResponse{"access_denied", ""}), } _, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("the device was not authorized")) + gomega.Expect(err).ToNot(gomega.BeNil()) + gomega.Expect(err.Error()).To(gomega.Equal("the device was not authorized")) }) }) }) diff --git a/oauth2/client_credentials_flow_test.go b/oauth2/client_credentials_flow_test.go index 6d9db3548e..8fd0a110ac 100644 --- a/oauth2/client_credentials_flow_test.go +++ b/oauth2/client_credentials_flow_test.go @@ -23,9 +23,8 @@ import ( "github.com/apache/pulsar-client-go/oauth2/clock" "github.com/apache/pulsar-client-go/oauth2/clock/testing" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" ) type MockClientCredentialsProvider struct { @@ -50,13 +49,13 @@ var clientCredentials = KeyFile{ Scope: "test_scope", } -var _ = Describe("ClientCredentialsFlow", func() { - Describe("Authorize", func() { +var _ = ginkgo.Describe("ClientCredentialsFlow", func() { + ginkgo.Describe("Authorize", func() { var mockClock clock.Clock var mockTokenExchanger *MockTokenExchanger - BeforeEach(func() { + ginkgo.BeforeEach(func() { mockClock = testing.NewFakeClock(time.Unix(0, 0)) expectedTokens := TokenResult{AccessToken: "accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234} mockTokenExchanger = &MockTokenExchanger{ @@ -64,7 +63,7 @@ var _ = Describe("ClientCredentialsFlow", func() { } }) - It("invokes TokenExchanger with credentials", func() { + ginkgo.It("invokes TokenExchanger with credentials", func() { additionalScope := "additional_scope" provider := newClientCredentialsFlow( ClientCredentialsFlowOptions{ @@ -78,8 +77,8 @@ var _ = Describe("ClientCredentialsFlow", func() { ) _, err := provider.Authorize("test_audience") - Expect(err).ToNot(HaveOccurred()) - Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{ + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&ClientCredentialsExchangeRequest{ TokenEndpoint: oidcEndpoints.TokenEndpoint, ClientID: clientCredentials.ClientID, ClientSecret: clientCredentials.ClientSecret, @@ -88,7 +87,7 @@ var _ = Describe("ClientCredentialsFlow", func() { })) }) - It("returns TokensResult from TokenExchanger", func() { + ginkgo.It("returns TokensResult from TokenExchanger", func() { provider := newClientCredentialsFlow( ClientCredentialsFlowOptions{ KeyFile: "test_keyfile", @@ -100,12 +99,12 @@ var _ = Describe("ClientCredentialsFlow", func() { ) grant, err := provider.Authorize("test_audience") - Expect(err).ToNot(HaveOccurred()) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) expected := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock) - Expect(*grant.Token).To(Equal(expected)) + gomega.Expect(*grant.Token).To(gomega.Equal(expected)) }) - It("returns an error if token exchanger errors", func() { + ginkgo.It("returns an error if token exchanger errors", func() { mockTokenExchanger.ReturnsError = errors.New("someerror") mockTokenExchanger.ReturnsTokens = nil @@ -120,19 +119,19 @@ var _ = Describe("ClientCredentialsFlow", func() { ) _, err := provider.Authorize("test_audience") - Expect(err.Error()).To(Equal("authentication failed using client credentials: " + + gomega.Expect(err.Error()).To(gomega.Equal("authentication failed using client credentials: " + "could not exchange client credentials: someerror")) }) }) }) -var _ = Describe("ClientCredentialsGrantRefresher", func() { +var _ = ginkgo.Describe("ClientCredentialsGrantRefresher", func() { - Describe("Refresh", func() { + ginkgo.Describe("Refresh", func() { var mockClock clock.Clock var mockTokenExchanger *MockTokenExchanger - BeforeEach(func() { + ginkgo.BeforeEach(func() { mockClock = testing.NewFakeClock(time.Unix(0, 0)) expectedTokens := TokenResult{AccessToken: "accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234} mockTokenExchanger = &MockTokenExchanger{ @@ -140,7 +139,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() { } }) - It("invokes TokenExchanger with credentials", func() { + ginkgo.It("invokes TokenExchanger with credentials", func() { refresher := &ClientCredentialsGrantRefresher{ clock: mockClock, exchanger: mockTokenExchanger, @@ -154,8 +153,8 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() { Scopes: []string{"profile"}, } _, err := refresher.Refresh(og) - Expect(err).ToNot(HaveOccurred()) - Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{ + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&ClientCredentialsExchangeRequest{ TokenEndpoint: oidcEndpoints.TokenEndpoint, ClientID: clientCredentials.ClientID, ClientSecret: clientCredentials.ClientSecret, @@ -164,7 +163,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() { })) }) - It("returns a valid grant", func() { + ginkgo.It("returns a valid grant", func() { refresher := &ClientCredentialsGrantRefresher{ clock: mockClock, exchanger: mockTokenExchanger, @@ -178,14 +177,14 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() { Scopes: []string{"profile"}, } ng, err := refresher.Refresh(og) - Expect(err).ToNot(HaveOccurred()) - Expect(ng.Audience).To(Equal("test_audience")) - Expect(ng.ClientID).To(Equal("")) - Expect(*ng.ClientCredentials).To(Equal(clientCredentials)) - Expect(ng.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint)) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + gomega.Expect(ng.Audience).To(gomega.Equal("test_audience")) + gomega.Expect(ng.ClientID).To(gomega.Equal("")) + gomega.Expect(*ng.ClientCredentials).To(gomega.Equal(clientCredentials)) + gomega.Expect(ng.TokenEndpoint).To(gomega.Equal(oidcEndpoints.TokenEndpoint)) expected := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock) - Expect(*ng.Token).To(Equal(expected)) - Expect(ng.Scopes).To(Equal([]string{"profile"})) + gomega.Expect(*ng.Token).To(gomega.Equal(expected)) + gomega.Expect(ng.Scopes).To(gomega.Equal([]string{"profile"})) }) }) }) diff --git a/oauth2/clock/testing/fake_clock.go b/oauth2/clock/testing/fake_clock.go index 6fcbf4c035..1a0bacffcf 100644 --- a/oauth2/clock/testing/fake_clock.go +++ b/oauth2/clock/testing/fake_clock.go @@ -196,24 +196,24 @@ func (i *IntervalClock) Since(ts time.Time) time.Duration { // After is unimplemented, will panic. // TODO: make interval clock use FakeClock so this can be implemented. -func (*IntervalClock) After(d time.Duration) <-chan time.Time { +func (*IntervalClock) After(_ time.Duration) <-chan time.Time { panic("IntervalClock doesn't implement After") } // NewTimer is unimplemented, will panic. // TODO: make interval clock use FakeClock so this can be implemented. -func (*IntervalClock) NewTimer(d time.Duration) clock.Timer { +func (*IntervalClock) NewTimer(_ time.Duration) clock.Timer { panic("IntervalClock doesn't implement NewTimer") } // Tick is unimplemented, will panic. // TODO: make interval clock use FakeClock so this can be implemented. -func (*IntervalClock) Tick(d time.Duration) <-chan time.Time { +func (*IntervalClock) Tick(_ time.Duration) <-chan time.Time { panic("IntervalClock doesn't implement Tick") } // Sleep is unimplemented, will panic. -func (*IntervalClock) Sleep(d time.Duration) { +func (*IntervalClock) Sleep(_ time.Duration) { panic("IntervalClock doesn't implement Sleep") } diff --git a/oauth2/config_tokenprovider_test.go b/oauth2/config_tokenprovider_test.go index d949a5a5e9..bfb56bfb85 100644 --- a/oauth2/config_tokenprovider_test.go +++ b/oauth2/config_tokenprovider_test.go @@ -18,8 +18,8 @@ package oauth2 import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" ) type mockConfigProvider struct { @@ -42,15 +42,15 @@ func (m *mockConfigProvider) SaveTokens(identifier, accessToken, refreshToken st m.SavedRefreshToken = refreshToken } -var _ = Describe("main", func() { - Describe("configCachingProvider", func() { - It("sets up the identifier using the clientID and audience", func() { +var _ = ginkgo.Describe("main", func() { + ginkgo.Describe("configCachingProvider", func() { + ginkgo.It("sets up the identifier using the clientID and audience", func() { p := NewConfigBackedCachingProvider("iamclientid", "iamaudience", &mockConfigProvider{}) - Expect(p.identifier).To(Equal("iamclientid-iamaudience")) + gomega.Expect(p.identifier).To(gomega.Equal("iamclientid-iamaudience")) }) - It("gets tokens from the config provider", func() { + ginkgo.It("gets tokens from the config provider", func() { c := &mockConfigProvider{ ReturnAccessToken: "accessToken", ReturnRefreshToken: "refreshToken", @@ -62,15 +62,15 @@ var _ = Describe("main", func() { r, err := p.GetTokens() - Expect(err).NotTo(HaveOccurred()) - Expect(c.GetTokensCalledIdentifier).To(Equal(p.identifier)) - Expect(r).To(Equal(&TokenResult{ + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(c.GetTokensCalledIdentifier).To(gomega.Equal(p.identifier)) + gomega.Expect(r).To(gomega.Equal(&TokenResult{ AccessToken: c.ReturnAccessToken, RefreshToken: c.ReturnRefreshToken, })) }) - It("caches the tokens in the config provider", func() { + ginkgo.It("caches the tokens in the config provider", func() { c := &mockConfigProvider{} p := ConfigBackedCachingProvider{ identifier: "iamidentifier", @@ -83,9 +83,9 @@ var _ = Describe("main", func() { p.CacheTokens(toSave) - Expect(c.SavedIdentifier).To(Equal(p.identifier)) - Expect(c.SavedAccessToken).To(Equal(toSave.AccessToken)) - Expect(c.SavedRefreshToken).To(Equal(toSave.RefreshToken)) + gomega.Expect(c.SavedIdentifier).To(gomega.Equal(p.identifier)) + gomega.Expect(c.SavedAccessToken).To(gomega.Equal(toSave.AccessToken)) + gomega.Expect(c.SavedRefreshToken).To(gomega.Equal(toSave.RefreshToken)) }) }) }) diff --git a/oauth2/device_code_flow_test.go b/oauth2/device_code_flow_test.go index a238a484d7..deb09003a7 100644 --- a/oauth2/device_code_flow_test.go +++ b/oauth2/device_code_flow_test.go @@ -25,8 +25,8 @@ import ( "github.com/apache/pulsar-client-go/oauth2/clock/testing" "golang.org/x/oauth2" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" ) type MockDeviceCodeProvider struct { @@ -59,9 +59,9 @@ func (c *MockDeviceCodeCallback) Callback(code *DeviceCodeResult) error { return nil } -var _ = Describe("DeviceCodeFlow", func() { +var _ = ginkgo.Describe("DeviceCodeFlow", func() { - Describe("Authorize", func() { + ginkgo.Describe("Authorize", func() { const audience = "test_clientID" var mockClock clock.Clock @@ -70,7 +70,7 @@ var _ = Describe("DeviceCodeFlow", func() { var mockCallback *MockDeviceCodeCallback var flow *DeviceCodeFlow - BeforeEach(func() { + ginkgo.BeforeEach(func() { mockClock = testing.NewFakeClock(time.Unix(0, 0)) mockCodeProvider = &MockDeviceCodeProvider{ @@ -107,21 +107,21 @@ var _ = Describe("DeviceCodeFlow", func() { ) }) - It("invokes DeviceCodeProvider", func() { + ginkgo.It("invokes DeviceCodeProvider", func() { _, _ = flow.Authorize(audience) - Expect(mockCodeProvider.Called).To(BeTrue()) - Expect(mockCodeProvider.CalledWithAdditionalScopes).To(ContainElement("offline_access")) + gomega.Expect(mockCodeProvider.Called).To(gomega.BeTrue()) + gomega.Expect(mockCodeProvider.CalledWithAdditionalScopes).To(gomega.ContainElement("offline_access")) }) - It("invokes callback with returned code", func() { + ginkgo.It("invokes callback with returned code", func() { _, _ = flow.Authorize(audience) - Expect(mockCallback.Called).To(BeTrue()) - Expect(mockCallback.DeviceCodeResult).To(Equal(mockCodeProvider.DeviceCodeResult)) + gomega.Expect(mockCallback.Called).To(gomega.BeTrue()) + gomega.Expect(mockCallback.DeviceCodeResult).To(gomega.Equal(mockCodeProvider.DeviceCodeResult)) }) - It("invokes TokenExchanger with returned code", func() { + ginkgo.It("invokes TokenExchanger with returned code", func() { _, _ = flow.Authorize(audience) - Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&DeviceCodeExchangeRequest{ + gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&DeviceCodeExchangeRequest{ TokenEndpoint: oidcEndpoints.TokenEndpoint, ClientID: "test_clientID", PollInterval: time.Duration(5) * time.Second, @@ -129,28 +129,28 @@ var _ = Describe("DeviceCodeFlow", func() { })) }) - It("returns an authorization grant", func() { + ginkgo.It("returns an authorization grant", func() { grant, _ := flow.Authorize(audience) - Expect(grant).ToNot(BeNil()) - Expect(grant.Audience).To(Equal(audience)) - Expect(grant.ClientID).To(Equal("test_clientID")) - Expect(grant.ClientCredentials).To(BeNil()) - Expect(grant.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint)) + gomega.Expect(grant).ToNot(gomega.BeNil()) + gomega.Expect(grant.Audience).To(gomega.Equal(audience)) + gomega.Expect(grant.ClientID).To(gomega.Equal("test_clientID")) + gomega.Expect(grant.ClientCredentials).To(gomega.BeNil()) + gomega.Expect(grant.TokenEndpoint).To(gomega.Equal(oidcEndpoints.TokenEndpoint)) expected := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock) - Expect(*grant.Token).To(Equal(expected)) + gomega.Expect(*grant.Token).To(gomega.Equal(expected)) }) }) }) -var _ = Describe("DeviceAuthorizationGrantRefresher", func() { +var _ = ginkgo.Describe("DeviceAuthorizationGrantRefresher", func() { - Describe("Refresh", func() { + ginkgo.Describe("Refresh", func() { var mockClock clock.Clock var mockTokenExchanger *MockTokenExchanger var refresher *DeviceAuthorizationGrantRefresher var grant *AuthorizationGrant - BeforeEach(func() { + ginkgo.BeforeEach(func() { mockClock = testing.NewFakeClock(time.Unix(0, 0)) mockTokenExchanger = &MockTokenExchanger{} @@ -169,62 +169,62 @@ var _ = Describe("DeviceAuthorizationGrantRefresher", func() { } }) - It("invokes the token exchanger", func() { + ginkgo.It("invokes the token exchanger", func() { mockTokenExchanger.ReturnsTokens = &TokenResult{ AccessToken: "new token", } _, _ = refresher.Refresh(grant) - Expect(*mockTokenExchanger.RefreshCalledWithRequest).To(Equal(RefreshTokenExchangeRequest{ + gomega.Expect(*mockTokenExchanger.RefreshCalledWithRequest).To(gomega.Equal(RefreshTokenExchangeRequest{ TokenEndpoint: oidcEndpoints.TokenEndpoint, ClientID: grant.ClientID, RefreshToken: "grt", })) }) - It("returns the refreshed access token from the TokenExchanger", func() { + ginkgo.It("returns the refreshed access token from the TokenExchanger", func() { mockTokenExchanger.ReturnsTokens = &TokenResult{ AccessToken: "new token", } grant, _ = refresher.Refresh(grant) - Expect(grant.Token.AccessToken).To(Equal(mockTokenExchanger.ReturnsTokens.AccessToken)) + gomega.Expect(grant.Token.AccessToken).To(gomega.Equal(mockTokenExchanger.ReturnsTokens.AccessToken)) }) - It("preserves the existing refresh token from the TokenExchanger", func() { + ginkgo.It("preserves the existing refresh token from the TokenExchanger", func() { mockTokenExchanger.ReturnsTokens = &TokenResult{ AccessToken: "new token", } grant, _ = refresher.Refresh(grant) - Expect(grant.Token.RefreshToken).To(Equal("grt")) + gomega.Expect(grant.Token.RefreshToken).To(gomega.Equal("grt")) }) - It("returns the refreshed refresh token from the TokenExchanger", func() { + ginkgo.It("returns the refreshed refresh token from the TokenExchanger", func() { mockTokenExchanger.ReturnsTokens = &TokenResult{ AccessToken: "new token", RefreshToken: "new token", } grant, _ = refresher.Refresh(grant) - Expect(grant.Token.RefreshToken).To(Equal("new token")) + gomega.Expect(grant.Token.RefreshToken).To(gomega.Equal("new token")) }) - It("returns a meaningful expiration time", func() { + ginkgo.It("returns a meaningful expiration time", func() { mockTokenExchanger.ReturnsTokens = &TokenResult{ AccessToken: "new token", ExpiresIn: 60, } grant, _ = refresher.Refresh(grant) - Expect(grant.Token.Expiry).To(Equal(mockClock.Now().Add(time.Duration(60) * time.Second))) + gomega.Expect(grant.Token.Expiry).To(gomega.Equal(mockClock.Now().Add(time.Duration(60) * time.Second))) }) - It("returns an error when TokenExchanger does", func() { + ginkgo.It("returns an error when TokenExchanger does", func() { mockTokenExchanger.ReturnsError = errors.New("someerror") _, err := refresher.Refresh(grant) - Expect(err.Error()).To(Equal("could not exchange refresh token: someerror")) + gomega.Expect(err.Error()).To(gomega.Equal("could not exchange refresh token: someerror")) }) }) }) diff --git a/oauth2/oidc_endpoint_provider_test.go b/oauth2/oidc_endpoint_provider_test.go index 065f927703..307f0a9e07 100644 --- a/oauth2/oidc_endpoint_provider_test.go +++ b/oauth2/oidc_endpoint_provider_test.go @@ -22,17 +22,17 @@ import ( "net/http" "net/http/httptest" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" ) -var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() { - It("calls and gets the well known data from the correct endpoint for the issuer", func() { +var _ = ginkgo.Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() { + ginkgo.It("calls and gets the well known data from the correct endpoint for the issuer", func() { var req *http.Request wkEndpointsResp := OIDCWellKnownEndpoints{ AuthorizationEndpoint: "the-auth-endpoint", TokenEndpoint: "the-token-endpoint"} responseBytes, err := json.Marshal(wkEndpointsResp) - Expect(err).ToNot(HaveOccurred()) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { req = r @@ -45,31 +45,31 @@ var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() { endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL) - Expect(err).ToNot(HaveOccurred()) - Expect(*endpoints).To(Equal(wkEndpointsResp)) - Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration")) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + gomega.Expect(*endpoints).To(gomega.Equal(wkEndpointsResp)) + gomega.Expect(req.URL.Path).To(gomega.Equal("/.well-known/openid-configuration")) }) - It("errors when url.Parse errors", func() { + ginkgo.It("errors when url.Parse errors", func() { endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("://") - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal( + gomega.Expect(err).To(gomega.HaveOccurred()) + gomega.Expect(err.Error()).To(gomega.Equal( "could not parse issuer url to build well known endpoints: parse \"://\": missing protocol scheme")) - Expect(endpoints).To(BeNil()) + gomega.Expect(endpoints).To(gomega.BeNil()) }) - It("errors when the get errors", func() { + ginkgo.It("errors when the get errors", func() { endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("https://") - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal( + gomega.Expect(err).To(gomega.HaveOccurred()) + gomega.Expect(err.Error()).To(gomega.Equal( "could not get well known endpoints from url https://.well-known/openid-configuration: " + "Get \"https://.well-known/openid-configuration\": dial tcp: lookup .well-known: no such host")) - Expect(endpoints).To(BeNil()) + gomega.Expect(endpoints).To(gomega.BeNil()) }) - It("errors when the json decoder errors", func() { + ginkgo.It("errors when the json decoder errors", func() { var req *http.Request ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -83,10 +83,10 @@ var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() { endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("could not decode json body when getting well" + + gomega.Expect(err).To(gomega.HaveOccurred()) + gomega.Expect(err.Error()).To(gomega.Equal("could not decode json body when getting well" + " known endpoints: invalid character '<' looking for beginning of value")) - Expect(endpoints).To(BeNil()) - Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration")) + gomega.Expect(endpoints).To(gomega.BeNil()) + gomega.Expect(req.URL.Path).To(gomega.Equal("/.well-known/openid-configuration")) }) }) diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go index 2172af533e..5001d8d112 100644 --- a/perf/perf-consumer.go +++ b/perf/perf-consumer.go @@ -46,7 +46,7 @@ func newConsumerCommand() *cobra.Command { Use: "consume ", Short: "Consume from topic", Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, args []string) { stop := stopCh() if FlagProfile { RunProfiling(stop) diff --git a/perf/perf-producer.go b/perf/perf-producer.go index f8e14d22dc..da0a499e4f 100644 --- a/perf/perf-producer.go +++ b/perf/perf-producer.go @@ -49,7 +49,7 @@ func newProducerCommand() *cobra.Command { Use: "produce ", Short: "Produce on a topic and measure performance", Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, args []string) { stop := stopCh() if FlagProfile { RunProfiling(stop) @@ -125,7 +125,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) { producer.SendAsync(ctx, &pulsar.ProducerMessage{ Payload: payload, - }, func(msgID pulsar.MessageID, message *pulsar.ProducerMessage, e error) { + }, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, e error) { if e != nil { log.WithError(e).Fatal("Failed to publish") } diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go index 40257e83c3..701ef41245 100644 --- a/perf/pulsar-perf-go.go +++ b/perf/pulsar-perf-go.go @@ -88,7 +88,7 @@ func initLogger(debug bool) { func main() { rootCmd := &cobra.Command{ - PersistentPreRun: func(cmd *cobra.Command, args []string) { + PersistentPreRun: func(_ *cobra.Command, _ []string) { initLogger(flagDebug) }, Use: "pulsar-perf-go", diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go index 97d9e05a64..c4ecc00389 100644 --- a/pulsar/ack_grouping_tracker.go +++ b/pulsar/ack_grouping_tracker.go @@ -97,7 +97,7 @@ func (i *immediateAckGroupingTracker) addCumulative(id MessageID) { i.ackCumulative(id) } -func (i *immediateAckGroupingTracker) isDuplicate(id MessageID) bool { +func (i *immediateAckGroupingTracker) isDuplicate(_ MessageID) bool { return false } diff --git a/pulsar/ack_grouping_tracker_test.go b/pulsar/ack_grouping_tracker_test.go index 1fe5a568ca..57adf8887a 100644 --- a/pulsar/ack_grouping_tracker_test.go +++ b/pulsar/ack_grouping_tracker_test.go @@ -190,8 +190,8 @@ func TestTimedTrackerCumulativeAck(t *testing.T) { } func TestTimedTrackerIsDuplicate(t *testing.T) { - tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {}, - func(id []*pb.MessageIdData) {}) + tracker := newAckGroupingTracker(nil, func(_ MessageID) {}, func(_ MessageID) {}, + func(_ []*pb.MessageIdData) {}) tracker.add(&messageID{batchIdx: 0, batchSize: 3}) tracker.add(&messageID{batchIdx: 2, batchSize: 3}) diff --git a/pulsar/auth/athenz_test.go b/pulsar/auth/athenz_test.go index 89118081f9..a4d1fe5dcc 100644 --- a/pulsar/auth/athenz_test.go +++ b/pulsar/auth/athenz_test.go @@ -48,13 +48,13 @@ type MockRoleToken struct { mock.Mock } -func (m *MockTokenBuilder) SetExpiration(t time.Duration) { +func (m *MockTokenBuilder) SetExpiration(_ time.Duration) { } -func (m *MockTokenBuilder) SetHostname(h string) { +func (m *MockTokenBuilder) SetHostname(_ string) { } -func (m *MockTokenBuilder) SetIPAddress(ip string) { +func (m *MockTokenBuilder) SetIPAddress(_ string) { } -func (m *MockTokenBuilder) SetKeyService(keyService string) { +func (m *MockTokenBuilder) SetKeyService(_ string) { } func (m *MockTokenBuilder) Token() zms.Token { result := m.Called() diff --git a/pulsar/auth/disabled.go b/pulsar/auth/disabled.go index 0389a39b4f..4614cbdfb4 100644 --- a/pulsar/auth/disabled.go +++ b/pulsar/auth/disabled.go @@ -49,7 +49,7 @@ func (disabled) Close() error { return nil } -func (d disabled) RoundTrip(req *http.Request) (*http.Response, error) { +func (d disabled) RoundTrip(_ *http.Request) (*http.Response, error) { return nil, nil } @@ -57,6 +57,6 @@ func (d disabled) Transport() http.RoundTripper { return nil } -func (d disabled) WithTransport(tripper http.RoundTripper) error { +func (d disabled) WithTransport(_ http.RoundTripper) error { return nil } diff --git a/pulsar/auth/oauth2_test.go b/pulsar/auth/oauth2_test.go index 86d364044d..cc3d11c15b 100644 --- a/pulsar/auth/oauth2_test.go +++ b/pulsar/auth/oauth2_test.go @@ -35,7 +35,7 @@ func mockOAuthServer() *httptest.Server { // mock the used REST path for the tests mockedHandler := http.NewServeMux() - mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, _ *http.Request) { s := fmt.Sprintf(`{ "issuer":"%s", "authorization_endpoint":"%s/authorize", @@ -44,10 +44,10 @@ func mockOAuthServer() *httptest.Server { }`, server.URL, server.URL, server.URL, server.URL) fmt.Fprintln(writer, s) }) - mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, _ *http.Request) { fmt.Fprintln(writer, "{\n \"access_token\": \"token-content\",\n \"token_type\": \"Bearer\"\n}") }) - mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, _ *http.Request) { fmt.Fprintln(writer, "true") }) diff --git a/pulsar/auth/token.go b/pulsar/auth/token.go index 898b6537fd..1d7fcff1d5 100644 --- a/pulsar/auth/token.go +++ b/pulsar/auth/token.go @@ -38,9 +38,8 @@ func NewAuthenticationTokenWithParams(params map[string]string) (Provider, error return NewAuthenticationToken(params["token"]), nil } else if params["file"] != "" { return NewAuthenticationTokenFromFile(params["file"]), nil - } else { - return nil, errors.New("missing configuration for token auth") } + return nil, errors.New("missing configuration for token auth") } // NewAuthenticationToken returns a token auth provider that will use the specified token to diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 5b6b8f1ff3..0fdf6be73f 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -257,7 +257,7 @@ func mockOAuthServer() *httptest.Server { // mock the used REST path for the tests mockedHandler := http.NewServeMux() - mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, _ *http.Request) { s := fmt.Sprintf(`{ "issuer":"%s", "authorization_endpoint":"%s/authorize", @@ -266,13 +266,13 @@ func mockOAuthServer() *httptest.Server { }`, server.URL, server.URL, server.URL, server.URL) fmt.Fprintln(writer, s) }) - mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, _ *http.Request) { fmt.Fprintln(writer, "{\n"+ " \"access_token\": \"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0b2tlbi1wcmluY2lwYWwifQ."+ "tSfgR8l7dKC6LoWCxQgNkuSB8our7xV_nAM7wpgCbG4\",\n"+ " \"token_type\": \"Bearer\"\n}") }) - mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, _ *http.Request) { fmt.Fprintln(writer, "true") }) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index a3d3e3ff80..92376ac1ba 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "errors" "fmt" "math/rand" "strconv" @@ -492,7 +493,7 @@ func (c *consumer) unsubscribe(force bool) error { } } if errMsg != "" { - return fmt.Errorf(errMsg) + return errors.New(errMsg) } return nil } diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 020e38a065..3030bda327 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -294,11 +294,11 @@ func (c *multiTopicConsumer) Close() { }) } -func (c *multiTopicConsumer) Seek(msgID MessageID) error { +func (c *multiTopicConsumer) Seek(_ MessageID) error { return newError(SeekFailed, "seek command not allowed for multi topic consumer") } -func (c *multiTopicConsumer) SeekByTime(time time.Time) error { +func (c *multiTopicConsumer) SeekByTime(_ time.Time) error { return newError(SeekFailed, "seek command not allowed for multi topic consumer") } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 831f763a69..0e8274e67f 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -786,18 +786,18 @@ func (pc *partitionConsumer) NackMsg(msg Message) { pc.metrics.NacksCounter.Inc() } -func (pc *partitionConsumer) Redeliver(msgIds []messageID) { +func (pc *partitionConsumer) Redeliver(msgIDs []messageID) { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") return } - pc.eventsCh <- &redeliveryRequest{msgIds} + pc.eventsCh <- &redeliveryRequest{msgIDs} - iMsgIds := make([]MessageID, len(msgIds)) - for i := range iMsgIds { - iMsgIds[i] = &msgIds[i] + iMsgIDs := make([]MessageID, len(msgIDs)) + for i := range iMsgIDs { + iMsgIDs[i] = &msgIDs[i] } - pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds) + pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIDs) } func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { @@ -805,14 +805,14 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") return } - msgIds := req.msgIds - pc.log.Debug("Request redelivery after negative ack for messages", msgIds) + msgIDs := req.msgIDs + pc.log.Debug("Request redelivery after negative ack for messages", msgIDs) - msgIDDataList := make([]*pb.MessageIdData, len(msgIds)) - for i := 0; i < len(msgIds); i++ { + msgIDDataList := make([]*pb.MessageIdData, len(msgIDs)) + for i := 0; i < len(msgIDs); i++ { msgIDDataList[i] = &pb.MessageIdData{ - LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)), - EntryId: proto.Uint64(uint64(msgIds[i].entryID)), + LedgerId: proto.Uint64(uint64(msgIDs[i].ledgerID)), + EntryId: proto.Uint64(uint64(msgIDs[i].entryID)), } } @@ -1560,7 +1560,7 @@ type closeRequest struct { } type redeliveryRequest struct { - msgIds []messageID + msgIDs []messageID } type getLastMsgIDRequest struct { @@ -1906,10 +1906,9 @@ func (pc *partitionConsumer) clearReceiverQueue() *trackingMessageID { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past return pc.lastDequeuedMsg - } else { - // No message was received or dequeued by this consumer. Next message would still be the startMessageId - return pc.startMessageID.get() } + // No message was received or dequeued by this consumer. Next message would still be the startMessageId + return pc.startMessageID.get() } func getPreviousMessage(mid *trackingMessageID) *trackingMessageID { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index a4a8d99558..37cc963257 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -982,7 +982,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { wg.Add(1) producer.SendAsync(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("msg-content-%d", i))}, - func(id MessageID, producerMessage *ProducerMessage, e error) { + func(_ MessageID, _ *ProducerMessage, e error) { assert.NoError(t, e) wg.Done() }) @@ -998,7 +998,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { wg.Add(1) producer.SendAsync(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("msg-content-%d", i))}, - func(id MessageID, producerMessage *ProducerMessage, e error) { + func(_ MessageID, _ *ProducerMessage, e error) { assert.NoError(t, e) wg.Done() }) @@ -2301,7 +2301,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) { // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, - MessageRouter: func(msg *ProducerMessage, topicMetadata TopicMetadata) int { + MessageRouter: func(msg *ProducerMessage, _ TopicMetadata) int { // The message key will contain the partition id where to route i, err := strconv.Atoi(msg.Key) assert.NoError(t, err) @@ -2422,11 +2422,11 @@ func TestProducerName(t *testing.T) { type noopConsumerInterceptor struct{} -func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} +func (noopConsumerInterceptor) BeforeConsume(_ ConsumerMessage) {} -func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} +func (noopConsumerInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {} -func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} +func (noopConsumerInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {} // copyPropertyInterceptor copy all keys in message properties map and add a suffix type copyPropertyInterceptor struct { @@ -2435,31 +2435,31 @@ type copyPropertyInterceptor struct { func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) { properties := message.Properties() - copy := make(map[string]string, len(properties)) + cp := make(map[string]string, len(properties)) for k, v := range properties { - copy[k+x.suffix] = v + cp[k+x.suffix] = v } - for ck, v := range copy { + for ck, v := range cp { properties[ck] = v } } -func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} +func (copyPropertyInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {} -func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} +func (copyPropertyInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {} type metricConsumerInterceptor struct { ackn int32 nackn int32 } -func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} +func (x *metricConsumerInterceptor) BeforeConsume(_ ConsumerMessage) {} -func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) { +func (x *metricConsumerInterceptor) OnAcknowledge(_ Consumer, _ MessageID) { atomic.AddInt32(&x.ackn, 1) } -func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) { +func (x *metricConsumerInterceptor) OnNegativeAcksSend(_ Consumer, msgIDs []MessageID) { atomic.AddInt32(&x.nackn, int32(len(msgIDs))) } @@ -2512,7 +2512,7 @@ func TestConsumerWithInterceptors(t *testing.T) { } } - var nackIds []MessageID + var nackIDs []MessageID // receive 10 messages for i := 0; i < 10; i++ { msg, err := consumer.Receive(context.Background()) @@ -2533,13 +2533,13 @@ func TestConsumerWithInterceptors(t *testing.T) { if i%2 == 0 { consumer.Ack(msg) } else { - nackIds = append(nackIds, msg.ID()) + nackIDs = append(nackIDs, msg.ID()) } } assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn)) - for i := range nackIds { - consumer.NackID(nackIds[i]) + for i := range nackIDs { + consumer.NackID(nackIDs[i]) } // receive 5 nack messages @@ -2633,7 +2633,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { producer.SendAsync(ctx, &ProducerMessage{ Key: k, Payload: []byte(fmt.Sprintf("value-%d", i)), - }, func(id MessageID, producerMessage *ProducerMessage, err error) { + }, func(_ MessageID, _ *ProducerMessage, err error) { assert.Nil(t, err) }, ) @@ -2728,7 +2728,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { producer.SendAsync(ctx, &ProducerMessage{ Key: k, Payload: []byte(fmt.Sprintf("value-%d", i)), - }, func(id MessageID, producerMessage *ProducerMessage, err error) { + }, func(_ MessageID, _ *ProducerMessage, err error) { assert.Nil(t, err) }, ) @@ -2762,7 +2762,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { Key: u.String(), OrderingKey: k, Payload: []byte(fmt.Sprintf("value-%d", i)), - }, func(id MessageID, producerMessage *ProducerMessage, err error) { + }, func(_ MessageID, _ *ProducerMessage, err error) { assert.Nil(t, err) }, ) @@ -3511,12 +3511,12 @@ func NewEncKeyReader(publicKeyPath, privateKeyPath string) *EncKeyReader { } // GetPublicKey read public key from the given path -func (d *EncKeyReader) PublicKey(keyName string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) { +func (d *EncKeyReader) PublicKey(keyName string, _ map[string]string) (*crypto.EncryptionKeyInfo, error) { return readKey(keyName, d.publicKeyPath, d.metaMap) } // GetPrivateKey read private key from the given path -func (d *EncKeyReader) PrivateKey(keyName string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) { +func (d *EncKeyReader) PrivateKey(keyName string, _ map[string]string) (*crypto.EncryptionKeyInfo, error) { return readKey(keyName, d.privateKeyPath, d.metaMap) } @@ -4022,31 +4022,31 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o for i := 0; i < BatchingMaxSize; i++ { producer.SendAsync(context.Background(), &ProducerMessage{ Payload: []byte(fmt.Sprintf("msg-%d", i)), - }, func(id MessageID, producerMessage *ProducerMessage, err error) { + }, func(id MessageID, _ *ProducerMessage, err error) { assert.Nil(t, err) log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize()) }) } assert.Nil(t, producer.FlushWithCtx(context.Background())) - msgIds := make([]MessageID, BatchingMaxSize) + msgIDs := make([]MessageID, BatchingMaxSize) for i := 0; i < BatchingMaxSize; i++ { message, err := consumer.Receive(context.Background()) assert.Nil(t, err) - msgIds[i] = message.ID() + msgIDs[i] = message.ID() log.Printf("Received %v from %v:%d:%d", string(message.Payload()), message.ID(), message.ID().BatchIdx(), message.ID().BatchSize()) } // Acknowledge half of the messages if cumulative { - msgID := msgIds[BatchingMaxSize/2-1] + msgID := msgIDs[BatchingMaxSize/2-1] err := consumer.AckIDCumulative(msgID) assert.Nil(t, err) log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx()) } else { for i := 0; i < BatchingMaxSize; i++ { - msgID := msgIds[i] + msgID := msgIDs[i] if i%2 == 0 { consumer.AckID(msgID) log.Printf("Acknowledge %v:%d\n", msgID, msgID.BatchIdx()) @@ -4066,17 +4066,17 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o index = i + BatchingMaxSize/2 } assert.Equal(t, []byte(fmt.Sprintf("msg-%d", index)), message.Payload()) - assert.Equal(t, msgIds[index].BatchIdx(), message.ID().BatchIdx()) + assert.Equal(t, msgIDs[index].BatchIdx(), message.ID().BatchIdx()) // We should not acknowledge message.ID() here because message.ID() shares a different // tracker with msgIds if !cumulative { - msgID := msgIds[index] + msgID := msgIDs[index] consumer.AckID(msgID) log.Printf("Acknowledge %v:%d\n", msgID, msgID.BatchIdx()) } } if cumulative { - msgID := msgIds[BatchingMaxSize-1] + msgID := msgIDs[BatchingMaxSize-1] err := consumer.AckIDCumulative(msgID) assert.Nil(t, err) log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx()) @@ -4167,7 +4167,7 @@ func TestConsumerWithAutoScaledQueueReceive(t *testing.T) { p.SendAsync( context.Background(), &ProducerMessage{Payload: []byte("hello")}, - func(id MessageID, producerMessage *ProducerMessage, err error) { + func(_ MessageID, _ *ProducerMessage, _ error) { }, ) } @@ -4333,7 +4333,7 @@ func TestConsumerMemoryLimit(t *testing.T) { p1.SendAsync( context.Background(), &ProducerMessage{Payload: createTestMessagePayload(1)}, - func(id MessageID, producerMessage *ProducerMessage, err error) { + func(_ MessageID, _ *ProducerMessage, _ error) { }, ) } @@ -4373,7 +4373,7 @@ func TestConsumerMemoryLimit(t *testing.T) { p1.SendAsync( context.Background(), &ProducerMessage{Payload: createTestMessagePayload(1)}, - func(id MessageID, producerMessage *ProducerMessage, err error) { + func(_ MessageID, _ *ProducerMessage, _ error) { }, ) } @@ -4456,7 +4456,7 @@ func TestMultiConsumerMemoryLimit(t *testing.T) { p1.SendAsync( context.Background(), &ProducerMessage{Payload: createTestMessagePayload(1)}, - func(id MessageID, producerMessage *ProducerMessage, err error) { + func(_ MessageID, _ *ProducerMessage, _ error) { }, ) } diff --git a/pulsar/crypto/default_message_crypto.go b/pulsar/crypto/default_message_crypto.go index 74e4592dd5..2239fa394c 100644 --- a/pulsar/crypto/default_message_crypto.go +++ b/pulsar/crypto/default_message_crypto.go @@ -169,7 +169,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string, // we should never reach here msg := fmt.Sprintf("%v Failed to find encrypted Data key for key %v", d.logCtx, keyName) d.logger.Errorf(msg) - return nil, fmt.Errorf(msg) + return nil, errors.New(msg) } } diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 6b13b3298b..00c7e03b34 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -126,7 +126,7 @@ func (r *dlqRouter) run() { Properties: properties, EventTime: msg.EventTime(), ReplicationClusters: msg.replicationClusters, - }, func(messageID MessageID, producerMessage *ProducerMessage, err error) { + }, func(_ MessageID, _ *ProducerMessage, err error) { if err == nil { r.log.WithField("msgID", msgID).Debug("Succeed to send message to DLQ") // The Producer ack might be coming from the connection go-routine that @@ -185,9 +185,8 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { r.log.WithError(err).Error("Failed to create DLQ producer") time.Sleep(bo.Next()) continue - } else { - r.producer = producer - return producer } + r.producer = producer + return producer } } diff --git a/pulsar/helper_for_test.go b/pulsar/helper_for_test.go index 426855b26f..1cb3f6cb32 100644 --- a/pulsar/helper_for_test.go +++ b/pulsar/helper_for_test.go @@ -70,7 +70,7 @@ func httpDelete(requestPaths ...string) error { var errs error for _, requestPath := range requestPaths { if err := httpDo(http.MethodDelete, requestPath, nil, nil); err != nil { - errs = pkgerrors.Wrapf(err, "unable to delete url: %s"+requestPath) + errs = pkgerrors.Wrapf(err, "unable to delete url: %s", requestPath) } } return errs @@ -186,5 +186,5 @@ func retryAssert(t assert.TestingT, times int, milliseconds int, update func(), type fakeAssertT struct{} -func (fa fakeAssertT) Errorf(format string, args ...interface{}) { +func (fa fakeAssertT) Errorf(_ string, _ ...interface{}) { } diff --git a/pulsar/internal/channel_cond_test.go b/pulsar/internal/channel_cond_test.go index a73d44ea26..93a0408439 100644 --- a/pulsar/internal/channel_cond_test.go +++ b/pulsar/internal/channel_cond_test.go @@ -24,7 +24,7 @@ import ( "time" ) -func TestChCond(t *testing.T) { +func TestChCond(_ *testing.T) { cond := newCond(&sync.Mutex{}) wg := sync.WaitGroup{} wg.Add(1) @@ -39,7 +39,7 @@ func TestChCond(t *testing.T) { wg.Wait() } -func TestChCondWithContext(t *testing.T) { +func TestChCondWithContext(_ *testing.T) { cond := newCond(&sync.Mutex{}) wg := sync.WaitGroup{} ctx, cancel := context.WithCancel(context.Background()) diff --git a/pulsar/internal/compression/noop.go b/pulsar/internal/compression/noop.go index 78acb52d3c..1607c85f6e 100644 --- a/pulsar/internal/compression/noop.go +++ b/pulsar/internal/compression/noop.go @@ -42,7 +42,7 @@ func (noopProvider) Compress(dst, src []byte) []byte { return dst[:len(src)] } -func (noopProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { +func (noopProvider) Decompress(dst, src []byte, _ int) ([]byte, error) { if dst == nil { dst = make([]byte, len(src)) } diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index dde54ae29e..8cad78277e 100644 --- a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -75,7 +75,7 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { return out } -func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { +func (z *zstdCGoProvider) Decompress(dst, src []byte, _ int) ([]byte, error) { ctx := z.ctxPool.Get().(zstd.Ctx) defer z.ctxPool.Put(ctx) return ctx.Decompress(dst, src) diff --git a/pulsar/internal/compression/zstd_go.go b/pulsar/internal/compression/zstd_go.go index ae850783dd..7738910313 100644 --- a/pulsar/internal/compression/zstd_go.go +++ b/pulsar/internal/compression/zstd_go.go @@ -58,7 +58,7 @@ func (p *zstdProvider) Compress(dst, src []byte) []byte { return p.encoder.EncodeAll(src, dst) } -func (p *zstdProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { +func (p *zstdProvider) Decompress(dst, src []byte, _ int) ([]byte, error) { return p.decoder.DecodeAll(src, dst) } diff --git a/pulsar/internal/crypto/consumer_decryptor.go b/pulsar/internal/crypto/consumer_decryptor.go index bbc1f9b198..8db8902707 100644 --- a/pulsar/internal/crypto/consumer_decryptor.go +++ b/pulsar/internal/crypto/consumer_decryptor.go @@ -42,7 +42,7 @@ func NewConsumerDecryptor(keyReader crypto.KeyReader, } func (d *consumerDecryptor) Decrypt(payload []byte, - msgID *pb.MessageIdData, + _ *pb.MessageIdData, msgMetadata *pb.MessageMetadata) ([]byte, error) { // encryption keys are not present in message metadta, no need decrypt the payload if len(msgMetadata.GetEncryptionKeys()) == 0 { diff --git a/pulsar/internal/crypto/noop_decryptor.go b/pulsar/internal/crypto/noop_decryptor.go index c049c47222..166b154159 100644 --- a/pulsar/internal/crypto/noop_decryptor.go +++ b/pulsar/internal/crypto/noop_decryptor.go @@ -31,7 +31,7 @@ func NewNoopDecryptor() Decryptor { // Decrypt noop decryptor func (d *noopDecryptor) Decrypt(payload []byte, - msgID *pb.MessageIdData, + _ *pb.MessageIdData, msgMetadata *pb.MessageMetadata) ([]byte, error) { if len(msgMetadata.GetEncryptionKeys()) > 0 { return payload, fmt.Errorf("incoming message payload is encrypted, consumer is not configured to decrypt") diff --git a/pulsar/internal/crypto/noop_encryptor.go b/pulsar/internal/crypto/noop_encryptor.go index 4512e7bd69..7c52755fd6 100644 --- a/pulsar/internal/crypto/noop_encryptor.go +++ b/pulsar/internal/crypto/noop_encryptor.go @@ -28,6 +28,6 @@ func NewNoopEncryptor() Encryptor { } // Encrypt Noop ecryptor -func (e *noopEncryptor) Encrypt(data []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) { +func (e *noopEncryptor) Encrypt(data []byte, _ *pb.MessageMetadata) ([]byte, error) { return data, nil } diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go index f1420aeaa4..632d5a4f3f 100644 --- a/pulsar/internal/http_client.go +++ b/pulsar/internal/http_client.go @@ -159,9 +159,7 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout) time.Sleep(retryTime) _, err = c.GetWithQueryParams(endpoint, obj, params, true) - if _, ok := err.(*url.Error); ok { - continue - } else { + if _, ok := err.(*url.Error); !ok { // We either succeeded or encountered a non connection error break } diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index 81f496b3dc..6c07e7d2c4 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -367,7 +367,7 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic return topics, nil } -func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) { +func (h *httpLookupService) GetSchema(_ string, _ []byte) (schema *pb.Schema, err error) { return nil, errors.New("GetSchema is not supported by httpLookupService") } diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go index 349d1929fe..fd08898a4d 100644 --- a/pulsar/internal/lookup_service_test.go +++ b/pulsar/internal/lookup_service_test.go @@ -57,7 +57,7 @@ func (c *mockedLookupRPCClient) NewConsumerID() uint64 { return 1 } -func (c *mockedLookupRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, +func (c *mockedLookupRPCClient) RequestToAnyBroker(_ uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP) @@ -77,16 +77,16 @@ func (c *mockedLookupRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb. }, nil } -func (c *mockedLookupRPCClient) RequestToHost(serviceNameResolver *ServiceNameResolver, requestID uint64, +func (c *mockedLookupRPCClient) RequestToHost(_ *ServiceNameResolver, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { return c.RequestToAnyBroker(requestID, cmdType, message) } -func (c *mockedLookupRPCClient) LookupService(URL string) LookupService { +func (c *mockedLookupRPCClient) LookupService(_ string) LookupService { return nil } -func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, +func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, _ uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP) expectedRequest := &c.expectedRequests[0] @@ -108,14 +108,14 @@ func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr *url. }, nil } -func (c *mockedLookupRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, - message proto.Message) (*RPCResult, error) { +func (c *mockedLookupRPCClient) RequestOnCnx(_ Connection, _ uint64, _ pb.BaseCommand_Type, + _ proto.Message) (*RPCResult, error) { assert.Fail(c.t, "Shouldn't be called") return nil, nil } -func (c *mockedLookupRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, - message proto.Message) error { +func (c *mockedLookupRPCClient) RequestOnCnxNoWait(_ Connection, _ pb.BaseCommand_Type, + _ proto.Message) error { assert.Fail(c.t, "Shouldn't be called") return nil } @@ -458,7 +458,7 @@ func (m mockedPartitionedTopicMetadataRPCClient) NewConsumerID() uint64 { } func (m mockedPartitionedTopicMetadataRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, - message proto.Message) (*RPCResult, error) { + _ proto.Message) (*RPCResult, error) { assert.Equal(m.t, cmdType, pb.BaseCommand_PARTITIONED_METADATA) expectedRequest := &m.expectedRequests[0] @@ -477,30 +477,30 @@ func (m mockedPartitionedTopicMetadataRPCClient) RequestToAnyBroker(requestID ui }, nil } -func (m mockedPartitionedTopicMetadataRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, - cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { +func (m mockedPartitionedTopicMetadataRPCClient) Request(_ *url.URL, _ *url.URL, _ uint64, + _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) { assert.Fail(m.t, "Shouldn't be called") return nil, nil } -func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, - message proto.Message) error { +func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(_ Connection, _ pb.BaseCommand_Type, + _ proto.Message) error { assert.Fail(m.t, "Shouldn't be called") return nil } -func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection, requestID uint64, - cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { +func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(_ Connection, _ uint64, + _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) { assert.Fail(m.t, "Shouldn't be called") return nil, nil } -func (m *mockedPartitionedTopicMetadataRPCClient) RequestToHost(serviceNameResolver *ServiceNameResolver, +func (m *mockedPartitionedTopicMetadataRPCClient) RequestToHost(_ *ServiceNameResolver, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { return m.RequestToAnyBroker(requestID, cmdType, message) } -func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(URL string) LookupService { +func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(_ string) LookupService { return nil } @@ -575,7 +575,7 @@ type MockHTTPClient struct { func (c *MockHTTPClient) Close() {} -func (c *MockHTTPClient) Get(endpoint string, obj interface{}, params map[string]string) error { +func (c *MockHTTPClient) Get(endpoint string, obj interface{}, _ map[string]string) error { if strings.Contains(endpoint, HTTPLookupServiceBasePathV1) || strings.Contains(endpoint, HTTPLookupServiceBasePathV2) { return mockHTTPGetLookupResult(obj) diff --git a/pulsar/internal/pulsartracing/consumer_interceptor.go b/pulsar/internal/pulsartracing/consumer_interceptor.go index 3969d45d8b..3b91e7cbb4 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor.go @@ -33,9 +33,9 @@ func (t *ConsumerInterceptor) BeforeConsume(message pulsar.ConsumerMessage) { buildAndInjectChildSpan(message).Finish() } -func (t *ConsumerInterceptor) OnAcknowledge(consumer pulsar.Consumer, msgID pulsar.MessageID) {} +func (t *ConsumerInterceptor) OnAcknowledge(_ pulsar.Consumer, _ pulsar.MessageID) {} -func (t *ConsumerInterceptor) OnNegativeAcksSend(consumer pulsar.Consumer, msgIDs []pulsar.MessageID) { +func (t *ConsumerInterceptor) OnNegativeAcksSend(_ pulsar.Consumer, _ []pulsar.MessageID) { } func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span { diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index 05953cf1a6..1fa1bf0d17 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -52,7 +52,7 @@ func (c *mockConsumer) Subscription() string { return "" } -func (c *mockConsumer) AckWithTxn(msg pulsar.Message, txn pulsar.Transaction) error { +func (c *mockConsumer) AckWithTxn(_ pulsar.Message, _ pulsar.Transaction) error { return nil } @@ -63,7 +63,7 @@ func (c *mockConsumer) UnsubscribeForce() error { return nil } -func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message, err error) { +func (c *mockConsumer) Receive(_ context.Context) (message pulsar.Message, err error) { return nil, nil } @@ -71,39 +71,39 @@ func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage { return nil } -func (c *mockConsumer) Ack(msg pulsar.Message) error { +func (c *mockConsumer) Ack(_ pulsar.Message) error { return nil } -func (c *mockConsumer) AckID(msgID pulsar.MessageID) error { +func (c *mockConsumer) AckID(_ pulsar.MessageID) error { return nil } -func (c *mockConsumer) AckCumulative(msg pulsar.Message) error { +func (c *mockConsumer) AckCumulative(_ pulsar.Message) error { return nil } -func (c *mockConsumer) AckIDCumulative(msgID pulsar.MessageID) error { +func (c *mockConsumer) AckIDCumulative(_ pulsar.MessageID) error { return nil } -func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {} +func (c *mockConsumer) ReconsumeLater(_ pulsar.Message, _ time.Duration) {} -func (c *mockConsumer) ReconsumeLaterWithCustomProperties(msg pulsar.Message, customProperties map[string]string, - delay time.Duration) { +func (c *mockConsumer) ReconsumeLaterWithCustomProperties(_ pulsar.Message, _ map[string]string, + _ time.Duration) { } -func (c *mockConsumer) Nack(msg pulsar.Message) {} +func (c *mockConsumer) Nack(_ pulsar.Message) {} -func (c *mockConsumer) NackID(msgID pulsar.MessageID) {} +func (c *mockConsumer) NackID(_ pulsar.MessageID) {} func (c *mockConsumer) Close() {} -func (c *mockConsumer) Seek(msgID pulsar.MessageID) error { +func (c *mockConsumer) Seek(_ pulsar.MessageID) error { return nil } -func (c *mockConsumer) SeekByTime(time time.Time) error { +func (c *mockConsumer) SeekByTime(_ time.Time) error { return nil } diff --git a/pulsar/internal/pulsartracing/message_carrier_adaptors.go b/pulsar/internal/pulsartracing/message_carrier_adaptors.go index 3b6394baa7..8fcfa17ef9 100644 --- a/pulsar/internal/pulsartracing/message_carrier_adaptors.go +++ b/pulsar/internal/pulsartracing/message_carrier_adaptors.go @@ -38,14 +38,14 @@ func (a *ProducerMessageExtractAdapter) ForeachKey(handler func(key, val string) return nil } -func (a *ProducerMessageExtractAdapter) Set(key, val string) {} +func (a *ProducerMessageExtractAdapter) Set(_, _ string) {} // ProducerMessageInjectAdapter Implements TextMap Interface type ProducerMessageInjectAdapter struct { message *pulsar.ProducerMessage } -func (a *ProducerMessageInjectAdapter) ForeachKey(handler func(key, val string) error) error { +func (a *ProducerMessageInjectAdapter) ForeachKey(_ func(_, _ string) error) error { return errors.New("iterator should never be used with Tracer.inject()") } @@ -68,14 +68,14 @@ func (a *ConsumerMessageExtractAdapter) ForeachKey(handler func(key, val string) return nil } -func (a *ConsumerMessageExtractAdapter) Set(key, val string) {} +func (a *ConsumerMessageExtractAdapter) Set(_, _ string) {} // ConsumerMessageInjectAdapter Implements TextMap Interface type ConsumerMessageInjectAdapter struct { message pulsar.ConsumerMessage } -func (a *ConsumerMessageInjectAdapter) ForeachKey(handler func(key, val string) error) error { +func (a *ConsumerMessageInjectAdapter) ForeachKey(_ func(_, _ string) error) error { return errors.New("iterator should never be used with tracer.inject()") } diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go index df78ae2fbf..90658c1abc 100644 --- a/pulsar/internal/pulsartracing/message_carrier_util_test.go +++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go @@ -112,7 +112,7 @@ func (msg *mockConsumerMessage) GetReplicatedFrom() string { return "" } -func (msg *mockConsumerMessage) GetSchemaValue(v interface{}) error { +func (msg *mockConsumerMessage) GetSchemaValue(_ interface{}) error { return nil } diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go b/pulsar/internal/pulsartracing/producer_interceptor.go index 6c7728cf0a..77cb8bf94d 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor.go +++ b/pulsar/internal/pulsartracing/producer_interceptor.go @@ -33,9 +33,9 @@ func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *puls buildAndInjectSpan(message, producer).Finish() } -func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, - message *pulsar.ProducerMessage, - msgID pulsar.MessageID) { +func (t *ProducerInterceptor) OnSendAcknowledgement(_ pulsar.Producer, + _ *pulsar.ProducerMessage, + _ pulsar.MessageID) { } func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span { diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go index 1c2c712fcf..74890f7371 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -67,7 +67,7 @@ func (p *mockProducer) Flush() error { return nil } -func (p *mockProducer) FlushWithCtx(ctx context.Context) error { +func (p *mockProducer) FlushWithCtx(_ context.Context) error { return nil } diff --git a/pulsar/log/log.go b/pulsar/log/log.go index 7ed523174c..d68c687d5b 100644 --- a/pulsar/log/log.go +++ b/pulsar/log/log.go @@ -24,29 +24,29 @@ func DefaultNopLogger() Logger { type nopLogger struct{} -func (l nopLogger) SubLogger(fields Fields) Logger { return l } -func (l nopLogger) WithFields(fields Fields) Entry { return nopEntry{} } -func (l nopLogger) WithField(name string, value interface{}) Entry { return nopEntry{} } -func (l nopLogger) WithError(err error) Entry { return nopEntry{} } -func (l nopLogger) Debug(args ...interface{}) {} -func (l nopLogger) Info(args ...interface{}) {} -func (l nopLogger) Warn(args ...interface{}) {} -func (l nopLogger) Error(args ...interface{}) {} -func (l nopLogger) Debugf(format string, args ...interface{}) {} -func (l nopLogger) Infof(format string, args ...interface{}) {} -func (l nopLogger) Warnf(format string, args ...interface{}) {} -func (l nopLogger) Errorf(format string, args ...interface{}) {} +func (l nopLogger) SubLogger(_ Fields) Logger { return l } +func (l nopLogger) WithFields(_ Fields) Entry { return nopEntry{} } +func (l nopLogger) WithField(_ string, _ interface{}) Entry { return nopEntry{} } +func (l nopLogger) WithError(_ error) Entry { return nopEntry{} } +func (l nopLogger) Debug(_ ...interface{}) {} +func (l nopLogger) Info(_ ...interface{}) {} +func (l nopLogger) Warn(_ ...interface{}) {} +func (l nopLogger) Error(_ ...interface{}) {} +func (l nopLogger) Debugf(_ string, _ ...interface{}) {} +func (l nopLogger) Infof(_ string, _ ...interface{}) {} +func (l nopLogger) Warnf(_ string, _ ...interface{}) {} +func (l nopLogger) Errorf(_ string, _ ...interface{}) {} type nopEntry struct{} -func (e nopEntry) WithFields(fields Fields) Entry { return nopEntry{} } -func (e nopEntry) WithField(name string, value interface{}) Entry { return nopEntry{} } +func (e nopEntry) WithFields(_ Fields) Entry { return nopEntry{} } +func (e nopEntry) WithField(_ string, _ interface{}) Entry { return nopEntry{} } -func (e nopEntry) Debug(args ...interface{}) {} -func (e nopEntry) Info(args ...interface{}) {} -func (e nopEntry) Warn(args ...interface{}) {} -func (e nopEntry) Error(args ...interface{}) {} -func (e nopEntry) Debugf(format string, args ...interface{}) {} -func (e nopEntry) Infof(format string, args ...interface{}) {} -func (e nopEntry) Warnf(format string, args ...interface{}) {} -func (e nopEntry) Errorf(format string, args ...interface{}) {} +func (e nopEntry) Debug(_ ...interface{}) {} +func (e nopEntry) Info(_ ...interface{}) {} +func (e nopEntry) Warn(_ ...interface{}) {} +func (e nopEntry) Error(_ ...interface{}) {} +func (e nopEntry) Debugf(_ string, _ ...interface{}) {} +func (e nopEntry) Infof(_ string, _ ...interface{}) {} +func (e nopEntry) Warnf(_ string, _ ...interface{}) {} +func (e nopEntry) Errorf(_ string, _ ...interface{}) {} diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 58f567675b..1331c7dfae 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -25,7 +25,7 @@ import ( ) type redeliveryConsumer interface { - Redeliver(msgIds []messageID) + Redeliver(msgIDs []messageID) } type negativeAcksTracker struct { @@ -123,7 +123,7 @@ func (t *negativeAcksTracker) track() { case <-t.tick.C: { now := time.Now() - msgIds := make([]messageID, 0) + msgIDs := make([]messageID, 0) t.Lock() @@ -131,15 +131,15 @@ func (t *negativeAcksTracker) track() { t.log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now) if targetTime.Before(now) { t.log.Debugf("Adding MsgId: %v", msgID) - msgIds = append(msgIds, msgID) + msgIDs = append(msgIDs, msgID) delete(t.negativeAcks, msgID) } } t.Unlock() - if len(msgIds) > 0 { - t.rc.Redeliver(msgIds) + if len(msgIDs) > 0 { + t.rc.Redeliver(msgIDs) } } } diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 9493412333..3f03ac446a 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -56,22 +56,22 @@ func newNackMockedConsumer(nackBackoffPolicy NackBackoffPolicy) *nackMockedConsu return t } -func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) { +func (nmc *nackMockedConsumer) Redeliver(msgIDs []messageID) { nmc.lock.Lock() defer nmc.lock.Unlock() if nmc.closed { return } - for _, id := range msgIds { + for _, id := range msgIDs { nmc.ch <- id } } -func sortMessageIds(msgIds []messageID) []messageID { - sort.Slice(msgIds, func(i, j int) bool { - return msgIds[i].ledgerID < msgIds[j].entryID +func sortMessageIDs(msgIDs []messageID) []messageID { + sort.Slice(msgIDs, func(i, j int) bool { + return msgIDs[i].ledgerID < msgIDs[j].entryID }) - return msgIds + return msgIDs } func (nmc *nackMockedConsumer) Wait() <-chan messageID { @@ -94,17 +94,17 @@ func TestNacksTracker(t *testing.T) { batchIdx: 1, }) - msgIds := make([]messageID, 0) + msgIDs := make([]messageID, 0) for id := range nmc.Wait() { - msgIds = append(msgIds, id) + msgIDs = append(msgIDs, id) } - msgIds = sortMessageIds(msgIds) + msgIDs = sortMessageIDs(msgIDs) - assert.Equal(t, 2, len(msgIds)) - assert.Equal(t, int64(1), msgIds[0].ledgerID) - assert.Equal(t, int64(1), msgIds[0].entryID) - assert.Equal(t, int64(2), msgIds[1].ledgerID) - assert.Equal(t, int64(2), msgIds[1].entryID) + assert.Equal(t, 2, len(msgIDs)) + assert.Equal(t, int64(1), msgIDs[0].ledgerID) + assert.Equal(t, int64(1), msgIDs[0].entryID) + assert.Equal(t, int64(2), msgIDs[1].ledgerID) + assert.Equal(t, int64(2), msgIDs[1].entryID) nacks.Close() // allow multiple Close without panicing @@ -139,17 +139,17 @@ func TestNacksWithBatchesTracker(t *testing.T) { batchIdx: 1, }) - msgIds := make([]messageID, 0) + msgIDs := make([]messageID, 0) for id := range nmc.Wait() { - msgIds = append(msgIds, id) + msgIDs = append(msgIDs, id) } - msgIds = sortMessageIds(msgIds) + msgIDs = sortMessageIDs(msgIDs) - assert.Equal(t, 2, len(msgIds)) - assert.Equal(t, int64(1), msgIds[0].ledgerID) - assert.Equal(t, int64(1), msgIds[0].entryID) - assert.Equal(t, int64(2), msgIds[1].ledgerID) - assert.Equal(t, int64(2), msgIds[1].entryID) + assert.Equal(t, 2, len(msgIDs)) + assert.Equal(t, int64(1), msgIDs[0].ledgerID) + assert.Equal(t, int64(1), msgIDs[0].entryID) + assert.Equal(t, int64(2), msgIDs[1].ledgerID) + assert.Equal(t, int64(2), msgIDs[1].entryID) nacks.Close() } @@ -161,17 +161,17 @@ func TestNackBackoffTracker(t *testing.T) { nacks.AddMessage(new(mockMessage1)) nacks.AddMessage(new(mockMessage2)) - msgIds := make([]messageID, 0) + msgIDs := make([]messageID, 0) for id := range nmc.Wait() { - msgIds = append(msgIds, id) + msgIDs = append(msgIDs, id) } - msgIds = sortMessageIds(msgIds) + msgIDs = sortMessageIDs(msgIDs) - assert.Equal(t, 2, len(msgIds)) - assert.Equal(t, int64(1), msgIds[0].ledgerID) - assert.Equal(t, int64(1), msgIds[0].entryID) - assert.Equal(t, int64(2), msgIds[1].ledgerID) - assert.Equal(t, int64(2), msgIds[1].entryID) + assert.Equal(t, 2, len(msgIDs)) + assert.Equal(t, int64(1), msgIDs[0].ledgerID) + assert.Equal(t, int64(1), msgIDs[0].entryID) + assert.Equal(t, int64(2), msgIDs[1].ledgerID) + assert.Equal(t, int64(2), msgIDs[1].entryID) nacks.Close() // allow multiple Close without panicing @@ -230,7 +230,7 @@ func (msg *mockMessage1) GetReplicatedFrom() string { return "" } -func (msg *mockMessage1) GetSchemaValue(v interface{}) error { +func (msg *mockMessage1) GetSchemaValue(_ interface{}) error { return nil } @@ -306,7 +306,7 @@ func (msg *mockMessage2) GetReplicatedFrom() string { return "" } -func (msg *mockMessage2) GetSchemaValue(v interface{}) error { +func (msg *mockMessage2) GetSchemaValue(_ interface{}) error { return nil } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f578ee4b96..b1fc3f02cf 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1093,7 +1093,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes isDone := uAtomic.NewBool(false) doneCh := make(chan struct{}) - p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { + p.internalSendAsync(ctx, msg, func(ID MessageID, _ *ProducerMessage, e error) { if isDone.CAS(false, true) { err = e msgID = ID @@ -1375,59 +1375,58 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local > remote, ignore it", response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) return - } else { - // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback - p.pendingQueue.Poll() + } + // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback + p.pendingQueue.Poll() - now := time.Now().UnixNano() + now := time.Now().UnixNano() - // lock the pending item while sending the requests - pi.Lock() - defer pi.Unlock() - p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9) - batchSize := int32(len(pi.sendRequests)) - for idx, i := range pi.sendRequests { - sr := i.(*sendRequest) - atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) - - msgID := newMessageID( - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - int32(idx), - p.partitionIdx, - batchSize, - ) - - if sr.totalChunks > 1 { - if sr.chunkID == 0 { - sr.chunkRecorder.setFirstChunkID( - &messageID{ - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - -1, - p.partitionIdx, - 0, - }) - } else if sr.chunkID == sr.totalChunks-1 { - sr.chunkRecorder.setLastChunkID( - &messageID{ - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - -1, - p.partitionIdx, - 0, - }) - // use chunkMsgID to set msgID - msgID = &sr.chunkRecorder.chunkedMsgID - } + // lock the pending item while sending the requests + pi.Lock() + defer pi.Unlock() + p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9) + batchSize := int32(len(pi.sendRequests)) + for idx, i := range pi.sendRequests { + sr := i.(*sendRequest) + atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) + + msgID := newMessageID( + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + int32(idx), + p.partitionIdx, + batchSize, + ) + + if sr.totalChunks > 1 { + if sr.chunkID == 0 { + sr.chunkRecorder.setFirstChunkID( + &messageID{ + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + -1, + p.partitionIdx, + 0, + }) + } else if sr.chunkID == sr.totalChunks-1 { + sr.chunkRecorder.setLastChunkID( + &messageID{ + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + -1, + p.partitionIdx, + 0, + }) + // use chunkMsgID to set msgID + msgID = &sr.chunkRecorder.chunkedMsgID } - - sr.done(msgID, nil) } - // Mark this pending item as done - pi.done(nil) + sr.done(msgID, nil) } + + // Mark this pending item as done + pi.done(nil) } func (p *partitionProducer) internalClose(req *closeProducer) { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index afd1f09af7..b58c0608a1 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -153,7 +153,7 @@ func TestProducerAsyncSend(t *testing.T) { for i := 0; i < 10; i++ { producer.SendAsync(context.Background(), &ProducerMessage{ Payload: []byte("hello"), - }, func(id MessageID, message *ProducerMessage, e error) { + }, func(id MessageID, _ *ProducerMessage, e error) { if e != nil { log.WithError(e).Error("Failed to publish") errors.Put(e) @@ -174,7 +174,7 @@ func TestProducerAsyncSend(t *testing.T) { assert.Equal(t, 0, errors.Size()) wg.Add(1) - producer.SendAsync(context.Background(), nil, func(id MessageID, m *ProducerMessage, e error) { + producer.SendAsync(context.Background(), nil, func(id MessageID, _ *ProducerMessage, e error) { assert.NotNil(t, e) assert.Nil(t, id) wg.Done() @@ -183,7 +183,7 @@ func TestProducerAsyncSend(t *testing.T) { wg.Add(1) producer.SendAsync(context.Background(), &ProducerMessage{Payload: []byte("hello"), Value: []byte("hello")}, - func(id MessageID, m *ProducerMessage, e error) { + func(id MessageID, _ *ProducerMessage, e error) { assert.NotNil(t, e) assert.Nil(t, id) wg.Done() @@ -214,7 +214,7 @@ func TestProducerFlushDisableBatching(t *testing.T) { for i := 0; i < 10; i++ { producer.SendAsync(context.Background(), &ProducerMessage{ Payload: []byte("hello"), - }, func(id MessageID, message *ProducerMessage, e error) { + }, func(id MessageID, _ *ProducerMessage, e error) { if e != nil { log.WithError(e).Error("Failed to publish") errors.Put(e) @@ -383,7 +383,7 @@ func TestFlushInProducer(t *testing.T) { messageContent := prefix + fmt.Sprintf("%d", i) producer.SendAsync(ctx, &ProducerMessage{ Payload: []byte(messageContent), - }, func(id MessageID, producerMessage *ProducerMessage, e error) { + }, func(id MessageID, _ *ProducerMessage, e error) { if e != nil { log.WithError(e).Error("Failed to publish") errors.Put(e) @@ -424,7 +424,7 @@ func TestFlushInProducer(t *testing.T) { messageContent := prefix + fmt.Sprintf("%d", i) producer.SendAsync(ctx, &ProducerMessage{ Payload: []byte(messageContent), - }, func(id MessageID, producerMessage *ProducerMessage, e error) { + }, func(id MessageID, _ *ProducerMessage, e error) { if e != nil { log.WithError(e).Error("Failed to publish") errors.Put(e) @@ -494,7 +494,7 @@ func TestFlushInPartitionedProducer(t *testing.T) { messageContent := prefix + fmt.Sprintf("%d", i) producer.SendAsync(ctx, &ProducerMessage{ Payload: []byte(messageContent), - }, func(id MessageID, producerMessage *ProducerMessage, e error) { + }, func(id MessageID, _ *ProducerMessage, e error) { if e != nil { log.WithError(e).Error("Failed to publish") errors.Put(e) @@ -844,7 +844,7 @@ func TestBatchMessageFlushing(t *testing.T) { msg := &ProducerMessage{ Payload: msg, } - producer.SendAsync(ctx, msg, func(id MessageID, producerMessage *ProducerMessage, err error) { + producer.SendAsync(ctx, msg, func(_ MessageID, _ *ProducerMessage, _ error) { ch <- struct{}{} }) } @@ -898,7 +898,7 @@ func TestBatchDelayMessage(t *testing.T) { } var delayMsgID int64 ch := make(chan struct{}, 2) - producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { + producer.SendAsync(ctx, delayMsg, func(id MessageID, _ *ProducerMessage, _ error) { atomic.StoreInt64(&delayMsgID, id.(*messageID).entryID) ch <- struct{}{} }) @@ -914,7 +914,7 @@ func TestBatchDelayMessage(t *testing.T) { Payload: []byte("no delay"), } var noDelayMsgID int64 - producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { + producer.SendAsync(ctx, noDelayMsg, func(id MessageID, _ *ProducerMessage, _ error) { atomic.StoreInt64(&noDelayMsgID, id.(*messageID).entryID) }) for i := 0; i < 2; i++ { @@ -1174,7 +1174,7 @@ func TestFailedSchemaEncode(t *testing.T) { // producer should send return an error as message is Int64, but schema is String producer.SendAsync(ctx, &ProducerMessage{ Value: int64(1), - }, func(messageID MessageID, producerMessage *ProducerMessage, err error) { + }, func(messageID MessageID, _ *ProducerMessage, err error) { assert.NotNil(t, err) assert.Nil(t, messageID) wg.Done() @@ -1503,9 +1503,9 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) { type noopProduceInterceptor struct{} -func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} +func (noopProduceInterceptor) BeforeSend(_ Producer, _ *ProducerMessage) {} -func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (noopProduceInterceptor) OnSendAcknowledgement(_ Producer, _ *ProducerMessage, _ MessageID) { } // copyPropertyIntercepotr copy all keys in message properties map and add a suffix @@ -1514,11 +1514,11 @@ type metricProduceInterceptor struct { ackn int } -func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { +func (x *metricProduceInterceptor) BeforeSend(_ Producer, _ *ProducerMessage) { x.sendn++ } -func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (x *metricProduceInterceptor) OnSendAcknowledgement(_ Producer, _ *ProducerMessage, _ MessageID) { x.ackn++ } @@ -1734,7 +1734,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) { Payload: messageContent, Key: key, Schema: schema, - }, func(id MessageID, producerMessage *ProducerMessage, err error) { + }, func(id MessageID, _ *ProducerMessage, err error) { assert.NoError(t, err) assert.NotNil(t, id) }) @@ -1827,7 +1827,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) { Payload: messageContent, Key: key, Schema: schema, - }, func(id MessageID, producerMessage *ProducerMessage, err error) { + }, func(id MessageID, _ *ProducerMessage, err error) { assert.NoError(t, err) assert.NotNil(t, id) }) @@ -2025,16 +2025,16 @@ func TestMemLimitRejectProducerMessages(t *testing.T) { for i := 0; i < n/2; i++ { producer1.SendAsync(context.Background(), &ProducerMessage{ Payload: make([]byte, 1024), - }, func(id MessageID, message *ProducerMessage, e error) {}) + }, func(_ MessageID, _ *ProducerMessage, _ error) {}) producer2.SendAsync(context.Background(), &ProducerMessage{ Payload: make([]byte, 1024), - }, func(id MessageID, message *ProducerMessage, e error) {}) + }, func(_ MessageID, _ *ProducerMessage, _ error) {}) } // Last message in order to reach the limit producer1.SendAsync(context.Background(), &ProducerMessage{ Payload: make([]byte, 1024), - }, func(id MessageID, message *ProducerMessage, e error) {}) + }, func(_ MessageID, _ *ProducerMessage, _ error) {}) time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(n*1024), c.(*client).memLimit.CurrentUsage()) @@ -2112,18 +2112,18 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) { producer1.SendAsync(context.Background(), &ProducerMessage{ Value: value, Schema: schema, - }, func(id MessageID, message *ProducerMessage, e error) {}) + }, func(_ MessageID, _ *ProducerMessage, _ error) {}) producer2.SendAsync(context.Background(), &ProducerMessage{ Value: value, Schema: schema, - }, func(id MessageID, message *ProducerMessage, e error) {}) + }, func(_ MessageID, _ *ProducerMessage, _ error) {}) } // Last message in order to reach the limit producer1.SendAsync(context.Background(), &ProducerMessage{ Value: value, Schema: schema, - }, func(id MessageID, message *ProducerMessage, e error) {}) + }, func(_ MessageID, _ *ProducerMessage, _ error) {}) time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage()) @@ -2187,7 +2187,7 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) { producer2.SendAsync(context.Background(), &ProducerMessage{ Payload: make([]byte, 5*1024+1), - }, func(id MessageID, message *ProducerMessage, e error) { + }, func(_ MessageID, _ *ProducerMessage, e error) { if e != nil { t.Fatal(e) } @@ -2247,7 +2247,7 @@ func TestMemLimitContextCancel(t *testing.T) { for i := 0; i < n; i++ { producer.SendAsync(ctx, &ProducerMessage{ Payload: make([]byte, 1024), - }, func(id MessageID, message *ProducerMessage, e error) {}) + }, func(_ MessageID, _ *ProducerMessage, _ error) {}) } time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(n*1024), c.(*client).memLimit.CurrentUsage()) @@ -2257,7 +2257,7 @@ func TestMemLimitContextCancel(t *testing.T) { go func() { producer.SendAsync(ctx, &ProducerMessage{ Payload: make([]byte, 1024), - }, func(id MessageID, message *ProducerMessage, e error) { + }, func(_ MessageID, _ *ProducerMessage, e error) { assert.Error(t, e) assert.ErrorContains(t, e, getResultStr(TimeoutError)) wg.Done() @@ -2369,7 +2369,7 @@ func TestFailPendingMessageWithClose(t *testing.T) { for i := 0; i < 3; i++ { testProducer.SendAsync(context.Background(), &ProducerMessage{ Payload: make([]byte, 1024), - }, func(id MessageID, message *ProducerMessage, e error) { + }, func(_ MessageID, _ *ProducerMessage, e error) { if e != nil { assert.True(t, errors.Is(e, ErrProducerClosed)) } @@ -2517,7 +2517,7 @@ func TestProducerWithMaxConnectionsPerBroker(t *testing.T) { var ok int32 testProducer.SendAsync(context.Background(), &ProducerMessage{Value: []byte("hello")}, - func(id MessageID, producerMessage *ProducerMessage, err error) { + func(_ MessageID, _ *ProducerMessage, err error) { if err == nil { atomic.StoreInt32(&ok, 1) } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 3c928c1db7..d2fa559783 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -320,7 +320,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) { producer.SendAsync(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), - }, func(id MessageID, producerMessage *ProducerMessage, err error) { + }, func(id MessageID, _ *ProducerMessage, err error) { assert.NoError(t, err) assert.NotNil(t, id) msgIDs[idx] = id @@ -396,7 +396,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) { producer.SendAsync(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), - }, func(id MessageID, producerMessage *ProducerMessage, err error) { + }, func(id MessageID, _ *ProducerMessage, err error) { assert.NoError(t, err) assert.NotNil(t, id) msgIDs[idx] = id diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go index c8aa0b9464..a9a67adb43 100644 --- a/pulsar/retry_router.go +++ b/pulsar/retry_router.go @@ -98,8 +98,8 @@ func (r *retryRouter) run() { producer := r.getProducer() msgID := rm.consumerMsg.ID() - producer.SendAsync(context.Background(), &rm.producerMsg, func(messageID MessageID, - producerMessage *ProducerMessage, err error) { + producer.SendAsync(context.Background(), &rm.producerMsg, func(_ MessageID, + _ *ProducerMessage, err error) { if err != nil { r.log.WithError(err).WithField("msgID", msgID).Error("Failed to send message to RLQ") rm.consumerMsg.Consumer.Nack(rm.consumerMsg) @@ -150,9 +150,8 @@ func (r *retryRouter) getProducer() Producer { r.log.WithError(err).Error("Failed to create RLQ producer") time.Sleep(bo.Next()) continue - } else { - r.producer = producer - return producer } + r.producer = producer + return producer } } diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go index 2368e3d846..f9f8aec1d0 100644 --- a/pulsar/table_view_test.go +++ b/pulsar/table_view_test.go @@ -55,7 +55,7 @@ func TestTableView(t *testing.T) { t.Log(key) _, err = producer.Send(context.Background(), &ProducerMessage{ Key: key, - Value: fmt.Sprintf(valuePrefix + key), + Value: valuePrefix + key, }) assert.NoError(t, err) } @@ -329,7 +329,7 @@ func TestForEachAndListenJSONSchema(t *testing.T) { t.Log("foreach" + key) s, ok := value.(testJSON) assert.Truef(t, ok, "expected value to be testJSON type got %T", value) - assert.Equal(t, fmt.Sprintf(valuePrefix+key), s.Name) + assert.Equal(t, valuePrefix+key, s.Name) return nil }) @@ -349,7 +349,7 @@ func TestForEachAndListenJSONSchema(t *testing.T) { Key: key, Value: testJSON{ ID: i, - Name: fmt.Sprintf(valuePrefix + key), + Name: valuePrefix + key, }, }) assert.NoError(t, err) diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go index 2eb8aca9a8..82e6fcfa77 100644 --- a/pulsar/transaction_impl.go +++ b/pulsar/transaction_impl.go @@ -84,7 +84,7 @@ func (txn *transaction) GetState() TxnState { return txn.state } -func (txn *transaction) Commit(ctx context.Context) error { +func (txn *transaction) Commit(_ context.Context) error { if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), int32(TxnCommitting)) || txn.state == TxnCommitting) { return newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string()) @@ -110,7 +110,7 @@ func (txn *transaction) Commit(ctx context.Context) error { return err } -func (txn *transaction) Abort(ctx context.Context) error { +func (txn *transaction) Abort(_ context.Context) error { if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), int32(TxnAborting)) || txn.state == TxnAborting) { return newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string()) diff --git a/pulsaradmin/pkg/admin/admin_test.go b/pulsaradmin/pkg/admin/admin_test.go index c4fa529567..f6cd3263f2 100644 --- a/pulsaradmin/pkg/admin/admin_test.go +++ b/pulsaradmin/pkg/admin/admin_test.go @@ -61,7 +61,7 @@ type customAuthProvider struct { var _ auth.Provider = &customAuthProvider{} -func (c *customAuthProvider) RoundTrip(req *http.Request) (*http.Response, error) { +func (c *customAuthProvider) RoundTrip(_ *http.Request) (*http.Response, error) { panic("implement me") } diff --git a/pulsaradmin/pkg/admin/auth/oauth2.go b/pulsaradmin/pkg/admin/auth/oauth2.go index 4bd1546195..59587abf0d 100644 --- a/pulsaradmin/pkg/admin/auth/oauth2.go +++ b/pulsaradmin/pkg/admin/auth/oauth2.go @@ -121,7 +121,7 @@ func NewAuthenticationOAuth2WithParams( issuerEndpoint, clientID, audience string, - scope string, + _ string, transport http.RoundTripper) (*OAuth2Provider, error) { issuer := oauth2.Issuer{ @@ -257,6 +257,6 @@ func makeKeyring() (keyring.Keyring, error) { }) } -func keyringPrompt(prompt string) (string, error) { +func keyringPrompt(_ string) (string, error) { return "", nil } diff --git a/pulsaradmin/pkg/admin/auth/oauth2_test.go b/pulsaradmin/pkg/admin/auth/oauth2_test.go index b25e576119..b19133c7c7 100644 --- a/pulsaradmin/pkg/admin/auth/oauth2_test.go +++ b/pulsaradmin/pkg/admin/auth/oauth2_test.go @@ -37,7 +37,7 @@ func mockOAuthServer() *httptest.Server { // mock the used REST path for the tests mockedHandler := http.NewServeMux() - mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, _ *http.Request) { s := fmt.Sprintf(`{ "issuer":"%s", "authorization_endpoint":"%s/authorize", @@ -46,10 +46,10 @@ func mockOAuthServer() *httptest.Server { }`, server.URL, server.URL, server.URL, server.URL) fmt.Fprintln(writer, s) }) - mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, _ *http.Request) { fmt.Fprintln(writer, "{\n \"access_token\": \"token-content\",\n \"token_type\": \"Bearer\"\n}") }) - mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, request *http.Request) { + mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, _ *http.Request) { fmt.Fprintln(writer, "true") }) diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index 9c3f3e3fef..e92f020cb2 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -148,7 +148,7 @@ type Namespaces interface { GetNamespaceReplicationClusters(namespace string) ([]string, error) // SetNamespaceReplicationClusters returns the replication clusters for a namespace - SetNamespaceReplicationClusters(namespace string, clusterIds []string) error + SetNamespaceReplicationClusters(namespace string, clusterIDs []string) error // SetNamespaceAntiAffinityGroup sets anti-affinity group name for a namespace SetNamespaceAntiAffinityGroup(namespace string, namespaceAntiAffinityGroup string) error @@ -616,13 +616,13 @@ func (n *namespaces) GetNamespaceReplicationClusters(namespace string) ([]string return data, err } -func (n *namespaces) SetNamespaceReplicationClusters(namespace string, clusterIds []string) error { +func (n *namespaces) SetNamespaceReplicationClusters(namespace string, clusterIDs []string) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "replication") - return n.pulsar.Client.Post(endpoint, &clusterIds) + return n.pulsar.Client.Post(endpoint, &clusterIDs) } func (n *namespaces) SetNamespaceAntiAffinityGroup(namespace string, namespaceAntiAffinityGroup string) error { diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go index 98db696194..087a4a37c1 100644 --- a/pulsaradmin/pkg/admin/subscription_test.go +++ b/pulsaradmin/pkg/admin/subscription_test.go @@ -67,7 +67,7 @@ func TestGetMessagesByID(t *testing.T) { for i := 0; i <= numberMessages; i++ { producer.SendAsync(ctx, &pulsar.ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), - }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { + }, func(id pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { assert.Nil(t, err) messageIDMap[id.String()]++ wg.Done() diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go index 616079127f..c9664e716f 100644 --- a/pulsaradmin/pkg/utils/data.go +++ b/pulsaradmin/pkg/utils/data.go @@ -215,7 +215,7 @@ type NamespacesData struct { MessageTTL int `json:"messageTTL"` BookkeeperAckQuorum int `json:"bookkeeperAckQuorum"` ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"` - ClusterIds string `json:"clusterIds"` + ClusterIDs string `json:"clusterIds"` RetentionTimeStr string `json:"retentionTimeStr"` LimitStr string `json:"limitStr"` LimitTime int64 `json:"limitTime"` diff --git a/pulsaradmin/pkg/utils/schema_util.go b/pulsaradmin/pkg/utils/schema_util.go index 671627fa56..b8a17d9628 100644 --- a/pulsaradmin/pkg/utils/schema_util.go +++ b/pulsaradmin/pkg/utils/schema_util.go @@ -56,11 +56,9 @@ type IsCompatibility struct { func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response GetSchemaResponse) *SchemaInfo { info := new(SchemaInfo) schema := make([]byte, 0, 10) - if response.Type == "KEY_VALUE" { - // TODO: impl logic - } else { + if response.Type != "KEY_VALUE" { schema = []byte(response.Data) - } + } // TODO: impl logic for KEY_VALUE info.Schema = schema info.Type = response.Type