From 7f91b2bcd798e1dc6ff27b10a557c8ee5440a83a Mon Sep 17 00:00:00 2001 From: grayson <916028390@qq.com> Date: Fri, 16 Jun 2023 17:43:33 +0800 Subject: [PATCH 01/16] [Issue 1027][producer] fix: split sendRequest and make reconnectToBroker and other operate in the same coroutine (#1029) --- pulsar/producer_partition.go | 38 ++++++++++++------------------------ 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index c4a460eed4..6bd90818e2 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -84,8 +84,8 @@ type partitionProducer struct { compressionProvider compression.Provider // Channel where app is posting messages to be published - eventsChan chan interface{} - closeCh chan struct{} + dataChan chan *sendRequest + cmdChan chan interface{} connectClosedCh chan connectionClosed publishSemaphore internal.Semaphore @@ -150,9 +150,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions log: logger, options: options, producerID: client.rpcClient.NewProducerID(), - eventsChan: make(chan interface{}, maxPendingMessages), + dataChan: make(chan *sendRequest, maxPendingMessages), + cmdChan: make(chan interface{}, 10), connectClosedCh: make(chan connectionClosed, 10), - closeCh: make(chan struct{}), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), @@ -438,31 +438,21 @@ func (p *partitionProducer) reconnectToBroker() { } func (p *partitionProducer) runEventsLoop() { - go func() { - for { - select { - case <-p.closeCh: - p.log.Info("close producer, exit reconnect") - return - case <-p.connectClosedCh: - p.log.Info("runEventsLoop will reconnect in producer") - p.reconnectToBroker() - } - } - }() - for { select { - case i := <-p.eventsChan: + case data := <-p.dataChan: + p.internalSend(data) + case i := <-p.cmdChan: switch v := i.(type) { - case *sendRequest: - p.internalSend(v) case *flushRequest: p.internalFlush(v) case *closeProducer: p.internalClose(v) return } + case <-p.connectClosedCh: + p.log.Info("runEventsLoop will reconnect in producer") + p.reconnectToBroker() case <-p.batchFlushTicker.C: p.internalFlushCurrentBatch() } @@ -1165,7 +1155,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer } p.options.Interceptors.BeforeSend(p, msg) - p.eventsChan <- sr + p.dataChan <- sr if !p.options.DisableBlockIfQueueFull { // block if queue full @@ -1304,8 +1294,6 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.setProducerState(producerClosed) p._getConn().UnregisterListener(p.producerID) p.batchFlushTicker.Stop() - - close(p.closeCh) } func (p *partitionProducer) LastSequenceID() int64 { @@ -1317,7 +1305,7 @@ func (p *partitionProducer) Flush() error { doneCh: make(chan struct{}), err: nil, } - p.eventsChan <- flushReq + p.cmdChan <- flushReq // wait for the flush request to complete <-flushReq.doneCh @@ -1345,7 +1333,7 @@ func (p *partitionProducer) Close() { } cp := &closeProducer{doneCh: make(chan struct{})} - p.eventsChan <- cp + p.cmdChan <- cp // wait for close producer request to complete <-cp.doneCh From 56c96911789faba94b0ca7a7f62ce2504271bd6a Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 26 Jun 2023 12:08:02 +0800 Subject: [PATCH 02/16] fix: install openjdk-17 in Dockerfile (#1037) --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 7548729c76..07eff8ea2d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,7 +25,7 @@ ARG GOLANG_IMAGE=golang:latest FROM $PULSAR_IMAGE as pulsar FROM $GOLANG_IMAGE -RUN apt-get update && apt-get install -y openjdk-17-jre-headless ca-certificates +RUN apt-get update && apt-get install -y openjdk-17-jre ca-certificates COPY --from=pulsar /pulsar /pulsar From 5f825166530bdb89ae2cb785511092b98038d78e Mon Sep 17 00:00:00 2001 From: Don Inghram Date: Tue, 27 Jun 2023 14:40:21 -0600 Subject: [PATCH 03/16] Allow user to specify TLS ciphers an min/max TLS version (#1041) --- pulsar/client.go | 9 +++++++++ pulsar/client_impl.go | 3 +++ pulsar/internal/connection.go | 6 ++++++ pulsar/internal/http_client.go | 3 +++ 4 files changed, 21 insertions(+) diff --git a/pulsar/client.go b/pulsar/client.go index 7e6725d42e..d6f18c9aa9 100644 --- a/pulsar/client.go +++ b/pulsar/client.go @@ -120,6 +120,15 @@ type ClientOptions struct { // Configure whether the Pulsar client verify the validity of the host name from broker (default: false) TLSValidateHostname bool + // TLSCipherSuites is a list of enabled TLS 1.0–1.2 cipher suites. See tls.Config CipherSuites for more information. + TLSCipherSuites []uint16 + + // TLSMinVersion contains the minimum TLS version that is acceptable. See tls.Config MinVersion for more information. + TLSMinVersion uint16 + + // TLSMaxVersion contains the maximum TLS version that is acceptable. See tls.Config MaxVersion for more information. + TLSMaxVersion uint16 + // Configure the net model for vpc user to connect the pulsar broker ListenerName string diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 9b44987949..c283f5a827 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -91,6 +91,9 @@ func newClient(options ClientOptions) (Client, error) { TrustCertsFilePath: options.TLSTrustCertsFilePath, ValidateHostname: options.TLSValidateHostname, ServerName: url.Hostname(), + CipherSuites: options.TLSCipherSuites, + MinVersion: options.TLSMinVersion, + MaxVersion: options.TLSMaxVersion, } default: return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme)) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 59aad1675e..1f80d20d33 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -50,6 +50,9 @@ type TLSOptions struct { AllowInsecureConnection bool ValidateHostname bool ServerName string + CipherSuites []uint16 + MinVersion uint16 + MaxVersion uint16 } var ( @@ -1046,6 +1049,9 @@ func (c *connection) closed() bool { func (c *connection) getTLSConfig() (*tls.Config, error) { tlsConfig := &tls.Config{ InsecureSkipVerify: c.tlsOptions.AllowInsecureConnection, + CipherSuites: c.tlsOptions.CipherSuites, + MinVersion: c.tlsOptions.MinVersion, + MaxVersion: c.tlsOptions.MaxVersion, } if c.tlsOptions.TrustCertsFilePath != "" { diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go index 2d440d6f9a..e68bd17c39 100644 --- a/pulsar/internal/http_client.go +++ b/pulsar/internal/http_client.go @@ -339,6 +339,9 @@ func getDefaultTransport(tlsConfig *TLSOptions) (http.RoundTripper, error) { if tlsConfig != nil { cfg := &tls.Config{ InsecureSkipVerify: tlsConfig.AllowInsecureConnection, + CipherSuites: tlsConfig.CipherSuites, + MinVersion: tlsConfig.MinVersion, + MaxVersion: tlsConfig.MaxVersion, } if len(tlsConfig.TrustCertsFilePath) > 0 { rootCA, err := os.ReadFile(tlsConfig.TrustCertsFilePath) From 3a4e5cfbf2c4abe3bf5df137a970ffd70f8efded Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 28 Jun 2023 10:58:26 +0800 Subject: [PATCH 04/16] Add single partition router (#999) * add single-partition-router * fix license * fix ci * Modify as sync.Once * Update pulsar/producer_test.go Co-authored-by: Zixuan Liu * Modify with CR * Verify message was published on single partition --------- Co-authored-by: Zixuan Liu --- pulsar/producer_test.go | 61 +++++++++++++++++++++++++ pulsar/single_partition_router.go | 42 +++++++++++++++++ pulsar/single_partition_router_test.go | 62 ++++++++++++++++++++++++++ 3 files changed, 165 insertions(+) create mode 100644 pulsar/single_partition_router.go create mode 100644 pulsar/single_partition_router_test.go diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index b587975c9c..2721fa3d80 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -560,6 +560,67 @@ func TestMessageRouter(t *testing.T) { assert.NotNil(t, msg) assert.Equal(t, string(msg.Payload()), "hello") } +func TestMessageSingleRouter(t *testing.T) { + // Create topic with 5 partitions + topicAdminURL := "admin/v2/persistent/public/default/my-single-partitioned-topic/partitions" + err := httpPut(topicAdminURL, 5) + defer httpDelete(topicAdminURL) + if err != nil { + t.Fatal(err) + } + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + + assert.Nil(t, err) + defer client.Close() + + numOfMessages := 10 + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: "my-single-partitioned-topic", + SubscriptionName: "my-sub", + }) + + assert.Nil(t, err) + defer consumer.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: "my-single-partitioned-topic", + MessageRouter: NewSinglePartitionRouter(), + }) + + assert.Nil(t, err) + defer producer.Close() + + ctx := context.Background() + + for i := 0; i < numOfMessages; i++ { + ID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte("hello"), + }) + assert.Nil(t, err) + assert.NotNil(t, ID) + } + + // Verify message was published on single partition + msgCount := 0 + msgPartitionMap := make(map[string]int) + for i := 0; i < numOfMessages; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.NotNil(t, msg) + consumer.Ack(msg) + msgCount++ + msgPartitionMap[msg.Topic()]++ + } + assert.Equal(t, msgCount, numOfMessages) + assert.Equal(t, len(msgPartitionMap), 1) + for _, i := range msgPartitionMap { + assert.Equal(t, i, numOfMessages) + } + +} func TestNonPersistentTopic(t *testing.T) { topicName := "non-persistent://public/default/testNonPersistentTopic" diff --git a/pulsar/single_partition_router.go b/pulsar/single_partition_router.go new file mode 100644 index 0000000000..7896aa3246 --- /dev/null +++ b/pulsar/single_partition_router.go @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import "sync" + +func NewSinglePartitionRouter() func(*ProducerMessage, TopicMetadata) int { + var ( + singlePartition *int + once sync.Once + ) + return func(message *ProducerMessage, metadata TopicMetadata) int { + numPartitions := metadata.NumPartitions() + if len(message.Key) != 0 { + // When a key is specified, use the hash of that key + return int(getHashingFunction(JavaStringHash)(message.Key) % numPartitions) + } + once.Do(func() { + partition := r.R.Intn(int(numPartitions)) + singlePartition = &partition + }) + + return *singlePartition + + } + +} diff --git a/pulsar/single_partition_router_test.go b/pulsar/single_partition_router_test.go new file mode 100644 index 0000000000..a5a42156fc --- /dev/null +++ b/pulsar/single_partition_router_test.go @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type topicMetaData struct { + partition uint32 +} + +func (t topicMetaData) NumPartitions() uint32 { + return t.partition +} + +func TestNewSinglePartitionRouter(t *testing.T) { + numPartitions := topicMetaData{2} + router := NewSinglePartitionRouter() + p := router(&ProducerMessage{ + Payload: []byte("message 2"), + }, numPartitions) + assert.GreaterOrEqual(t, p, 0) + + p2 := router(&ProducerMessage{ + Payload: []byte("message 2"), + }, numPartitions) + assert.Equal(t, p, p2) +} + +func TestNewSinglePartitionRouterWithKey(t *testing.T) { + router := NewSinglePartitionRouter() + numPartitions := topicMetaData{3} + p := router(&ProducerMessage{ + Payload: []byte("message 2"), + Key: "my-key", + }, numPartitions) + assert.Equal(t, 1, p) + + p2 := router(&ProducerMessage{ + Key: "my-key", + Payload: []byte("message 2"), + }, numPartitions) + assert.Equal(t, p, p2) +} From 3da3e5d43134e7ccca933c6a80d6a20fb926e05a Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 28 Jun 2023 11:06:57 +0800 Subject: [PATCH 05/16] [fix] Fix ordering key not being set and parsed when batching is disabled (#1034) --- pulsar/consumer_partition.go | 1 + pulsar/producer_partition.go | 4 ++++ pulsar/producer_test.go | 22 ++++++++++++++++++---- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 2d2a1940f7..cd7aa50199 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1190,6 +1190,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header redeliveryCount: response.GetRedeliveryCount(), schemaVersion: msgMeta.GetSchemaVersion(), schemaInfoCache: pc.schemaInfoCache, + orderingKey: string(msgMeta.GetOrderingKey()), index: messageIndex, brokerPublishTime: brokerPublishTime, } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 6bd90818e2..837d1d78e6 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -733,6 +733,10 @@ func (p *partitionProducer) genMetadata(msg *ProducerMessage, mm.PartitionKey = proto.String(msg.Key) } + if len(msg.OrderingKey) != 0 { + mm.OrderingKey = []byte(msg.OrderingKey) + } + if msg.Properties != nil { mm.Properties = internal.ConvertFromStringMap(msg.Properties) } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 2721fa3d80..7c3abdc721 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1967,7 +1967,15 @@ func TestMemLimitContextCancel(t *testing.T) { assert.NoError(t, err) } -func TestSendMessagesWithMetadata(t *testing.T) { +func TestBatchSendMessagesWithMetadata(t *testing.T) { + testSendMessagesWithMetadata(t, false) +} + +func TestNoBatchSendMessagesWithMetadata(t *testing.T) { + testSendMessagesWithMetadata(t, true) +} + +func testSendMessagesWithMetadata(t *testing.T, disableBatch bool) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) @@ -1978,7 +1986,7 @@ func TestSendMessagesWithMetadata(t *testing.T) { topic := newTopicName() producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, - DisableBatching: true, + DisableBatching: disableBatch, }) assert.Nil(t, err) @@ -1989,7 +1997,10 @@ func TestSendMessagesWithMetadata(t *testing.T) { assert.Nil(t, err) msg := &ProducerMessage{EventTime: time.Now().Local(), - Payload: []byte("msg")} + Key: "my-key", + OrderingKey: "my-ordering-key", + Properties: map[string]string{"k1": "v1", "k2": "v2"}, + Payload: []byte("msg")} _, err = producer.Send(context.Background(), msg) assert.Nil(t, err) @@ -1997,5 +2008,8 @@ func TestSendMessagesWithMetadata(t *testing.T) { recvMsg, err := consumer.Receive(context.Background()) assert.Nil(t, err) - assert.Equal(t, internal.TimestampMillis(recvMsg.EventTime()), internal.TimestampMillis(msg.EventTime)) + assert.Equal(t, internal.TimestampMillis(msg.EventTime), internal.TimestampMillis(recvMsg.EventTime())) + assert.Equal(t, msg.Key, recvMsg.Key()) + assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey()) + assert.Equal(t, msg.Properties, recvMsg.Properties()) } From 6f01a7cead8704aa59afcd819545512d0259af07 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Mon, 3 Jul 2023 09:44:33 +0800 Subject: [PATCH 06/16] [Fix] check if callback is nil before calling it (#1036) Co-authored-by: gunli --- pulsar/producer_partition.go | 58 +++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 837d1d78e6..98c6c980dd 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -467,6 +467,14 @@ func (p *partitionProducer) Name() string { return p.producerName } +func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg *ProducerMessage, err error) { + if cb == nil { + return + } + + cb(id, msg, err) +} + func (p *partitionProducer) internalSend(request *sendRequest) { p.log.Debug("Received send request: ", *request.msg) @@ -480,7 +488,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { var err error if msg.Value != nil && msg.Payload != nil { p.log.Error("Can not set Value and Payload both") - request.callback(nil, request.msg, errors.New("can not set Value and Payload both")) + runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both")) return } @@ -494,7 +502,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if msg.Schema != nil && p.options.Schema != nil && msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() { p.releaseSemaphoreAndMem(uncompressedPayloadSize) - request.callback(nil, request.msg, fmt.Errorf("msg schema can not match with producer schema")) + runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema")) p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic) return } @@ -513,7 +521,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { schemaPayload, err = schema.Encode(msg.Value) if err != nil { p.releaseSemaphoreAndMem(uncompressedPayloadSize) - request.callback(nil, request.msg, newError(SchemaFailure, err.Error())) + runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error())) p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) return } @@ -530,7 +538,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if err != nil { p.releaseSemaphoreAndMem(uncompressedPayloadSize) p.log.WithError(err).Error("get schema version fail") - request.callback(nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err)) + runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err)) return } p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion) @@ -589,7 +597,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { // if msg is too large and chunking is disabled if checkSize > maxMessageSize && !p.options.EnableChunking { p.releaseSemaphoreAndMem(uncompressedPayloadSize) - request.callback(nil, request.msg, errMessageTooLarge) + runCallback(request.callback, nil, request.msg, errMessageTooLarge) p.log.WithError(errMessageTooLarge). WithField("size", checkSize). WithField("properties", msg.Properties). @@ -608,7 +616,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm) if payloadChunkSize <= 0 { p.releaseSemaphoreAndMem(uncompressedPayloadSize) - request.callback(nil, msg, errMetaTooLarge) + runCallback(request.callback, nil, msg, errMetaTooLarge) p.log.WithError(errMetaTooLarge). WithField("metadata size", proto.Size(mm)). WithField("properties", msg.Properties). @@ -683,7 +691,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion, multiSchemaEnabled); !ok { p.releaseSemaphoreAndMem(uncompressedPayloadSize) - request.callback(nil, request.msg, errFailAddToBatch) + runCallback(request.callback, nil, request.msg, errFailAddToBatch) p.log.WithField("size", uncompressedSize). WithField("properties", msg.Properties). Error("unable to add message to batch") @@ -835,7 +843,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, ) } if err != nil { - request.callback(nil, request.msg, err) + runCallback(request.callback, nil, request.msg, err) p.releaseSemaphoreAndMem(int64(len(msg.Payload))) p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value) return @@ -875,7 +883,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() { if err != nil { for _, cb := range callbacks { if sr, ok := cb.(*sendRequest); ok { - sr.callback(nil, sr.msg, err) + runCallback(sr.callback, nil, sr.msg, err) } } if errors.Is(err, internal.ErrExceedMaxMessageSize) { @@ -985,7 +993,7 @@ func (p *partitionProducer) failTimeoutMessages() { if sr.callback != nil { sr.callbackOnce.Do(func() { - sr.callback(nil, sr.msg, errSendTimeout) + runCallback(sr.callback, nil, sr.msg, errSendTimeout) }) } if sr.transaction != nil { @@ -1018,7 +1026,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() { if errs[i] != nil { for _, cb := range callbacks[i] { if sr, ok := cb.(*sendRequest); ok { - sr.callback(nil, sr.msg, errs[i]) + runCallback(sr.callback, nil, sr.msg, errs[i]) } } if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) { @@ -1106,26 +1114,26 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { - //Register transaction operation to transaction and the transaction coordinator. + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) if msg.Transaction != nil { transactionImpl := (msg.Transaction).(*transaction) if transactionImpl.state != TxnOpen { p.log.WithField("state", transactionImpl.state).Error("Failed to send message" + " by a non-open transaction.") - callback(nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction.")) + runCallback(callback, nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction.")) return } if err := transactionImpl.registerProducerTopic(p.topic); err != nil { - callback(nil, msg, err) + runCallback(callback, nil, msg, err) return } if err := transactionImpl.registerSendOrAckOp(); err != nil { - callback(nil, msg, err) + runCallback(callback, nil, msg, err) } newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) { - callback(id, producerMessage, err) + runCallback(callback, id, producerMessage, err) transactionImpl.endSendOrAckOp(err) } } else { @@ -1133,7 +1141,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer } if p.getProducerState() != producerReady { // Producer is closing - newCallback(nil, msg, errProducerClosed) + runCallback(newCallback, nil, msg, errProducerClosed) return } @@ -1253,9 +1261,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) } if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { - if sr.callback != nil { - sr.callback(msgID, sr.msg, nil) - } + runCallback(sr.callback, msgID, sr.msg, nil) p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID) } } @@ -1406,27 +1412,23 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) { func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool { if p.options.DisableBlockIfQueueFull { if !p.publishSemaphore.TryAcquire() { - if sr.callback != nil { - sr.callback(nil, sr.msg, errSendQueueIsFull) - } + runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull) return false } if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) { p.publishSemaphore.Release() - if sr.callback != nil { - sr.callback(nil, sr.msg, errMemoryBufferIsFull) - } + runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull) return false } } else { if !p.publishSemaphore.Acquire(sr.ctx) { - sr.callback(nil, sr.msg, errContextExpired) + runCallback(sr.callback, nil, sr.msg, errContextExpired) return false } if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) { p.publishSemaphore.Release() - sr.callback(nil, sr.msg, errContextExpired) + runCallback(sr.callback, nil, sr.msg, errContextExpired) return false } } From 5f8df2782251226b63b2dad9f4e87fd4bbd31ef5 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Mon, 3 Jul 2023 09:57:14 +0800 Subject: [PATCH 07/16] [Refactor] refactor duplicated code lines and fix typo errors (#1039) * [Refactor] refactor duplicated code lines and fix typo errors * [typo] revert * [typo] revert * [typo] revert * [refactor] delete redunpdant code lines * [typo] revert some words * [fix] set useTxn when use transaction --------- Co-authored-by: gunli --- pulsar/producer_partition.go | 94 +++++++++++++++++------------------- 1 file changed, 45 insertions(+), 49 deletions(-) mode change 100644 => 100755 pulsar/producer_partition.go diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go old mode 100644 new mode 100755 index 98c6c980dd..9d04427a88 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -710,17 +710,19 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, uncompressedPayload []byte, request *sendRequest, msg *ProducerMessage, deliverAt time.Time, schemaVersion []byte, multiSchemaEnabled bool) bool { - var ok bool + var useTxn bool + var mostSigBits uint64 + var leastSigBits uint64 if request.transaction != nil { txnID := request.transaction.GetTxnID() - ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, - msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, true, txnID.MostSigBits, - txnID.LeastSigBits) - } else { - ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, - msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, false, 0, 0) + useTxn = true + mostSigBits = txnID.MostSigBits + leastSigBits = txnID.LeastSigBits } - return ok + + return p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, + msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, + leastSigBits) } func (p *partitionProducer) genMetadata(msg *ProducerMessage, @@ -764,6 +766,14 @@ func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *Pro } } +func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessageMetadata, msg *ProducerMessage) { + if msg.SequenceID != nil { + smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID)) + } else { + smm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1)) + } +} + func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage, uncompressedSize int) (smm *pb.SingleMessageMetadata) { smm = &pb.SingleMessageMetadata{ @@ -786,14 +796,7 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage smm.Properties = internal.ConvertFromStringMap(msg.Properties) } - var sequenceID uint64 - if msg.SequenceID != nil { - sequenceID = uint64(*msg.SequenceID) - } else { - sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1) - } - - smm.SequenceId = proto.Uint64(sequenceID) + p.updateSingleMessageMetadataSeqID(smm, msg) return } @@ -813,35 +816,30 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, } sid := *mm.SequenceId - var err error + var useTxn bool + var mostSigBits uint64 + var leastSigBits uint64 + if request.transaction != nil { txnID := request.transaction.GetTxnID() - err = internal.SingleSend( - buffer, - p.producerID, - sid, - mm, - payloadBuf, - p.encryptor, - maxMessageSize, - true, - txnID.MostSigBits, - txnID.LeastSigBits, - ) - } else { - err = internal.SingleSend( - buffer, - p.producerID, - sid, - mm, - payloadBuf, - p.encryptor, - maxMessageSize, - false, - 0, - 0, - ) - } + useTxn = true + mostSigBits = txnID.MostSigBits + leastSigBits = txnID.LeastSigBits + } + + err := internal.SingleSend( + buffer, + p.producerID, + sid, + mm, + payloadBuf, + p.encryptor, + maxMessageSize, + useTxn, + mostSigBits, + leastSigBits, + ) + if err != nil { runCallback(request.callback, nil, request.msg, err) p.releaseSemaphoreAndMem(int64(len(msg.Payload))) @@ -1001,7 +999,7 @@ func (p *partitionProducer) failTimeoutMessages() { } } - // flag the send has completed with error, flush make no effect + // flag the sending has completed with error, flush make no effect pi.Complete() pi.Unlock() @@ -1116,8 +1114,10 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) + var txn *transaction if msg.Transaction != nil { transactionImpl := (msg.Transaction).(*transaction) + txn = transactionImpl if transactionImpl.state != TxnOpen { p.log.WithField("state", transactionImpl.state).Error("Failed to send message" + " by a non-open transaction.") @@ -1150,10 +1150,6 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer // callbackOnce make sure the callback is only invoked once in chunking callbackOnce := &sync.Once{} - var txn *transaction - if msg.Transaction != nil { - txn = (msg.Transaction).(*transaction) - } sr := &sendRequest{ ctx: ctx, msg: msg, @@ -1398,7 +1394,7 @@ func (p *partitionProducer) _setConn(conn internal.Connection) { // _getConn returns internal connection field of this partition producer atomically. // Note: should only be called by this partition producer before attempting to use the connection func (p *partitionProducer) _getConn() internal.Connection { - // Invariant: The conn must be non-nill for the lifetime of the partitionProducer. + // Invariant: The conn must be non-nil for the lifetime of the partitionProducer. // For this reason we leave this cast unchecked and panic() if the // invariant is broken return p.conn.Load().(internal.Connection) From 163cd7e5a7666ed59796608ccf0cb20fed76511d Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Tue, 4 Jul 2023 16:42:11 +0800 Subject: [PATCH 08/16] [Improve] improve the perf of schema and schema cache (#1033) * [Improve] improve the perf of schema and schema cache * [Fix] fix lint error * [revert] revert comment format * [revert] revert comment format * use sync.Once instead of atomic.Uint64 * revert comment format --------- Co-authored-by: gunli --- pulsar/producer_partition.go | 21 ++++++++------------- pulsar/schema.go | 17 ++++++++++++----- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9d04427a88..012bfd96a8 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -100,29 +100,24 @@ type partitionProducer struct { } type schemaCache struct { - lock sync.RWMutex - schemas map[uint64][]byte + schemas sync.Map } func newSchemaCache() *schemaCache { - return &schemaCache{ - schemas: make(map[uint64][]byte), - } + return &schemaCache{} } func (s *schemaCache) Put(schema *SchemaInfo, schemaVersion []byte) { - s.lock.Lock() - defer s.lock.Unlock() - key := schema.hash() - s.schemas[key] = schemaVersion + s.schemas.Store(key, schemaVersion) } func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) { - s.lock.RLock() - defer s.lock.RUnlock() - - return s.schemas[schema.hash()] + val, ok := s.schemas.Load(schema.hash()) + if !ok { + return nil + } + return val.([]byte) } func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int, diff --git a/pulsar/schema.go b/pulsar/schema.go index 0b413d40a6..5c063e32a3 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -23,6 +23,7 @@ import ( "fmt" "hash/maphash" "reflect" + "sync" "unsafe" log "github.com/sirupsen/logrus" @@ -66,13 +67,19 @@ type SchemaInfo struct { Schema string Type SchemaType Properties map[string]string + hashVal uint64 + hashOnce sync.Once } -func (s SchemaInfo) hash() uint64 { - h := maphash.Hash{} - h.SetSeed(seed) - h.Write([]byte(s.Schema)) - return h.Sum64() +func (s *SchemaInfo) hash() uint64 { + s.hashOnce.Do(func() { + h := maphash.Hash{} + h.SetSeed(seed) + h.Write([]byte(s.Schema)) + s.hashVal = h.Sum64() + }) + + return s.hashVal } type Schema interface { From aa664716ac0bfb886c756f33ba72376092e8e3a5 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Wed, 5 Jul 2023 10:20:03 +0800 Subject: [PATCH 09/16] [Fix] return when registerSendOrAckOp() failed (#1045) Master Issue: #1040 ### Motivation fix #1040, return when registerSendOrAckOp() return an error. ### Modifications - modified: pulsar/producer_partition.go --------- Co-authored-by: gunli --- pulsar/producer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 012bfd96a8..595cab4cbd 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1126,6 +1126,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer } if err := transactionImpl.registerSendOrAckOp(); err != nil { runCallback(callback, nil, msg, err) + return } newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) { runCallback(callback, id, producerMessage, err) From b4d6d588d54d12a67dbb5dd71c5a29043290b159 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 5 Jul 2023 10:23:51 +0800 Subject: [PATCH 10/16] Fix the wrong link in the relesae process (#1050) ### Motivation The link to the `GPG keys to sign release artifacts` is dead. The link of the tag is incorrectly pointed to the pulsar-client-node. --- docs/release-process.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/release-process.md b/docs/release-process.md index aa79f9ea34..e3efe8edc7 100644 --- a/docs/release-process.md +++ b/docs/release-process.md @@ -128,7 +128,7 @@ https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-go-0.X.0-candidate-1 The tag to be voted upon: v0.X.0 -https://github.com/apache/pulsar-client-node/releases/tag/v0.X.0 +https://github.com/apache/pulsar-client-go/tree/v0.X.0-candidate-1 SHA-512 checksums: 97bb1000f70011e9a585186590e0688586590e09 apache-pulsar-client-go-0.X.0-src.tar.gz From 8d4513787a25423c988708dff985e9c994545df5 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Thu, 6 Jul 2023 16:29:15 +0800 Subject: [PATCH 11/16] [Fix][Producer] check if message is nil (#1047) * [Fix][Producer] check if message is nil * add a debug log --------- Co-authored-by: gunli --- pulsar/producer_partition.go | 6 ++++++ pulsar/producer_test.go | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 595cab4cbd..03729abb47 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1107,6 +1107,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { + if msg == nil { + p.log.Error("Message is nil") + runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil")) + return + } + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) var txn *transaction diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 7c3abdc721..fecae8efee 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -108,6 +108,9 @@ func TestSimpleProducer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, ID) } + + _, err = producer.Send(context.Background(), nil) + assert.NotNil(t, err) } func TestProducerAsyncSend(t *testing.T) { @@ -152,6 +155,14 @@ func TestProducerAsyncSend(t *testing.T) { wg.Wait() assert.Equal(t, 0, errors.Size()) + + wg.Add(1) + producer.SendAsync(context.Background(), nil, func(id MessageID, m *ProducerMessage, e error) { + assert.NotNil(t, e) + assert.Nil(t, id) + wg.Done() + }) + wg.Wait() } func TestProducerCompression(t *testing.T) { From 43986b16a90fee6306c95dd23ba30690300d53b5 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 6 Jul 2023 16:29:56 +0800 Subject: [PATCH 12/16] Add 0.11.0 change log (#1048) --- CHANGELOG.md | 36 ++++++++++++++++++++++++++++++++++++ VERSION | 2 +- stable.txt | 2 +- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b3e9351a45..973d42f496 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,42 @@ All notable changes to this project will be documented in this file. +[0.11.0] 2023-07-04 + +## Features +* Support the schema type ProtoNativeSchema by @gaoran10 in https://github.com/apache/pulsar-client-go/pull/1006 +* Implement transactional consumer/producer API by @liangyepianzhou in https://github.com/apache/pulsar-client-go/pull/1002 +* Support NonDurable subscriptions by @dinghram in https://github.com/apache/pulsar-client-go/pull/992 +* Allow user to specify TLS ciphers an min/max TLS version by @dinghram in https://github.com/apache/pulsar-client-go/pull/1041 +* Add single partition router by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/999 + +## Improve +* Fix missing link in the release process by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1000 +* Stablize golangci-lint task in CI by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1007 +* Fix reconnection backoff logic by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/1008 +* Change token name to `GITHUB_TOKEN` in CI by @labuladong in https://github.com/apache/pulsar-client-go/pull/910 +* Add links to client docs and feature matrix in README.md by @momo-jun in https://github.com/apache/pulsar-client-go/pull/1014 +* Fix flaky test `TestMaxPendingChunkMessages` by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1003 +* Fix flaky test in `negative_acks_tracker_test.go` by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1017 +* Fix event time not being set when batching is disabled by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1015 +* Use maphash instead of crypto/sha256 for hash function of hashmap in Schema.hash() by @bpereto in https://github.com/apache/pulsar-client-go/pull/1022 +* Improve logs on failTimeoutMessages by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1025 +* Delete LICENSE-go-rate.txt by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1028 +* Fix broken master by upgrading JRE to 17 by @BewareMyPower in https://github.com/apache/pulsar-client-go/pull/1030 +* Split sendRequest and make reconnectToBroker and other operation in the same coroutine by @zengguan in https://github.com/apache/pulsar-client-go/pull/1029 +* Install openjdk-17 in Dockerfile by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1037 +* Fix ordering key not being set and parsed when batching is disabled by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1034 +* Check if callback is nil before calling it by @gunli in https://github.com/apache/pulsar-client-go/pull/1036 +* Refactor duplicated code lines and fix typo errors by @gunli in https://github.com/apache/pulsar-client-go/pull/1039 + +## New Contributors +* @gaoran10 made their first contribution in https://github.com/apache/pulsar-client-go/pull/1006 +* @momo-jun made their first contribution in https://github.com/apache/pulsar-client-go/pull/1014 +* @bpereto made their first contribution in https://github.com/apache/pulsar-client-go/pull/1022 +* @zengguan made their first contribution in https://github.com/apache/pulsar-client-go/pull/1029 + +**Full Changelog**: https://github.com/apache/pulsar-client-go/compare/v0.10.0...v0.11.0-candidate-1 + [0.10.0] 2023-03-27 ## Feature diff --git a/VERSION b/VERSION index 46cd5a5b14..89d00e6236 100644 --- a/VERSION +++ b/VERSION @@ -1,3 +1,3 @@ // This version number refers to the currently released version number // Please fix the version when release. -v0.10.0 +v0.11.0 diff --git a/stable.txt b/stable.txt index fab6099c07..66c47738bd 100644 --- a/stable.txt +++ b/stable.txt @@ -1,3 +1,3 @@ // This version number refers to the current stable version, generally is `VERSION - 1`. // Please fix the version when release. -v0.10.0 +v0.11.0 From dd920ef2670e9b59447540a695db807e7cd9731c Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 10 Jul 2023 17:10:04 +0800 Subject: [PATCH 13/16] [fix] Fix 0.11.0 change log (#1054) ### Motivation https://github.com/apache/pulsar-client-go/pull/1048#issuecomment-1623565815 ### Modifications - Add @gunli to the new contributor list in 0.11.0 change log --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 973d42f496..f1d4839dc2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ All notable changes to this project will be documented in this file. * @momo-jun made their first contribution in https://github.com/apache/pulsar-client-go/pull/1014 * @bpereto made their first contribution in https://github.com/apache/pulsar-client-go/pull/1022 * @zengguan made their first contribution in https://github.com/apache/pulsar-client-go/pull/1029 +* @gunli made their first contribution in https://github.com/apache/pulsar-client-go/pull/1036 **Full Changelog**: https://github.com/apache/pulsar-client-go/compare/v0.10.0...v0.11.0-candidate-1 From be3574019383ac0cdc65fec63e422fcfd6c82e4b Mon Sep 17 00:00:00 2001 From: Jiaqi Shen <18863662628@163.com> Date: Tue, 11 Jul 2023 15:35:26 +0800 Subject: [PATCH 14/16] [fix] [issue 877] Fix ctx in partitionProducer.Send() is not performing as expected (#1053) Fixes #877 ### Motivation The original PR is #878. Because the original author @billowqiu has not continued to reply to the review comments for a long time, resubmit the fix here. ### Modifications - Add select for ctx and doneCh in partitionProducer.Send() --------- Co-authored-by: shenjiaqi.2769 --- pulsar/producer_partition.go | 10 +++++++--- pulsar/producer_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 03729abb47..dd45ff249b 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1095,9 +1095,13 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes }, true) // wait for send request to finish - <-doneCh - - return msgID, err + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-doneCh: + // send request has been finished + return msgID, err + } } func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index fecae8efee..11ff089387 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2024,3 +2024,29 @@ func testSendMessagesWithMetadata(t *testing.T, disableBatch bool) { assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey()) assert.Equal(t, msg.Properties, recvMsg.Properties()) } + +func TestProducerSendWithContext(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + ctx, cancel := context.WithCancel(context.Background()) + // Make ctx be canceled to invalidate the context immediately + cancel() + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: make([]byte, 1024*1024), + }) + // producer.Send should fail and return err context.Canceled + assert.True(t, errors.Is(err, context.Canceled)) +} From e45122c2defc5efd4efc493d0acef278a7ccfc01 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Thu, 13 Jul 2023 17:15:37 +0800 Subject: [PATCH 15/16] [Fix][Producer] Stop block request even if Value and Payload are both set (#1052) ### Motivation Currently, if `!p.options.DisableBlockIfQueueFull` and `msg.Value != nil && msg.Payload != nil`, request will be blocked forever 'cause `defer request.stopBlock()` is set up after the verify logic. ```go if msg.Value != nil && msg.Payload != nil { p.log.Error("Can not set Value and Payload both") runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both")) return } // The block chan must be closed when returned with exception defer request.stopBlock() ``` Here is the PR to stop block request even if Value and Payload are both set ### Modifications - pulsar/producer_partition.go --------- Co-authored-by: gunli --- pulsar/producer_partition.go | 11 ++++++----- pulsar/producer_test.go | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index dd45ff249b..48411b4e0c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -481,11 +481,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) { var schemaPayload []byte var err error - if msg.Value != nil && msg.Payload != nil { - p.log.Error("Can not set Value and Payload both") - runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both")) - return - } // The block chan must be closed when returned with exception defer request.stopBlock() @@ -1117,6 +1112,12 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer return } + if msg.Value != nil && msg.Payload != nil { + p.log.Error("Can not set Value and Payload both") + runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both")) + return + } + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) var txn *transaction diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 11ff089387..adbdc71e67 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -111,6 +111,12 @@ func TestSimpleProducer(t *testing.T) { _, err = producer.Send(context.Background(), nil) assert.NotNil(t, err) + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + Value: []byte("hello"), + }) + assert.NotNil(t, err) } func TestProducerAsyncSend(t *testing.T) { @@ -163,6 +169,15 @@ func TestProducerAsyncSend(t *testing.T) { wg.Done() }) wg.Wait() + + wg.Add(1) + producer.SendAsync(context.Background(), &ProducerMessage{Payload: []byte("hello"), Value: []byte("hello")}, + func(id MessageID, m *ProducerMessage, e error) { + assert.NotNil(t, e) + assert.Nil(t, id) + wg.Done() + }) + wg.Wait() } func TestProducerCompression(t *testing.T) { From 3812c07a0b8f6c37a2803cd740f4bf917160fd22 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Tue, 18 Jul 2023 18:57:47 +0800 Subject: [PATCH 16/16] [Improve][Producer] simplify the flush logic (#1049) ### Motivation Simplify the producer flush logic ### Modifications 1. add a callback field to the pendingItem, default is nil; 2. in partitionProducer.internalFlush() get the last pendingItem from pendingQueue; 3. update the last pendingItem by setup a new callback; 4. in partitionProducer.ReceivedSendReceipt, no need to identify the sendRequest by checking if the msg is nil; 5. in pendingItem.Complete(), invoke its callback to notify the flush is done. --------- Co-authored-by: gunli --- pulsar/producer_partition.go | 59 +++++++++++++++--------------------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 48411b4e0c..e74fd984f4 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -848,11 +848,12 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, type pendingItem struct { sync.Mutex - buffer internal.Buffer - sequenceID uint64 - sentAt time.Time - sendRequests []interface{} - completed bool + buffer internal.Buffer + sequenceID uint64 + sentAt time.Time + sendRequests []interface{} + completed bool + flushCallback func(err error) } func (p *partitionProducer) internalFlushCurrentBatch() { @@ -990,7 +991,7 @@ func (p *partitionProducer) failTimeoutMessages() { } // flag the sending has completed with error, flush make no effect - pi.Complete() + pi.Complete(errSendTimeout) pi.Unlock() // finally reached the last view item, current iteration ends @@ -1062,15 +1063,10 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { return } - sendReq := &sendRequest{ - msg: nil, - callback: func(id MessageID, message *ProducerMessage, e error) { - fr.err = e - close(fr.doneCh) - }, + pi.flushCallback = func(err error) { + fr.err = err + close(fr.doneCh) } - - pi.sendRequests = append(pi.sendRequests, sendReq) } func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) { @@ -1208,27 +1204,17 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) pi.Lock() defer pi.Unlock() p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9) - batchSize := int32(0) - for _, i := range pi.sendRequests { - sr := i.(*sendRequest) - if sr.msg != nil { - batchSize = batchSize + 1 - } else { // Flush request - break - } - } + batchSize := int32(len(pi.sendRequests)) for idx, i := range pi.sendRequests { sr := i.(*sendRequest) - if sr.msg != nil { - atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) - p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload))) - p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) - p.metrics.MessagesPublished.Inc() - p.metrics.MessagesPending.Dec() - payloadSize := float64(len(sr.msg.Payload)) - p.metrics.BytesPublished.Add(payloadSize) - p.metrics.BytesPending.Sub(payloadSize) - } + atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) + p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload))) + p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) + p.metrics.MessagesPublished.Inc() + p.metrics.MessagesPending.Dec() + payloadSize := float64(len(sr.msg.Payload)) + p.metrics.BytesPublished.Add(payloadSize) + p.metrics.BytesPending.Sub(payloadSize) if sr.callback != nil || len(p.options.Interceptors) > 0 { msgID := newMessageID( @@ -1274,7 +1260,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) } // Mark this pending item as done - pi.Complete() + pi.Complete(nil) } } @@ -1384,12 +1370,15 @@ type flushRequest struct { err error } -func (i *pendingItem) Complete() { +func (i *pendingItem) Complete(err error) { if i.completed { return } i.completed = true buffersPool.Put(i.buffer) + if i.flushCallback != nil { + i.flushCallback(err) + } } // _setConn sets the internal connection field of this partition producer atomically.