Skip to content

Commit

Permalink
Version 0.3.4
Browse files Browse the repository at this point in the history
RLA: Type annotations
RLA: Add  `SSEDataLengthOutOfRangeException`
  • Loading branch information
mos9527 committed Sep 7, 2024
1 parent ceaeef7 commit f0423d7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
2 changes: 1 addition & 1 deletion sssekai/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__VERSION_MAJOR__ = 0
__VERSION_MINOR__ = 3
__VERSION_PATCH__ = 3
__VERSION_PATCH__ = 4

__version__ = '%s.%s.%s' % (__VERSION_MAJOR__,__VERSION_MINOR__,__VERSION_PATCH__)
52 changes: 47 additions & 5 deletions sssekai/fmt/rla.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,32 @@
from collections import defaultdict
import math, gzip
import msgpack
# Sekai_Streaming_StreamingCommon__cctor
read_int = lambda stream, nbytes, signed=False: int.from_bytes(stream.read(nbytes), 'little',signed=signed)
read_float = lambda stream: s_unpack('<f', stream.read(4))[0]
class SSEDataLengthOutOfRangeException(Exception):
needed : int
current : int
def __init__(self, needed, current):
self.needed = needed
self.current = current
super().__init__(f'Needed {needed} bytes, but only {current} bytes available.')
# Sekai_Streaming_StreamingCommon__CheckHeader
def decode_buffer_base64(buffer):
def decode_buffer_base64(buffer : bytes) -> tuple[int, bytes]:
'''Decodes the 'RTVL' SSE Message payload into a header signature and data.
Args:
buffer (bytes): Encoded buffer
Raises:
SSEDataLengthOutOfRangeException: If the buffer length does not match the expected length.
Returns:
(int, bytes): Header Signature and decoded data payload
'''
# They really want you to believe it's Base64...
# [4 bytes: RTVL][6 bytes : length in hex][Base64 T/F][Split T/F][3 bytes: Signature][data, Base64]
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 15 bytes
# Sekai_Streaming_StreamingCommon__cctor
SYM_HEADER = b'RTVL'
SYM_TRUE = b'T'
SYM_FALSE = b'F'
Expand All @@ -23,12 +41,22 @@ def decode_buffer_base64(buffer):
is_split = stream.read(1) == SYM_TRUE
header_signature = int(stream.read(3).decode(), 10)
data = stream.read()
assert len(data) + 15 == encoded_length, 'bad length. packet may be corrupted'
current_length = len(data) + 15
if current_length != encoded_length:
raise SSEDataLengthOutOfRangeException(encoded_length, current_length)
if is_base64_encoded:
data = b64decode(data)
return header_signature, data
# Sekai_Streaming_SubscribeDecoder__Deserialize
def decode_buffer_payload(buffer):
def decode_buffer_payload(buffer : bytes) -> tuple[int, bytes]:
'''Decodes the decoded data payload (in binary form) into a decoder signature and payload data.
Args:
buffer (bytes): Decoded buffer
Returns:
(int, bytes): Decoder signature and decoded payload data. The signature should match the Header Signature.
'''
stream = BytesIO(buffer)
decoder_signature = read_int(stream, 1)
unk1 = read_int(stream, 4)
Expand All @@ -40,7 +68,21 @@ def decode_buffer_payload(buffer):
return decoder_signature, payload
return decoder_signature, stream.read()
# Sekai_Streaming_StreamingData__Deserialize
def decode_streaming_data(version, decoder_signature, buffer, strict=True):
def decode_streaming_data(version : tuple, decoder_signature, buffer, strict=True) -> dict:
'''Decodes the streaming data payload into a dictionary.
Args:
version (tuple): RLA version. i.e. one of (1,0), (1,1), (1,2), (1,3), (1,4)
decoder_signature (int): Decoder signature
buffer (bytes): Decoded payload data
strict (bool, optional): If False, incomplete packets will be returned as is. Defaults to True.
Raises:
Exception: If the packet is incomplete and strict is True.
Returns:
dict: Parsed streaming data payload
'''
stream = BytesIO(buffer)
n_mask_offset = read_int(stream, 4)
n_init_pos = stream.tell()
Expand Down

0 comments on commit f0423d7

Please sign in to comment.