[Improve][Producer] normalize and export the errors (#1143)
* [Improve][Producer] normalize and export the errors

* update schema error

* update go version to 1.20 to support errors.Join()

* use errors.Is() to test an error

* use github.com/hashicorp/go-multierror to join errors instead of errors.Join() of Go 1.20

* revert go version to 1.18

* revert go version to 1.18

* update ErrSchema according to the CR suggestions

* update ErrTransaction to a normal error

* rename ErrProducerBlocked to ErrProducerBlockedQuotaExceeded

* add license header

* fix unit test error

---------

Co-authored-by: gunli <gunli@tencent.com>
gunli and gunli authored Dec 29, 2023
1 parent 443072b commit f491e09
Showing 7 changed files with 104 additions and 47 deletions.
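With the sentinel errors exported, application code can branch on send failures with errors.Is() instead of matching error strings. A minimal, illustrative sketch of a caller (the client URL, topic name, and the retry decisions below are assumptions for the example, not part of this change):

package main

import (
	"context"
	"errors"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: []byte("hello")})
	switch {
	case err == nil:
		// message sent successfully
	case errors.Is(err, pulsar.ErrSendTimeout):
		// the send timed out; the application may choose to retry
		log.Println("send timed out")
	case errors.Is(err, pulsar.ErrProducerClosed):
		// the producer is closed; retrying on this producer is pointless
		log.Println("producer closed")
	default:
		log.Printf("send failed: %v", err)
	}
}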
2 changes: 2 additions & 0 deletions go.mod
@@ -43,6 +43,8 @@ require (
	github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
	github.com/golang/snappy v0.0.1 // indirect
	github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
	github.com/hashicorp/errwrap v1.0.0 // indirect
	github.com/hashicorp/go-multierror v1.1.1 // indirect
	github.com/inconshreveable/mousetrap v1.0.1 // indirect
	github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -160,6 +160,10 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
2 changes: 1 addition & 1 deletion pulsar/consumer_test.go
@@ -4299,7 +4299,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
Payload: createTestMessagePayload(1),
})
// Producer can't send message
assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
assert.Equal(t, true, errors.Is(err, ErrMemoryBufferIsFull))
}

func TestMultiConsumerMemoryLimit(t *testing.T) {
8 changes: 8 additions & 0 deletions pulsar/error.go
@@ -21,6 +21,7 @@ import (
"fmt"

proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/hashicorp/go-multierror"
)

// Result used to represent pulsar processing is an alias of type int.
@@ -245,3 +246,10 @@ func getErrorFromServerError(serverError *proto.ServerError) error {
		return newError(UnknownError, serverError.String())
	}
}

// joinErrors joins multiple errors into one error; the returned error can be tested with errors.Is().
// We use github.com/hashicorp/go-multierror instead of errors.Join() from Go 1.20 so that the Pulsar
// Go client can still be compiled with Go versions newer than 1.13.
func joinErrors(errs ...error) error {
	return multierror.Append(nil, errs...)
}
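For comparison, on Go 1.20 and newer the standard library offers the same semantics through errors.Join(); a minimal sketch of the equivalent, deliberately not used here because the module still targets Go 1.18:

//go:build go1.20

package pulsar

import "errors"

// Illustrative stdlib equivalent of joinErrors; the result can still be tested with errors.Is().
// Note: errors.Join returns nil when every argument is nil, which differs slightly from multierror.Append.
func joinErrors(errs ...error) error {
	return errors.Join(errs...)
}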
36 changes: 36 additions & 0 deletions pulsar/error_test.go
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/assert"
)

func Test_joinErrors(t *testing.T) {
	err1 := errors.New("err1")
	err2 := errors.New("err2")
	err3 := errors.New("err3")
	err := joinErrors(ErrInvalidMessage, err1, err2)
	assert.True(t, errors.Is(err, ErrInvalidMessage))
	assert.True(t, errors.Is(err, err1))
	assert.True(t, errors.Is(err, err2))
	assert.False(t, errors.Is(err, err3))
}
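Because joinErrors() keeps both the exported sentinel and the underlying cause in the chain, callers can classify failures without parsing error strings. An illustrative, application-side helper (the retryable/fatal split below is an assumption for the sketch, not guidance from the client library):

package main

import (
	"errors"

	"github.com/apache/pulsar-client-go/pulsar"
)

// isRetryable reports whether a failed send is worth retrying from the
// application side, based on the exported sentinel errors.
func isRetryable(err error) bool {
	switch {
	case errors.Is(err, pulsar.ErrSendQueueIsFull),
		errors.Is(err, pulsar.ErrSendTimeout),
		errors.Is(err, pulsar.ErrMemoryBufferIsFull):
		return true // transient conditions: back off and retry
	case errors.Is(err, pulsar.ErrProducerClosed),
		errors.Is(err, pulsar.ErrTopicTerminated),
		errors.Is(err, pulsar.ErrMessageTooLarge):
		return false // permanent conditions: do not retry
	default:
		return false
	}
}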
84 changes: 46 additions & 38 deletions pulsar/producer_partition.go
@@ -51,14 +51,21 @@ const (
)

var (
errFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
errSendTimeout = newError(TimeoutError, "message send timeout")
errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
errContextExpired = newError(TimeoutError, "message send context expired")
errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
errMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
errProducerClosed = newError(ProducerClosed, "producer already been closed")
errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
ErrSendTimeout = newError(TimeoutError, "message send timeout")
ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
ErrContextExpired = newError(TimeoutError, "message send context expired")
ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
ErrProducerClosed = newError(ProducerClosed, "producer already been closed")
ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
ErrSchema = newError(SchemaFailure, "schema error")
ErrTransaction = errors.New("transaction error")
ErrInvalidMessage = newError(InvalidMessage, "invalid message")
ErrTopicNotfound = newError(TopicNotFound, "topic not found")
ErrTopicTerminated = newError(TopicTerminated, "topic terminated")
ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked")
ErrProducerFenced = newError(ProducerFenced, "producer fenced")

buffersPool sync.Pool
sendRequestPool *sync.Pool
@@ -449,25 +456,25 @@ func (p *partitionProducer) reconnectToBroker() {
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic not found, stop reconnecting, close the producer")
p.doClose(newError(TopicNotFound, err.Error()))
p.doClose(joinErrors(ErrTopicNotfound, err))
break
}

if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
p.doClose(newError(TopicTerminated, err.Error()))
p.doClose(joinErrors(ErrTopicTerminated, err))
break
}

if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error()))
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
break
}

if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
p.doClose(newError(ProducerFenced, err.Error()))
p.doClose(joinErrors(ErrProducerFenced, err))
break
}

@@ -547,7 +554,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
p.log.WithField("size", sr.uncompressedSize).
WithField("properties", sr.msg.Properties).
Error("unable to add message to batch")
sr.done(nil, errFailAddToBatch)
sr.done(nil, ErrFailAddToBatch)
return
}
}
@@ -802,7 +809,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
}

if errors.Is(err, internal.ErrExceedMaxMessageSize) {
p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err)
p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", err)
}

return
@@ -893,11 +900,11 @@ func (p *partitionProducer) failTimeoutMessages() {

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
sr.done(nil, errSendTimeout)
sr.done(nil, ErrSendTimeout)
}

// flag the sending has completed with error, flush make no effect
pi.done(errSendTimeout)
pi.done(ErrSendTimeout)
pi.Unlock()

// finally reached the last view item, current iteration ends
@@ -926,7 +933,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
}

if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i])
p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", errs[i])
return
}

@@ -1019,18 +1026,18 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
return newError(InvalidMessage, "Message is nil")
return joinErrors(ErrInvalidMessage, fmt.Errorf("message is nil"))
}

if msg.Value != nil && msg.Payload != nil {
return newError(InvalidMessage, "Can not set Value and Payload both")
return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return fmt.Errorf("msg schema can not match with producer schema")
return joinErrors(ErrSchema, fmt.Errorf("msg schema can not match with producer schema"))
}
}

@@ -1046,15 +1053,16 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
if txn.state != TxnOpen {
p.log.WithField("state", txn.state).Error("Failed to send message" +
" by a non-open transaction.")
return newError(InvalidStatus, "Failed to send message by a non-open transaction.")
return joinErrors(ErrTransaction,
fmt.Errorf("failed to send message by a non-open transaction"))
}

if err := txn.registerProducerTopic(p.topic); err != nil {
return err
return joinErrors(ErrTransaction, err)
}

if err := txn.registerSendOrAckOp(); err != nil {
return err
return joinErrors(ErrTransaction, err)
}

sr.transaction = txn
@@ -1080,7 +1088,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest) error {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
return fmt.Errorf("get schema version fail, err: %w", err)
return joinErrors(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err))
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
@@ -1097,15 +1105,15 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
if sr.msg.Value != nil {
if sr.schema == nil {
p.log.Errorf("Schema encode message failed %s", sr.msg.Value)
return newError(SchemaFailure, "set schema value without setting schema")
return joinErrors(ErrSchema, fmt.Errorf("set schema value without setting schema"))
}

// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
schemaPayload, err := sr.schema.Encode(sr.msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value)
return newError(SchemaFailure, err.Error())
return joinErrors(ErrSchema, err)
}

sr.uncompressedPayload = schemaPayload
@@ -1160,11 +1168,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {

// if msg is too large and chunking is disabled
if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
p.log.WithError(errMessageTooLarge).
p.log.WithError(ErrMessageTooLarge).
WithField("size", checkSize).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", sr.maxMessageSize)
return errMessageTooLarge
return ErrMessageTooLarge
}

if sr.sendAsBatch || !p.options.EnableChunking {
@@ -1173,11 +1181,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
} else {
sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
if sr.payloadChunkSize <= 0 {
p.log.WithError(errMetaTooLarge).
p.log.WithError(ErrMetaTooLarge).
WithField("metadata size", proto.Size(sr.mm)).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
return errMetaTooLarge
return ErrMetaTooLarge
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
@@ -1220,7 +1228,7 @@ func (p *partitionProducer) internalSendAsync(
}

if p.getProducerState() != producerReady {
sr.done(nil, errProducerClosed)
sr.done(nil, ErrProducerClosed)
return
}

@@ -1333,7 +1341,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)

p.doClose(errProducerClosed)
p.doClose(ErrProducerClosed)
}

func (p *partitionProducer) doClose(reason error) {
@@ -1508,11 +1516,11 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
WithField("properties", sr.msg.Properties)
}

if errors.Is(err, errSendTimeout) {
if errors.Is(err, ErrSendTimeout) {
sr.producer.metrics.PublishErrorsTimeout.Inc()
}

if errors.Is(err, errMessageTooLarge) {
if errors.Is(err, ErrMessageTooLarge) {
sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
}

@@ -1554,7 +1562,7 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
for i := 0; i < sr.totalChunks; i++ {
if p.blockIfQueueFull() {
if !p.publishSemaphore.Acquire(sr.ctx) {
return errContextExpired
return ErrContextExpired
}

// update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
Expand All @@ -1564,7 +1572,7 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
p.metrics.MessagesPending.Inc()
} else {
if !p.publishSemaphore.TryAcquire() {
return errSendQueueIsFull
return ErrSendQueueIsFull
}

// update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
@@ -1586,11 +1594,11 @@ func (p *partitionProducer) reserveMem(sr *sendRequest) error {

if p.blockIfQueueFull() {
if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
return errContextExpired
return ErrContextExpired
}
} else {
if !p.client.memLimit.TryReserveMemory(requiredMem) {
return errMemoryBufferIsFull
return ErrMemoryBufferIsFull
}
}
