diff --git a/pulsaradmin/pkg/admin/subscription.go b/pulsaradmin/pkg/admin/subscription.go index 996ebb4e0..a1a13619b 100644 --- a/pulsaradmin/pkg/admin/subscription.go +++ b/pulsaradmin/pkg/admin/subscription.go @@ -20,6 +20,7 @@ package admin import ( "bytes" "encoding/binary" + "encoding/json" "io" "net/http" "net/url" @@ -230,7 +231,14 @@ func safeRespClose(resp *http.Response) { const ( PublishTimeHeader = "X-Pulsar-Publish-Time" BatchHeader = "X-Pulsar-Num-Batch-Message" - PropertyPrefix = "X-Pulsar-Property-" + + // PropertyPrefix is part of the old protocol for message properties. + PropertyPrefix = "X-Pulsar-Property-" + + // PropertyHeader is part of the new protocol introduced in SNIP-279 + // https://github.com/apache/pulsar/pull/20627 + // The value is a JSON string representing the properties. + PropertyHeader = "X-Pulsar-Property" ) func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) { @@ -261,6 +269,11 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e properties[BatchHeader] = h } return getIndividualMsgsFromBatch(topic, ID, payload, properties) + case k == PropertyHeader: + propJSON := resp.Header.Get(k) + if err := json.Unmarshal([]byte(propJSON), &properties); err != nil { + return nil, err + } case strings.Contains(k, PropertyPrefix): key := strings.TrimPrefix(k, PropertyPrefix) properties[key] = resp.Header.Get(k) diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go index 92c79c1f6..98db69619 100644 --- a/pulsaradmin/pkg/admin/subscription_test.go +++ b/pulsaradmin/pkg/admin/subscription_test.go @@ -144,6 +144,58 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) { } } +func TestPeekMessageWithProperties(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + topicName, _ := utils.GetTopicName(topic) + subName := "test-sub" + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + + // Create a producer for non-batch messages + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.NoError(t, err) + defer producer.Close() + + props := map[string]string{ + "key1": "value1", + "KEY2": "VALUE2", + "KeY3": "VaLuE3", + "details=man": "good at playing basketball", + } + + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: []byte("test-message"), + Properties: props, + }) + assert.NoError(t, err) + + // Peek messages + messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 1) + assert.NoError(t, err) + assert.NotNil(t, messages) + + // Verify properties of messages + for _, msg := range messages { + assert.Equal(t, "value1", msg.Properties["key1"]) + assert.Equal(t, "VALUE2", msg.Properties["KEY2"]) + assert.Equal(t, "VaLuE3", msg.Properties["KeY3"]) + assert.Equal(t, "good at playing basketball", msg.Properties["details=man"]) + } +} + func TestGetMessageByID(t *testing.T) { randomName := newTopicName() topic := "persistent://public/default/" + randomName