Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Producer] normalize and export the errors #1143

Merged
merged 12 commits into from
Dec 29, 2023

Conversation

gunli
Copy link
Contributor

@gunli gunli commented Dec 15, 2023

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #

Motivation

Currently, producing retrying is not implemented by the client, when I want to retry, it depends on the errors returned by the callback, some errors can retry, some should stop and fail fast(for example the queue is full), but the errors are not normalized and exported now, this PR normalize and export the errors of the producer.

Modifications

  1. export the errors:
        ErrFailAddToBatch     = newError(AddToBatchFailed, "message add to batch failed")
	ErrSendTimeout        = newError(TimeoutError, "message send timeout")
	ErrSendQueueIsFull    = newError(ProducerQueueIsFull, "producer send queue is full")
	ErrContextExpired     = newError(TimeoutError, "message send context expired")
	ErrMessageTooLarge    = newError(MessageTooBig, "message size exceeds MaxMessageSize")
	ErrMetaTooLarge       = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
	ErrProducerClosed     = newError(ProducerClosed, "producer already been closed")
	ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
	ErrSchema                  = newError(SchemaFailure, "schema error")
	ErrTransaction        = newError(TransactionNoFoundError, "transaction error")
	ErrInvalidMessage     = newError(InvalidMessage, "invalid message")
	ErrTopicNotfound      = newError(TopicNotFound, "topic not found")
	ErrTopicTerminated    = newError(TopicTerminated, "topic terminated")
	ErrProducerBlocked    = newError(ProducerBlockedQuotaExceededException, "producer blocked")
	ErrProducerFenced     = newError(ProducerFenced, "producer fenced")
  1. add a joinErrors(errs ...errors) to join errors:
// joinErrors can join multiple errors into one error, and the returned error can be tested by errors.Is()
// we use github.com/hashicorp/go-multierror instead of errors.Join() of Go 1.20 so that we can compile pulsar
// go client with go versions that newer than go 1.13
func joinErrors(errs ...error) error {
	return multierror.Append(nil, errs...)
}
  1. In the application code, use errors.Is(err, pulsar.ErrChema) to determine if an error can retry or not;

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

Makefile Outdated
@@ -20,7 +20,7 @@
IMAGE_NAME = pulsar-client-go-test:latest
PULSAR_VERSION ?= 2.10.3
PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
GO_VERSION ?= 1.18
GO_VERSION ?= 1.20
Copy link
Member

@RobertIndie RobertIndie Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good to me.

This is a major change. Could you upgrade the Go version in another PR?

Copy link
Contributor Author

@gunli gunli Dec 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RobertIndie If I upgrade it in another PR, this PR will fail the CI, and the CI is failed now, may be I should update the go version in ci.yml either:

go-version: [1.16, 1.17, 1.18, 1.19]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break the CI and will break the compatibility for the older go version. Could you implement it another way? Or do you think there is a strong motivation that we need to upgrade the go version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't mean to upgrade the go version, it just so happends that I use go 1.20 at the same time, I implement this is PR just in the purpose of retrying and when I test my retrying demo and found that I can not retry all the failed requests, some errors should fail fast instead of retrying(.e.g ErrSendQueueIsFull).

And I just found #1142 today, and found that this is a common requirement. I've checked the 3rd party packages, "https://github.com/cockroachdb/errors" require go 1.19, "https://github.com/hashicorp/go-multierror" has a different package name multierror, "https://github.com/uber-go/multierr" require go 1.20.

For the sake of compatibility, may be the simplest way is to implement errors.Join() in pulsar-client-go, what I mean is to copy the code in go SDK into our package, I will try that tomorrow.

@gunli
Copy link
Contributor Author

gunli commented Dec 20, 2023

@RobertIndie I have updated this PR and use "github.com/hashicorp/go-multierror" to join errors instead of errors.Join() of Go 1.20, would you PTAL at again?

Also cc @tisonkun @nodece @WoWsj! @Gleiphir2769

@gunli gunli mentioned this pull request Dec 20, 2023
Copy link
Member

@nodece nodece left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed.

@nodece
Copy link
Member

nodece commented Dec 25, 2023

fmt.Errorf("component error: %w", err) is also a good idea to wrap the error, and it works with errors.Is().

%w introduced by Go 1.30, see https://pkg.go.dev/fmt@go1.13beta1 and https://stackoverflow.com/questions/61283248/format-errors-in-go-s-v-or-w

Copy link
Member

@nodece nodece left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@nodece nodece merged commit f491e09 into apache:master Dec 29, 2023
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants