Skip to content

Commit

Permalink
[fix] Only decompress the payload if it's not empty (#1280)
Browse files Browse the repository at this point in the history
The message payload is optional and in some cases only message
properties are sent. In this case, the message decompression would fail
so we only want to do the decompression if the payload is not empty.
  • Loading branch information
stepanbujnak authored Oct 4, 2024
1 parent 7484c93 commit 69a06d5
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,15 +1105,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
}
}

// decryption is success, decompress the payload
uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, processedPayloadBuffer)
if err != nil {
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
return err
}
var uncompressedHeadersAndPayload internal.Buffer
// decryption is success, decompress the payload, but only if payload is not empty
if n := msgMeta.UncompressedSize; n != nil && *n > 0 {
uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer)
if err != nil {
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
return err
}

// Reset the reader on the uncompressed buffer
reader.ResetBuffer(uncompressedHeadersAndPayload)
// Reset the reader on the uncompressed buffer
reader.ResetBuffer(uncompressedHeadersAndPayload)
}

numMsgs := 1
if msgMeta.NumMessagesInBatch != nil {
Expand Down

0 comments on commit 69a06d5

Please sign in to comment.