[Improve][Producer] normalize and export the errors #1143

Merged · 12 commits · Dec 29, 2023
2 changes: 2 additions & 0 deletions go.mod
@@ -43,6 +43,8 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
+ github.com/hashicorp/errwrap v1.0.0 // indirect
+ github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -160,6 +160,10 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+ github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
+ github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
2 changes: 1 addition & 1 deletion pulsar/consumer_test.go
@@ -4299,7 +4299,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
Payload: createTestMessagePayload(1),
})
// Producer can't send message
- assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
+ assert.Equal(t, true, errors.Is(err, ErrMemoryBufferIsFull))
}

func TestMultiConsumerMemoryLimit(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pulsar/error.go
@@ -21,6 +21,7 @@ import (
"fmt"

proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/hashicorp/go-multierror"
)

// Result used to represent pulsar processing is an alias of type int.
@@ -245,3 +246,10 @@ func getErrorFromServerError(serverError *proto.ServerError) error {
return newError(UnknownError, serverError.String())
}
}

+ // joinErrors joins multiple errors into one error, and the returned error can be tested with errors.Is().
+ // We use github.com/hashicorp/go-multierror instead of errors.Join() from Go 1.20 so that the Pulsar Go
+ // client still compiles with Go versions newer than 1.13.
+ func joinErrors(errs ...error) error {
+ return multierror.Append(nil, errs...)
+ }
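For reference, errors.Is() can see through the joined error because go-multierror's *Error type implements Unwrap(), which lets the standard errors package walk every appended error. A minimal standalone sketch of that behavior (illustrative only, not part of this diff):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

var errBase = errors.New("base error")

func main() {
	// Append(nil, ...) yields a *multierror.Error; its Unwrap() exposes
	// each appended error in turn, so errors.Is matches any of them.
	joined := multierror.Append(nil, errBase, errors.New("extra context"))
	fmt.Println(errors.Is(joined, errBase)) // true
}
```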
36 changes: 36 additions & 0 deletions pulsar/error_test.go
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_joinErrors(t *testing.T) {
err1 := errors.New("err1")
err2 := errors.New("err2")
err3 := errors.New("err3")
err := joinErrors(ErrInvalidMessage, err1, err2)
assert.True(t, errors.Is(err, ErrInvalidMessage))
assert.True(t, errors.Is(err, err1))
assert.True(t, errors.Is(err, err2))
assert.False(t, errors.Is(err, err3))
}
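Beyond errors.Is, a caller holding a joined error can recover the concrete *multierror.Error with errors.As and inspect each cause individually. A sketch of a hypothetical companion test (assumes an extra github.com/hashicorp/go-multierror import; not part of this PR):

```go
func Test_joinErrorsAs(t *testing.T) {
	err := joinErrors(ErrInvalidMessage, errors.New("err1"))

	// errors.As matches the concrete *multierror.Error directly; its
	// Errors field holds every error that was passed to joinErrors.
	var merr *multierror.Error
	assert.True(t, errors.As(err, &merr))
	assert.Len(t, merr.Errors, 2)
}
```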
84 changes: 46 additions & 38 deletions pulsar/producer_partition.go
@@ -51,14 +51,21 @@ const (
)

var (
- errFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
- errSendTimeout = newError(TimeoutError, "message send timeout")
- errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
- errContextExpired = newError(TimeoutError, "message send context expired")
- errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
- errMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
- errProducerClosed = newError(ProducerClosed, "producer already been closed")
- errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
+ ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
+ ErrSendTimeout = newError(TimeoutError, "message send timeout")
+ ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
+ ErrContextExpired = newError(TimeoutError, "message send context expired")
+ ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
+ ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
+ ErrProducerClosed = newError(ProducerClosed, "producer already been closed")
+ ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
+ ErrSchema = newError(SchemaFailure, "schema error")
+ ErrTransaction = errors.New("transaction error")
+ ErrInvalidMessage = newError(InvalidMessage, "invalid message")
+ ErrTopicNotfound = newError(TopicNotFound, "topic not found")
+ ErrTopicTerminated = newError(TopicTerminated, "topic terminated")
+ ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked")
+ ErrProducerFenced = newError(ProducerFenced, "producer fenced")

buffersPool sync.Pool
sendRequestPool *sync.Pool
@@ -449,25 +456,25 @@ func (p *partitionProducer) reconnectToBroker() {
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic not found, stop reconnecting, close the producer")
- p.doClose(newError(TopicNotFound, err.Error()))
+ p.doClose(joinErrors(ErrTopicNotfound, err))
break
}

if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
- p.doClose(newError(TopicTerminated, err.Error()))
+ p.doClose(joinErrors(ErrTopicTerminated, err))
break
}

if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
- p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error()))
+ p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
break
}

if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
- p.doClose(newError(ProducerFenced, err.Error()))
+ p.doClose(joinErrors(ErrProducerFenced, err))
break
}

@@ -547,7 +554,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
p.log.WithField("size", sr.uncompressedSize).
WithField("properties", sr.msg.Properties).
Error("unable to add message to batch")
- sr.done(nil, errFailAddToBatch)
+ sr.done(nil, ErrFailAddToBatch)
return
}
}
@@ -802,7 +809,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
}

if errors.Is(err, internal.ErrExceedMaxMessageSize) {
- p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err)
+ p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", err)
}

return
@@ -893,11 +900,11 @@ func (p *partitionProducer) failTimeoutMessages() {

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
- sr.done(nil, errSendTimeout)
+ sr.done(nil, ErrSendTimeout)
}

// flag the sending has completed with error, flush make no effect
- pi.done(errSendTimeout)
+ pi.done(ErrSendTimeout)
pi.Unlock()

// finally reached the last view item, current iteration ends
@@ -926,7 +933,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
}

if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
- p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i])
+ p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", errs[i])
return
}

@@ -1019,18 +1026,18 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
return newError(InvalidMessage, "Message is nil")
return joinErrors(ErrInvalidMessage, fmt.Errorf("message is nil"))
}

if msg.Value != nil && msg.Payload != nil {
return newError(InvalidMessage, "Can not set Value and Payload both")
return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return fmt.Errorf("msg schema can not match with producer schema")
return joinErrors(ErrSchema, fmt.Errorf("msg schema can not match with producer schema"))
}
}

@@ -1046,15 +1053,16 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
if txn.state != TxnOpen {
p.log.WithField("state", txn.state).Error("Failed to send message" +
" by a non-open transaction.")
- return newError(InvalidStatus, "Failed to send message by a non-open transaction.")
+ return joinErrors(ErrTransaction,
+ fmt.Errorf("failed to send message by a non-open transaction"))
}

if err := txn.registerProducerTopic(p.topic); err != nil {
- return err
+ return joinErrors(ErrTransaction, err)
}

if err := txn.registerSendOrAckOp(); err != nil {
- return err
+ return joinErrors(ErrTransaction, err)
}

sr.transaction = txn
@@ -1080,7 +1088,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest) error {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
return fmt.Errorf("get schema version fail, err: %w", err)
return joinErrors(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err))
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
@@ -1097,15 +1105,15 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
if sr.msg.Value != nil {
if sr.schema == nil {
p.log.Errorf("Schema encode message failed %s", sr.msg.Value)
return newError(SchemaFailure, "set schema value without setting schema")
return joinErrors(ErrSchema, fmt.Errorf("set schema value without setting schema"))
}

// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
schemaPayload, err := sr.schema.Encode(sr.msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value)
- return newError(SchemaFailure, err.Error())
+ return joinErrors(ErrSchema, err)
}

sr.uncompressedPayload = schemaPayload
@@ -1160,11 +1168,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {

// if msg is too large and chunking is disabled
if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
- p.log.WithError(errMessageTooLarge).
+ p.log.WithError(ErrMessageTooLarge).
WithField("size", checkSize).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", sr.maxMessageSize)
- return errMessageTooLarge
+ return ErrMessageTooLarge
}

if sr.sendAsBatch || !p.options.EnableChunking {
@@ -1173,11 +1181,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
} else {
sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
if sr.payloadChunkSize <= 0 {
- p.log.WithError(errMetaTooLarge).
+ p.log.WithError(ErrMetaTooLarge).
WithField("metadata size", proto.Size(sr.mm)).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
- return errMetaTooLarge
+ return ErrMetaTooLarge
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
@@ -1220,7 +1228,7 @@ func (p *partitionProducer) internalSendAsync(
}

if p.getProducerState() != producerReady {
- sr.done(nil, errProducerClosed)
+ sr.done(nil, ErrProducerClosed)
return
}

@@ -1333,7 +1341,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)

- p.doClose(errProducerClosed)
+ p.doClose(ErrProducerClosed)
}

func (p *partitionProducer) doClose(reason error) {
@@ -1508,11 +1516,11 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
WithField("properties", sr.msg.Properties)
}

- if errors.Is(err, errSendTimeout) {
+ if errors.Is(err, ErrSendTimeout) {
sr.producer.metrics.PublishErrorsTimeout.Inc()
}

- if errors.Is(err, errMessageTooLarge) {
+ if errors.Is(err, ErrMessageTooLarge) {
sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
}

@@ -1554,7 +1562,7 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
for i := 0; i < sr.totalChunks; i++ {
if p.blockIfQueueFull() {
if !p.publishSemaphore.Acquire(sr.ctx) {
- return errContextExpired
+ return ErrContextExpired
}

// update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
@@ -1564,7 +1572,7 @@
p.metrics.MessagesPending.Inc()
} else {
if !p.publishSemaphore.TryAcquire() {
- return errSendQueueIsFull
+ return ErrSendQueueIsFull
}

// update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
@@ -1586,11 +1594,11 @@ func (p *partitionProducer) reserveMem(sr *sendRequest) error {

if p.blockIfQueueFull() {
if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
- return errContextExpired
+ return ErrContextExpired
}
} else {
if !p.client.memLimit.TryReserveMemory(requiredMem) {
- return errMemoryBufferIsFull
+ return ErrMemoryBufferIsFull
}
}

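Taken together, exporting these sentinels lets applications branch on failure causes with errors.Is instead of matching error strings. A minimal caller-side sketch (the topic name and service URL are placeholders; not part of this diff):

```go
package main

import (
	"context"
	"errors"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte("hello"),
	})
	switch {
	case errors.Is(err, pulsar.ErrSendTimeout):
		log.Println("send timed out; safe to retry")
	case errors.Is(err, pulsar.ErrMemoryBufferIsFull):
		log.Println("client memory buffer is full; back off before retrying")
	case err != nil:
		log.Println("send failed:", err)
	}
}
```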