From 69a06d5aeba516431039b3af5d7fd530762761ae Mon Sep 17 00:00:00 2001 From: Stepan Bujnak Date: Fri, 4 Oct 2024 12:33:45 +0200 Subject: [PATCH] [fix] Only decompress the payload if it's not empty (#1280) The message payload is optional and in some cases only message properties are sent. In this case, the message decompression would fail so we only want to do the decompression if the payload is not empty. --- pulsar/consumer_partition.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 16aef5d45b..823e0e8745 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1105,15 +1105,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } } - // decryption is success, decompress the payload - uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, processedPayloadBuffer) - if err != nil { - pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError) - return err - } + var uncompressedHeadersAndPayload internal.Buffer + // decryption is success, decompress the payload, but only if payload is not empty + if n := msgMeta.UncompressedSize; n != nil && *n > 0 { + uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer) + if err != nil { + pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError) + return err + } - // Reset the reader on the uncompressed buffer - reader.ResetBuffer(uncompressedHeadersAndPayload) + // Reset the reader on the uncompressed buffer + reader.ResetBuffer(uncompressedHeadersAndPayload) + } numMsgs := 1 if msgMeta.NumMessagesInBatch != nil {