Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
graysonzeng authored Jun 16, 2023
2 parents 8902e21 + 6acecf0 commit e4965ac
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ ARG GOLANG_IMAGE=golang:latest
FROM $PULSAR_IMAGE as pulsar
FROM $GOLANG_IMAGE

RUN apt-get update && apt-get install -y openjdk-11-jre-headless ca-certificates
RUN apt-get update && apt-get install -y openjdk-17-jre-headless ca-certificates

COPY --from=pulsar /pulsar /pulsar

Expand Down
8 changes: 8 additions & 0 deletions perf/perf-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type ConsumeArgs struct {
ReceiverQueueSize int
EnableBatchIndexAck bool
EnableAutoScaledReceiverQueueSize bool
SubscriptionMode pulsar.SubscriptionMode
SubscriptionType pulsar.SubscriptionType
}

func newConsumerCommand() *cobra.Command {
Expand All @@ -60,6 +62,10 @@ func newConsumerCommand() *cobra.Command {
flags.BoolVar(&consumeArgs.EnableBatchIndexAck, "enable-batch-index-ack", false, "Whether to enable batch index ACK")
flags.BoolVar(&consumeArgs.EnableAutoScaledReceiverQueueSize, "enable-auto-scaled-queue-size", false,
"Whether to enable auto scaled receiver queue size")
flags.IntVarP((*int)(&consumeArgs.SubscriptionMode), "subscription-mode", "m", int(pulsar.Durable),
"Subscription mode")
flags.IntVarP((*int)(&consumeArgs.SubscriptionType), "subscription-type", "t", int(pulsar.Exclusive),
"Subscription type")

return cmd
}
Expand All @@ -83,6 +89,8 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
EnableAutoScaledReceiverQueueSize: consumeArgs.EnableAutoScaledReceiverQueueSize,
Type: consumeArgs.SubscriptionType,
SubscriptionMode: consumeArgs.SubscriptionMode,
})

if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions perf/pulsar-perf-go.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type ClientArgs struct {
ServiceURL string
TokenFile string
TLSTrustCertFile string
TLSServerCertFile string
TLSServerKeyFile string
MaxConnectionsPerBroker int
}

Expand All @@ -62,6 +64,8 @@ func NewClient() (pulsar.Client, error) {
os.Exit(1)
}
clientOpts.Authentication = pulsar.NewAuthenticationToken(string(tokenBytes))
} else if clientArgs.TLSServerCertFile != "" && clientArgs.TLSServerKeyFile != "" {
clientOpts.Authentication = pulsar.NewAuthenticationTLS(clientArgs.TLSServerCertFile, clientArgs.TLSServerKeyFile)
}

if clientArgs.TLSTrustCertFile != "" {
Expand Down Expand Up @@ -97,6 +101,8 @@ func main() {
flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
"pulsar://localhost:6650", "The Pulsar service URL")
flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to the Pulsar JWT file")
flags.StringVar(&clientArgs.TLSServerCertFile, "cert-file", "", "file path to the TLS authentication cert")
flags.StringVar(&clientArgs.TLSServerKeyFile, "key-file", "", "file path to the TLS authentication key")
flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", "file path to the trusted certificate file")
flags.IntVarP(&clientArgs.MaxConnectionsPerBroker, "max-connections", "c", 1,
"Max connections to open to broker. Defaults to 1.")
Expand Down
4 changes: 4 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ type ConsumerOptions struct {
// NOTE: This option does not work if AckWithResponse is true
// because there are only synchronous APIs for acknowledgment
AckGroupingOptions *AckGroupingOptions

// SubscriptionMode specifies the subscription mode to be used when subscribing to a topic.
// Default is `Durable`
SubscriptionMode SubscriptionMode
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: nil,
subscriptionMode: durable,
subscriptionMode: c.options.SubscriptionMode,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
Expand Down
12 changes: 6 additions & 6 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ func (s consumerState) String() string {
}
}

type subscriptionMode int
type SubscriptionMode int

const (
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
durable subscriptionMode = iota
Durable SubscriptionMode = iota

// Lightweight subscription mode that doesn't have a durable cursor associated
nonDurable
NonDurable
)

const (
Expand Down Expand Up @@ -101,7 +101,7 @@ type partitionConsumerOpts struct {
replicateSubscriptionState bool
startMessageID *trackingMessageID
startMessageIDInclusive bool
subscriptionMode subscriptionMode
subscriptionMode SubscriptionMode
readCompacted bool
disableForceTopicCreation bool
interceptors ConsumerInterceptors
Expand Down Expand Up @@ -1698,7 +1698,7 @@ func (pc *partitionConsumer) grabConn() error {
RequestId: proto.Uint64(requestID),
ConsumerName: proto.String(pc.name),
PriorityLevel: nil,
Durable: proto.Bool(pc.options.subscriptionMode == durable),
Durable: proto.Bool(pc.options.subscriptionMode == Durable),
Metadata: internal.ConvertFromStringMap(pc.options.metadata),
SubscriptionProperties: internal.ConvertFromStringMap(pc.options.subProperties),
ReadCompacted: proto.Bool(pc.options.readCompacted),
Expand All @@ -1709,7 +1709,7 @@ func (pc *partitionConsumer) grabConn() error {
}

pc.startMessageID.set(pc.clearReceiverQueue())
if pc.options.subscriptionMode != durable {
if pc.options.subscriptionMode != Durable {
// For regular subscriptions the broker will determine the restarting point
cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get())
}
Expand Down
72 changes: 72 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4074,6 +4074,78 @@ func TestConsumerWithAutoScaledQueueReceive(t *testing.T) {
})
}

func TestConsumerNonDurable(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topicName := newTopicName()
ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
SubscriptionMode: NonDurable,
})
assert.Nil(t, err)

i := 1
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}

msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
consumer.Ack(msg)

consumer.Close()

i++

// send a message. Pulsar should delete it as there is no active subscription
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}

i++

// Subscribe again
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
SubscriptionMode: NonDurable,
})
assert.Nil(t, err)
defer consumer.Close()

if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}

msg, err = consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}

func TestConsumerBatchIndexAckDisabled(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down
2 changes: 1 addition & 1 deletion pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
receiverQueueSize: receiverQueueSize,
startMessageID: startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: nonDurable,
subscriptionMode: NonDurable,
readCompacted: options.ReadCompacted,
metadata: options.Properties,
nackRedeliveryDelay: defaultNackRedeliveryDelay,
Expand Down

0 comments on commit e4965ac

Please sign in to comment.