Skip to content

Commit

Permalink
compatibility with SwiftProtobuf 1.27.0 and older versions
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jul 17, 2024
1 parent 0eca234 commit 1f660aa
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 18 deletions.
20 changes: 12 additions & 8 deletions Sources/SwiftCentrifuge/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -892,17 +892,21 @@ fileprivate extension CentrifugeClient {
}

private func handleData(data: Data) {
guard let replies = try? CentrifugeSerializer.deserializeCommands(data: data) else { return }
for reply in replies {
if reply.id > 0 {
self.opCallbacks[reply.id]?(CentrifugeResolveData(error: nil, reply: reply))
} else {
if !reply.hasPush {
self.handlePing()
do {
let replies = try CentrifugeSerializer.deserializeCommands(data: data)
for reply in replies {
if reply.id > 0 {
self.opCallbacks[reply.id]?(CentrifugeResolveData(error: nil, reply: reply))
} else {
self.handlePush(push: reply.push)
if !reply.hasPush {
self.handlePing()
} else {
self.handlePush(push: reply.push)
}
}
}
} catch {
self.log.error("error deserializeCommands: \(error)")
}
}

Expand Down
55 changes: 45 additions & 10 deletions Sources/SwiftCentrifuge/Helpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ public struct StreamPosition {
var epoch: String = ""
}

// Helper function to decode a varint.
func readVarint(from data: Data) throws -> (value: Int, length: Int) {
var value = 0
var length = 0
for byte in data {
value |= Int(byte & 0x7F) << (7 * length)
length += 1
if (byte & 0x80) == 0 {
return (value, length)
}
}
throw ProtobufDecodeError.failedToReadVarintLengthPrefix
}

private enum ProtobufDecodeError: Error {
case failedToReadVarintLengthPrefix
case notEnoughDataForMessage
case failedToParseMessage(Error)
}

internal enum CentrifugeSerializer {
static func serializeCommands(commands: [Centrifugal_Centrifuge_Protocol_Command]) throws -> Data {
let stream = OutputStream.toMemory()
Expand All @@ -46,20 +66,35 @@ internal enum CentrifugeSerializer {
return stream.property(forKey: .dataWrittenToMemoryStreamKey) as! Data
}

// Note, not using BinaryDelimited.parse to work with all Protobuf versions - SwiftProtobuf 1.27.0
// introduced noBytesAvailable error which was not available before.
static func deserializeCommands(data: Data) throws -> [Centrifugal_Centrifuge_Protocol_Reply] {
var commands = [Centrifugal_Centrifuge_Protocol_Reply]()
let stream = InputStream(data: data as Data)
stream.open()
while true {
var replies = [Centrifugal_Centrifuge_Protocol_Reply]()
var currentIndex = data.startIndex

while currentIndex < data.endIndex {
// Read the varint length prefix.
let remainingData = data[currentIndex...]
let (messageLength, lengthBytes) = try readVarint(from: remainingData)
// Calculate the total length of the message (varint length prefix + message data).
let totalLength = lengthBytes + messageLength
// Ensure there is enough data left.
guard currentIndex + totalLength <= data.endIndex else {
throw ProtobufDecodeError.notEnoughDataForMessage
}
// Extract the message data.
let messageData = data[(currentIndex + lengthBytes)..<(currentIndex + totalLength)]
// Parse the Protobuf message payload to Centrifuge Reply.
do {
let res = try BinaryDelimited.parse(messageType: Centrifugal_Centrifuge_Protocol_Reply.self, from: stream)
commands.append(res)
} catch BinaryDelimited.Error.truncated {
// End of stream
break
let reply = try Centrifugal_Centrifuge_Protocol_Reply(serializedData: messageData)
replies.append(reply)
} catch {
throw ProtobufDecodeError.failedToParseMessage(error)
}
// Move to the next message.
currentIndex += totalLength
}
return commands
return replies
}
}

Expand Down

0 comments on commit 1f660aa

Please sign in to comment.