[Improve][Producer] normalize and export the errors #1143

Merged · 12 commits · Dec 29, 2023
2 changes: 2 additions & 0 deletions go.mod
@@ -43,6 +43,8 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -160,6 +160,10 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
2 changes: 1 addition & 1 deletion pulsar/consumer_test.go
@@ -4299,7 +4299,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
Payload: createTestMessagePayload(1),
})
// Producer can't send message
assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
assert.Equal(t, true, errors.Is(err, ErrMemoryBufferIsFull))
}

func TestMultiConsumerMemoryLimit(t *testing.T) {
8 changes: 8 additions & 0 deletions pulsar/error.go
@@ -21,6 +21,7 @@ import (
"fmt"

proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/hashicorp/go-multierror"
)

// Result used to represent pulsar processing is an alias of type int.
@@ -245,3 +246,10 @@ func getErrorFromServerError(serverError *proto.ServerError) error {
return newError(UnknownError, serverError.String())
}
}

// joinErrors joins multiple errors into a single error that can be tested with errors.Is().
// We use github.com/hashicorp/go-multierror instead of errors.Join() (added in Go 1.20) so that the
// Pulsar Go client can still be compiled with Go versions newer than Go 1.13.
func joinErrors(errs ...error) error {
return multierror.Append(nil, errs...)
}
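
As a side note on the mechanism: `multierror.Append` returns a `*multierror.Error` whose `Unwrap` lets `errors.Is` match every joined error, which is what makes the exported sentinels testable by callers. A minimal standalone sketch (the sentinel and cause values below are invented for illustration, not part of this change):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

var errSentinel = errors.New("sentinel")

func main() {
	cause := fmt.Errorf("underlying broker failure")

	// Append(nil, ...) builds a *multierror.Error; its Unwrap implementation
	// lets errors.Is walk every appended error on Go versions newer than 1.13.
	joined := multierror.Append(nil, errSentinel, cause)

	fmt.Println(errors.Is(joined, errSentinel)) // true
	fmt.Println(errors.Is(joined, cause))       // true
	fmt.Println(joined)                         // prints both messages
}
```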
19 changes: 19 additions & 0 deletions pulsar/error_test.go
@@ -0,0 +1,19 @@
package pulsar

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_joinErrors(t *testing.T) {
err1 := errors.New("err1")
err2 := errors.New("err2")
err3 := errors.New("err3")
err := joinErrors(ErrInvalidMessage, err1, err2)
assert.True(t, errors.Is(err, ErrInvalidMessage))
assert.True(t, errors.Is(err, err1))
assert.True(t, errors.Is(err, err2))
assert.False(t, errors.Is(err, err3))
}
84 changes: 46 additions & 38 deletions pulsar/producer_partition.go
@@ -51,14 +51,21 @@ const (
)

var (
errFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
errSendTimeout = newError(TimeoutError, "message send timeout")
errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
errContextExpired = newError(TimeoutError, "message send context expired")
errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
errMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
errProducerClosed = newError(ProducerClosed, "producer already been closed")
errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
ErrSendTimeout = newError(TimeoutError, "message send timeout")
ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
ErrContextExpired = newError(TimeoutError, "message send context expired")
ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
ErrProducerClosed = newError(ProducerClosed, "producer already been closed")
ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
ErrSchema = newError(SchemaFailure, "schema error")
ErrTransaction = newError(TransactionNoFoundError, "transaction error")
ErrInvalidMessage = newError(InvalidMessage, "invalid message")
ErrTopicNotfound = newError(TopicNotFound, "topic not found")
ErrTopicTerminated = newError(TopicTerminated, "topic terminated")
ErrProducerBlocked = newError(ProducerBlockedQuotaExceededException, "producer blocked")
ErrProducerFenced = newError(ProducerFenced, "producer fenced")

buffersPool sync.Pool
sendRequestPool *sync.Pool
@@ -449,25 +456,25 @@ func (p *partitionProducer) reconnectToBroker() {
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic not found, stop reconnecting, close the producer")
p.doClose(newError(TopicNotFound, err.Error()))
p.doClose(joinErrors(ErrTopicNotfound, err))
break
}

if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
p.doClose(newError(TopicTerminated, err.Error()))
p.doClose(joinErrors(ErrTopicTerminated, err))
break
}

if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error()))
p.failPendingMessages(joinErrors(ErrProducerBlocked, err))
break
}

if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
p.doClose(newError(ProducerFenced, err.Error()))
p.doClose(joinErrors(ErrProducerFenced, err))
break
}

@@ -547,7 +554,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
p.log.WithField("size", sr.uncompressedSize).
WithField("properties", sr.msg.Properties).
Error("unable to add message to batch")
sr.done(nil, errFailAddToBatch)
sr.done(nil, ErrFailAddToBatch)
return
}
}
@@ -802,7 +809,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
}

if errors.Is(err, internal.ErrExceedMaxMessageSize) {
p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err)
p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", err)
}

return
@@ -893,11 +900,11 @@ func (p *partitionProducer) failTimeoutMessages() {

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
sr.done(nil, errSendTimeout)
sr.done(nil, ErrSendTimeout)
}

// flag the sending has completed with error, flush make no effect
pi.done(errSendTimeout)
pi.done(ErrSendTimeout)
pi.Unlock()

// finally reached the last view item, current iteration ends
@@ -926,7 +933,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
}

if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i])
p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", errs[i])
return
}

@@ -1019,18 +1026,18 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
return newError(InvalidMessage, "Message is nil")
return joinErrors(ErrInvalidMessage, fmt.Errorf("message is nil"))
}

if msg.Value != nil && msg.Payload != nil {
return newError(InvalidMessage, "Can not set Value and Payload both")
return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return fmt.Errorf("msg schema can not match with producer schema")
return joinErrors(ErrInvalidMessage, fmt.Errorf("msg schema can not match with producer schema"))
}
}

@@ -1046,15 +1053,16 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
if txn.state != TxnOpen {
p.log.WithField("state", txn.state).Error("Failed to send message" +
" by a non-open transaction.")
return newError(InvalidStatus, "Failed to send message by a non-open transaction.")
return joinErrors(ErrTransaction,
fmt.Errorf("failed to send message by a non-open transaction"))
}

if err := txn.registerProducerTopic(p.topic); err != nil {
return err
return joinErrors(ErrTransaction, err)
}

if err := txn.registerSendOrAckOp(); err != nil {
return err
return joinErrors(ErrTransaction, err)
}

sr.transaction = txn
@@ -1080,7 +1088,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest) error {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
return fmt.Errorf("get schema version fail, err: %w", err)
return joinErrors(ErrSchema, err)
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
@@ -1097,15 +1105,15 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
if sr.msg.Value != nil {
if sr.schema == nil {
p.log.Errorf("Schema encode message failed %s", sr.msg.Value)
return newError(SchemaFailure, "set schema value without setting schema")
return joinErrors(ErrSchema, fmt.Errorf("set schema value without setting schema"))
}

// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
schemaPayload, err := sr.schema.Encode(sr.msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value)
return newError(SchemaFailure, err.Error())
return joinErrors(ErrSchema, err)
}

sr.uncompressedPayload = schemaPayload
@@ -1160,11 +1168,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {

// if msg is too large and chunking is disabled
if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
p.log.WithError(errMessageTooLarge).
p.log.WithError(ErrMessageTooLarge).
WithField("size", checkSize).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", sr.maxMessageSize)
return errMessageTooLarge
return ErrMessageTooLarge
}

if sr.sendAsBatch || !p.options.EnableChunking {
@@ -1173,11 +1181,11 @@
} else {
sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
if sr.payloadChunkSize <= 0 {
p.log.WithError(errMetaTooLarge).
p.log.WithError(ErrMetaTooLarge).
WithField("metadata size", proto.Size(sr.mm)).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
return errMetaTooLarge
return ErrMetaTooLarge
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
@@ -1220,7 +1228,7 @@ func (p *partitionProducer) internalSendAsync(
}

if p.getProducerState() != producerReady {
sr.done(nil, errProducerClosed)
sr.done(nil, ErrProducerClosed)
return
}

@@ -1333,7 +1341,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)

p.doClose(errProducerClosed)
p.doClose(ErrProducerClosed)
}

func (p *partitionProducer) doClose(reason error) {
@@ -1508,11 +1516,11 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
WithField("properties", sr.msg.Properties)
}

if errors.Is(err, errSendTimeout) {
if errors.Is(err, ErrSendTimeout) {
sr.producer.metrics.PublishErrorsTimeout.Inc()
}

if errors.Is(err, errMessageTooLarge) {
if errors.Is(err, ErrMessageTooLarge) {
sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
}

@@ -1554,7 +1562,7 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
for i := 0; i < sr.totalChunks; i++ {
if p.blockIfQueueFull() {
if !p.publishSemaphore.Acquire(sr.ctx) {
return errContextExpired
return ErrContextExpired
}

// update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
@@ -1564,7 +1572,7 @@
p.metrics.MessagesPending.Inc()
} else {
if !p.publishSemaphore.TryAcquire() {
return errSendQueueIsFull
return ErrSendQueueIsFull
}

// update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
@@ -1586,11 +1594,11 @@ func (p *partitionProducer) reserveMem(sr *sendRequest) error {

if p.blockIfQueueFull() {
if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
return errContextExpired
return ErrContextExpired
}
} else {
if !p.client.memLimit.TryReserveMemory(requiredMem) {
return errMemoryBufferIsFull
return ErrMemoryBufferIsFull
}
}

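For application authors, the practical effect of this change is that producer failures can be classified with `errors.Is` against the exported variables instead of comparing strings or unexported values. A hedged sketch of such usage (broker URL, topic name, and the retry/stop decisions are placeholders for illustration only):

```go
package main

import (
	"context"
	"errors"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: []byte("hello")})
	switch {
	case err == nil:
		log.Println("acknowledged by the broker")
	case errors.Is(err, pulsar.ErrSendTimeout), errors.Is(err, pulsar.ErrSendQueueIsFull),
		errors.Is(err, pulsar.ErrMemoryBufferIsFull):
		log.Println("transient condition: back off and retry")
	case errors.Is(err, pulsar.ErrProducerClosed), errors.Is(err, pulsar.ErrTopicTerminated):
		log.Println("terminal condition: stop producing to this topic")
	case errors.Is(err, pulsar.ErrMessageTooLarge):
		log.Println("message exceeds MaxMessageSize: split it or enable chunking")
	default:
		log.Printf("send failed: %v", err)
	}
}
```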
14 changes: 7 additions & 7 deletions pulsar/producer_test.go
@@ -1061,8 +1061,8 @@ func TestMaxMessageSize(t *testing.T) {
assert.NoError(t, err)
defer client.Close()

// Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge
// being masked by an earlier errFailAddToBatch
// Need to set BatchingMaxSize > serverMaxMessageSize to avoid ErrMessageTooLarge
// being masked by an earlier ErrFailAddToBatch
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
BatchingMaxSize: uint(2 * serverMaxMessageSize),
@@ -1088,7 +1088,7 @@
// So when bias <= 0, the uncompressed payload will not exceed maxMessageSize,
// but encryptedPayloadSize exceeds maxMessageSize, Send() will return an internal error.
// When bias = 1, the first check of maxMessageSize (for uncompressed payload) is valid,
// Send() will return errMessageTooLarge
// Send() will return ErrMessageTooLarge
for bias := -1; bias <= 1; bias++ {
payload := make([]byte, serverMaxMessageSize+bias)
ID, err := producer.Send(context.Background(), &ProducerMessage{
Expand All @@ -1098,7 +1098,7 @@ func TestMaxMessageSize(t *testing.T) {
assert.Equal(t, true, errors.Is(err, internal.ErrExceedMaxMessageSize))
assert.Nil(t, ID)
} else {
assert.Equal(t, errMessageTooLarge, err)
assert.True(t, errors.Is(err, ErrMessageTooLarge))
}
}

@@ -1111,7 +1111,7 @@
assert.Equal(t, true, errors.Is(err, internal.ErrExceedMaxMessageSize))
assert.Nil(t, ID)
} else {
assert.Equal(t, errMessageTooLarge, err)
assert.True(t, errors.Is(err, ErrMessageTooLarge))
}
}
}
@@ -1191,7 +1191,7 @@ func TestTopicTermination(t *testing.T) {
})
if err != nil {
e := err.(*Error)
if e.result == TopicTerminated || err == errProducerClosed {
if e.result == TopicTerminated || errors.Is(err, ErrProducerClosed) {
terminatedChan <- true
} else {
terminatedChan <- false
@@ -2348,7 +2348,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
Payload: make([]byte, 1024),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
assert.Equal(t, errProducerClosed, e)
assert.True(t, errors.Is(e, ErrProducerClosed))
}
})
}