From 5a1824b4e39d6802551c8e1a6e7956d972af62a6 Mon Sep 17 00:00:00 2001 From: Don Inghram Date: Thu, 15 Jun 2023 07:17:52 -0600 Subject: [PATCH 1/2] [Issue 468] Add Support for NonDurable subscriptions (#992) --- perf/perf-consumer.go | 8 ++++ perf/pulsar-perf-go.go | 6 +++ pulsar/consumer.go | 4 ++ pulsar/consumer_impl.go | 2 +- pulsar/consumer_partition.go | 12 +++--- pulsar/consumer_test.go | 72 ++++++++++++++++++++++++++++++++++++ pulsar/reader_impl.go | 2 +- 7 files changed, 98 insertions(+), 8 deletions(-) diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go index 825de62ff0..2172af533e 100644 --- a/perf/perf-consumer.go +++ b/perf/perf-consumer.go @@ -36,6 +36,8 @@ type ConsumeArgs struct { ReceiverQueueSize int EnableBatchIndexAck bool EnableAutoScaledReceiverQueueSize bool + SubscriptionMode pulsar.SubscriptionMode + SubscriptionType pulsar.SubscriptionType } func newConsumerCommand() *cobra.Command { @@ -60,6 +62,10 @@ func newConsumerCommand() *cobra.Command { flags.BoolVar(&consumeArgs.EnableBatchIndexAck, "enable-batch-index-ack", false, "Whether to enable batch index ACK") flags.BoolVar(&consumeArgs.EnableAutoScaledReceiverQueueSize, "enable-auto-scaled-queue-size", false, "Whether to enable auto scaled receiver queue size") + flags.IntVarP((*int)(&consumeArgs.SubscriptionMode), "subscription-mode", "m", int(pulsar.Durable), + "Subscription mode") + flags.IntVarP((*int)(&consumeArgs.SubscriptionType), "subscription-type", "t", int(pulsar.Exclusive), + "Subscription type") return cmd } @@ -83,6 +89,8 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) { SubscriptionName: consumeArgs.SubscriptionName, EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck, EnableAutoScaledReceiverQueueSize: consumeArgs.EnableAutoScaledReceiverQueueSize, + Type: consumeArgs.SubscriptionType, + SubscriptionMode: consumeArgs.SubscriptionMode, }) if err != nil { diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go index a672a30271..40257e83c3 100644 --- a/perf/pulsar-perf-go.go +++ b/perf/pulsar-perf-go.go @@ -43,6 +43,8 @@ type ClientArgs struct { ServiceURL string TokenFile string TLSTrustCertFile string + TLSServerCertFile string + TLSServerKeyFile string MaxConnectionsPerBroker int } @@ -62,6 +64,8 @@ func NewClient() (pulsar.Client, error) { os.Exit(1) } clientOpts.Authentication = pulsar.NewAuthenticationToken(string(tokenBytes)) + } else if clientArgs.TLSServerCertFile != "" && clientArgs.TLSServerKeyFile != "" { + clientOpts.Authentication = pulsar.NewAuthenticationTLS(clientArgs.TLSServerCertFile, clientArgs.TLSServerKeyFile) } if clientArgs.TLSTrustCertFile != "" { @@ -97,6 +101,8 @@ func main() { flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u", "pulsar://localhost:6650", "The Pulsar service URL") flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to the Pulsar JWT file") + flags.StringVar(&clientArgs.TLSServerCertFile, "cert-file", "", "file path to the TLS authentication cert") + flags.StringVar(&clientArgs.TLSServerKeyFile, "key-file", "", "file path to the TLS authentication key") flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", "file path to the trusted certificate file") flags.IntVarP(&clientArgs.MaxConnectionsPerBroker, "max-connections", "c", 1, "Max connections to open to broker. Defaults to 1.") diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 3ef72c7ca8..a62eabe1ed 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -242,6 +242,10 @@ type ConsumerOptions struct { // NOTE: This option does not work if AckWithResponse is true // because there are only synchronous APIs for acknowledgment AckGroupingOptions *AckGroupingOptions + + // SubscriptionMode specifies the subscription mode to be used when subscribing to a topic. + // Default is `Durable` + SubscriptionMode SubscriptionMode } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 07f38c3633..d782beab5e 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -385,7 +385,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { subProperties: subProperties, replicateSubscriptionState: c.options.ReplicateSubscriptionState, startMessageID: nil, - subscriptionMode: durable, + subscriptionMode: c.options.SubscriptionMode, readCompacted: c.options.ReadCompacted, interceptors: c.options.Interceptors, maxReconnectToBroker: c.options.MaxReconnectToBroker, diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 6e241d8f7f..2d2a1940f7 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -65,15 +65,15 @@ func (s consumerState) String() string { } } -type subscriptionMode int +type SubscriptionMode int const ( // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position - durable subscriptionMode = iota + Durable SubscriptionMode = iota // Lightweight subscription mode that doesn't have a durable cursor associated - nonDurable + NonDurable ) const ( @@ -101,7 +101,7 @@ type partitionConsumerOpts struct { replicateSubscriptionState bool startMessageID *trackingMessageID startMessageIDInclusive bool - subscriptionMode subscriptionMode + subscriptionMode SubscriptionMode readCompacted bool disableForceTopicCreation bool interceptors ConsumerInterceptors @@ -1698,7 +1698,7 @@ func (pc *partitionConsumer) grabConn() error { RequestId: proto.Uint64(requestID), ConsumerName: proto.String(pc.name), PriorityLevel: nil, - Durable: proto.Bool(pc.options.subscriptionMode == durable), + Durable: proto.Bool(pc.options.subscriptionMode == Durable), Metadata: internal.ConvertFromStringMap(pc.options.metadata), SubscriptionProperties: internal.ConvertFromStringMap(pc.options.subProperties), ReadCompacted: proto.Bool(pc.options.readCompacted), @@ -1709,7 +1709,7 @@ func (pc *partitionConsumer) grabConn() error { } pc.startMessageID.set(pc.clearReceiverQueue()) - if pc.options.subscriptionMode != durable { + if pc.options.subscriptionMode != Durable { // For regular subscriptions the broker will determine the restarting point cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get()) } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 521e576767..2d785b4b23 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -4074,6 +4074,78 @@ func TestConsumerWithAutoScaledQueueReceive(t *testing.T) { }) } +func TestConsumerNonDurable(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topicName := newTopicName() + ctx := context.Background() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + }) + assert.Nil(t, err) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "sub-1", + Type: Shared, + SubscriptionMode: NonDurable, + }) + assert.Nil(t, err) + + i := 1 + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-content-%d", i)), + }); err != nil { + t.Fatal(err) + } + + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) + consumer.Ack(msg) + + consumer.Close() + + i++ + + // send a message. Pulsar should delete it as there is no active subscription + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-content-%d", i)), + }); err != nil { + t.Fatal(err) + } + + i++ + + // Subscribe again + consumer, err = client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "sub-1", + Type: Shared, + SubscriptionMode: NonDurable, + }) + assert.Nil(t, err) + defer consumer.Close() + + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-content-%d", i)), + }); err != nil { + t.Fatal(err) + } + + msg, err = consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) + consumer.Ack(msg) +} + func TestConsumerBatchIndexAckDisabled(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 36b492abea..5a2128a377 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -106,7 +106,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { receiverQueueSize: receiverQueueSize, startMessageID: startMessageID, startMessageIDInclusive: options.StartMessageIDInclusive, - subscriptionMode: nonDurable, + subscriptionMode: NonDurable, readCompacted: options.ReadCompacted, metadata: options.Properties, nackRedeliveryDelay: defaultNackRedeliveryDelay, From 6acecf06aee5ab987b5e527d24a2aee7f82f3598 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 16 Jun 2023 13:24:56 +0800 Subject: [PATCH 2/2] Fix broken master by upgrading JRE to 17 (#1030) --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index f66eba54b5..7548729c76 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,7 +25,7 @@ ARG GOLANG_IMAGE=golang:latest FROM $PULSAR_IMAGE as pulsar FROM $GOLANG_IMAGE -RUN apt-get update && apt-get install -y openjdk-11-jre-headless ca-certificates +RUN apt-get update && apt-get install -y openjdk-17-jre-headless ca-certificates COPY --from=pulsar /pulsar /pulsar