Skip to content

Commit

Permalink
Compatible the HTTP header properties with SNIP-279
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Oct 23, 2024
1 parent 06f2693 commit b1e1f9d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
15 changes: 14 additions & 1 deletion pulsaradmin/pkg/admin/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package admin
import (
"bytes"
"encoding/binary"
"encoding/json"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -230,7 +231,14 @@ func safeRespClose(resp *http.Response) {
const (
PublishTimeHeader = "X-Pulsar-Publish-Time"
BatchHeader = "X-Pulsar-Num-Batch-Message"
PropertyPrefix = "X-Pulsar-Property-"

// PropertyPrefix is part of the old protocol for message properties.
PropertyPrefix = "X-Pulsar-Property-"

// PropertyHeader is part of the new protocol introduced in SNIP-279
// https://github.com/apache/pulsar/pull/20627
// The value is a JSON string representing the properties.
PropertyHeader = "X-Pulsar-Property"
)

func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) {
Expand Down Expand Up @@ -261,6 +269,11 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
properties[BatchHeader] = h
}
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
case k == PropertyHeader:
propJSON := resp.Header.Get(k)
if err := json.Unmarshal([]byte(propJSON), &properties); err != nil {
return nil, err
}
case strings.Contains(k, PropertyPrefix):
key := strings.TrimPrefix(k, PropertyPrefix)
properties[key] = resp.Header.Get(k)
Expand Down
52 changes: 52 additions & 0 deletions pulsaradmin/pkg/admin/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,58 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
}
}

func TestPeekMessageWithProperties(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
topicName, err := utils.GetTopicName(topic)

Check failure on line 150 in pulsaradmin/pkg/admin/subscription_test.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
subName := "test-sub"

cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()

// Create a producer for non-batch messages
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.NoError(t, err)
defer producer.Close()

props := map[string]string{
"key1": "value1",
"KEY2": "VALUE2",
"KeY3": "VaLuE3",
"details=man": "good at playing basketball",
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("test-message"),
Properties: props,
})
assert.NoError(t, err)

// Peek messages
messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 1)
assert.NoError(t, err)
assert.NotNil(t, messages)

// Verify properties of messages
for _, msg := range messages {
assert.Equal(t, "value1", msg.Properties["key1"])
assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
assert.Equal(t, "good at playing basketball", msg.Properties["details=man"])
}
}

func TestGetMessageByID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
Expand Down

0 comments on commit b1e1f9d

Please sign in to comment.