Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SOME/IP TP #19

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ classifiers =
[options]
zip_safe = true
install_requires =
bitstruct
package_dir =
= src
packages = find:
Expand Down
193 changes: 193 additions & 0 deletions src/someip/header.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
except ImportError: # pragma: nocover
cached_property = property # type: ignore[misc,assignment]

import bitstruct

import someip.utils


Expand All @@ -24,6 +26,8 @@
SD_SERVICE = 0xFFFF
SD_METHOD = 0x8100
SD_INTERFACE_VERSION = 1
TP_FLAG = 0x20
MAX_PAYLOAD_SIZE = 1400


class ParseError(RuntimeError):
Expand All @@ -38,11 +42,16 @@ class SOMEIPMessageType(enum.IntEnum):
REQUEST = 0
REQUEST_NO_RETURN = 1
NOTIFICATION = 2
TP_REQUEST = 0x20
TP_REQUEST_NO_RETURN = 0x21
TP_NOTIFICATION = 0x22
REQUEST_ACK = 0x40
REQUEST_NO_RETURN_ACK = 0x41
NOTIFICATION_ACK = 0x42
RESPONSE = 0x80
ERROR = 0x81
TP_RESPONSE = 0xA0
TP_ERROR = 0xA1
RESPONSE_ACK = 0xC0
ERROR_ACK = 0xC1

Expand All @@ -61,6 +70,20 @@ class SOMEIPReturnCode(enum.IntEnum):
E_WRONG_MESSAGE_TYPE = 10


class StructLikeBitstruct:
"""
A wrapper around `bitstruct.compile` that provides a similar interface to struct.
"""

def __init__(self, fmt: str):
self._compiled = bitstruct.compile(fmt)
self.format = fmt
self.size = self._compiled.calcsize() // 8

def __getattr__(self, item):
return getattr(self._compiled, item)


def _unpack(fmt, buf):
if len(buf) < fmt.size:
raise IncompleteReadError(
Expand Down Expand Up @@ -210,6 +233,176 @@ def build(self) -> bytes:
return hdr + self.payload


@dataclasses.dataclass(frozen=True)
class SOMEIPTPHeader(SOMEIPHeader):
"""Represents a top-level SOME/IP TP packet (header and payload)."""

__format: typing.ClassVar[struct.Struct] = struct.Struct("!HHIHHBBBB")
TP_STRUCT: typing.ClassVar[StructLikeBitstruct] = StructLikeBitstruct("u28p3b1")

offset: int = 0
more_segments: bool = False

@property
def description(self): # pragma: nocover
return (
f"service: 0x{self.service_id:04x}"
f"method: 0x{self.method_id:04x}"
f"client: 0x{self.client_id:04x}"
f"session: 0x{self.session_id:04x}"
f"protocol: {self.protocol_version}"
f"interface: 0x{self.interface_version:02x}"
f"message: {self.message_type.name}"
f"return code: {self.return_code.name}"
f"offset: {self.offset}"
f"more segments: {self.more_segments}"
f"payload: {len(self.payload)} bytes"
)

def __str__(self): # pragma: nocover
return (
f"service=0x{self.service_id:04x}, "
f"method=0x{self.method_id:04x}, "
f"client=0x{self.client_id:04x}, "
f"session=0x{self.session_id:04x}, "
f"protocol={self.protocol_version}, "
f"interface=0x{self.interface_version:02x}, "
f"message={self.message_type.name}, "
f"return_code={self.return_code.name}, "
f"offset={self.offset}, "
f"more_segments={self.more_segments}, "
f"payload: {len(self.payload)}"
)

@classmethod
def _build_from_buffer(
cls,
buf: bytes,
size: int,
builder: typing.Callable[[bytes, int, bool], SOMEIPTPHeader],
) -> typing.Tuple[SOMEIPTPHeader, bytes]:
"""Helper function to build a SOMEIP packet from a buffer.

:param buf: buffer in which the payload is located.
:param size: size of the SOME/IP packet.
:param builder: callable to build the header from the payload
:return: tuple of :class:`SOMEIPTPHeader` instance and unparsed rest of `buf`.
"""
tp_args, buf = _unpack(cls.TP_STRUCT, buf)
payload_len = size - (8 + cls.TP_STRUCT.size)
payload_b, buf = buf[: payload_len], buf[payload_len:]
header = builder(payload_b, *tp_args)
return header, buf

@classmethod
def _parse_header(
cls, parsed
) -> typing.Tuple[int, typing.Callable[[bytes, int, bool], SOMEIPTPHeader]]:
"""Validate the header fields from `parsed` tuple.

:param parsed: tuple of parsed header fields.
:return: Return the size of the whole SOMEIP packet and a builder function to
create a :class:`SOMEIPTPHeader` instance from the payload.
"""
sid, mid, size, cid, sessid, pv, iv, mt_b, rc_b = parsed
if pv != 1:
raise ParseError(f"bad someip protocol version 0x{pv:02x}, expected 0x01")

try:
if not (mt_b & TP_FLAG):
raise ValueError("TP flag not set in SOMEIP message type")

mt = SOMEIPMessageType(mt_b)
except ValueError as exc:
raise ParseError(f"bad someip message type {mt_b:#x}") from exc

try:
rc = SOMEIPReturnCode(rc_b)
except ValueError as exc:
raise ParseError(f"bad someip return code {rc_b:#x}") from exc

if size < 8:
raise ParseError("SOMEIP length must be at least 8")

return (
size,
lambda payload_b, offset=0, more_segments=False: cls(
service_id=sid,
method_id=mid,
client_id=cid,
session_id=sessid,
protocol_version=pv,
interface_version=iv,
message_type=mt,
return_code=rc,
offset=offset,
more_segments=more_segments,
payload=payload_b,
),
)

@classmethod
def parse(cls, buf: bytes) -> typing.Tuple[SOMEIPTPHeader, bytes]:
"""Parse SOME/IP packet in `buf`.

:param buf: buffer containing SOME/IP packet.
:raises IncompleteReadError: if the buffer did not contain enough data to unpack
the SOMEIP packet. Either there was less data than one SOMEIP header length,
or the size in the header was too big
:raises ParseError: if the packet contained invalid data, such as an unknown
message type or return code
:return: tuple (S, B) where S is the parsed :class:`SOMEIPTPHeader` instance
and B is the unparsed rest of `buf`.
"""
parsed, buf_rest = _unpack(cls.__format, buf)
size, builder = cls._parse_header(parsed)
if len(buf_rest) < size - 8:
raise IncompleteReadError(
f"packet too short, expected {size+4}, got {len(buf)}"
)

header, buf_rest = cls._build_from_buffer(buf_rest, size, builder)
return header, buf_rest

def build(self) -> bytes:
"""Build the byte representation of this SOMEIP packet.

:raises struct.error: if any attribute was out of range for serialization
:return: the byte representation
"""
size = len(self.payload) + 8 + self.TP_STRUCT.size
tp_hdr = self.TP_STRUCT.pack(self.offset, self.more_segments)

hdr = self.__format.pack(
self.service_id,
self.method_id,
size,
self.client_id,
self.session_id,
self.protocol_version,
self.interface_version,
self.message_type.value,
self.return_code.value,
)
return hdr + tp_hdr + self.payload


def header_parser(
buf: bytes,
) -> typing.Tuple[typing.Union[SOMEIPHeader, SOMEIPTPHeader], bytes]:
"""
Parse data and return either `SOMEIPHeader` or `SOMEIPTPHeader` and the rest of
the buffer.

:param buf: Buffer containing SOME/IP or SOME/IP TP packet.
:return: tuple of parsed header and unparsed buffer.
"""
try:
return SOMEIPTPHeader.parse(buf)
except ParseError:
return SOMEIPHeader.parse(buf)


class SOMEIPReader:
"""
Wrapper class around :class:`asyncio.StreamReader` that returns parsed
Expand Down
103 changes: 101 additions & 2 deletions src/someip/sd.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import someip.header
import someip.config
from someip.config import _T_SOCKNAME as _T_SOCKADDR
from someip.header import MAX_PAYLOAD_SIZE, SOMEIPTPHeader, TP_FLAG, header_parser
from someip.utils import log_exceptions, wait_cancelled

LOG = logging.getLogger("someip.sd")
Expand Down Expand Up @@ -49,6 +50,39 @@ def format_address(addr: _T_SOCKADDR) -> str:
raise NotImplementedError(f"unknown ip address format: {addr!r} -> {ip!r}")


def segment_msg(
msg: someip.header.SOMEIPHeader,
max_payload_size: int = MAX_PAYLOAD_SIZE,
) -> typing.Iterator[SOMEIPTPHeader]:
"""Split a SOME/IP message into multiple SOME/IP-TP messages."""
bytes_sent = 0
offset = 0
max_len = (max_payload_size - SOMEIPTPHeader.TP_STRUCT.size) // 16 * 16
message_type = someip.header.SOMEIPMessageType(msg.message_type.value | TP_FLAG)
data = msg.payload
original_payload_len = len(data)
while data:
payload, data = data[:max_len], data[max_len:]
bytes_sent = bytes_sent + len(payload)
more_segments = bytes_sent < original_payload_len

yield SOMEIPTPHeader(
service_id=msg.service_id,
method_id=msg.method_id,
client_id=msg.client_id,
session_id=msg.session_id,
interface_version=msg.interface_version,
message_type=message_type,
protocol_version=msg.protocol_version,
return_code=msg.return_code,
offset=offset,
more_segments=more_segments,
payload=payload,
)

offset = bytes_sent // 16


class SOMEIPDatagramProtocol:
"""
is actually not a subclass of asyncio.BaseProtocol or asyncio.DatagramProtocol,
Expand Down Expand Up @@ -85,13 +119,70 @@ def __init__(self, logger: str = "someip"):
# default_addr=None means use connected address from socket
self.default_addr: _T_OPT_SOCKADDR = None

# Dict that keeps track of dict of kwargs of SOME/IP messages currently being
# assembled keyed by PDU ID (i.e. a tuple of service_id and method_id).
self._tp_buffer: typing.Dict[
typing.Tuple[int, int], typing.Dict[str, typing.Any]
] = {}

def _assemble_msg(
self, segment: SOMEIPTPHeader,
) -> typing.Union[None, someip.header.SOMEIPHeader]:
"""Assemble SOME/IP message from SOME/IP TP messages.

:param segment: The SOME/IP TP message to process.
:return: The assembled SOME/IP message if all segments were received, None
otherwise.
"""
header = None
pdu_id = segment.service_id, segment.method_id
if segment.offset == 0:
message_type = someip.header.SOMEIPMessageType(
segment.message_type.value & ~TP_FLAG
)
self._tp_buffer[pdu_id] = {
"service_id": segment.service_id,
"method_id": segment.method_id,
"client_id": segment.client_id,
"session_id": segment.session_id,
"interface_version": segment.interface_version,
"message_type": message_type,
"protocol_version": segment.protocol_version,
"return_code": segment.return_code,
"payload": segment.payload,
}
elif segment.offset > 0:
wip = self._tp_buffer.get(pdu_id)
if wip is not None:
current_offset = len(wip["payload"]) / 16
if segment.offset == current_offset:
wip["payload"] = wip["payload"] + segment.payload
if not segment.more_segments:
del self._tp_buffer[pdu_id]
header = someip.header.SOMEIPHeader(**wip)
else:
self.log.error(
"Received TP segment with wrong offset: %s", segment
)
else:
self.log.error(
"Received TP segment without first segment: %s", segment
)

return header

def datagram_received(self, data, addr: _T_SOCKADDR, multicast: bool) -> None:
try:
while data:
# 4.2.1, TR_SOMEIP_00140 more than one SOMEIP message per UDP frame
# allowed
parsed, data = someip.header.SOMEIPHeader.parse(data)
self.message_received(parsed, addr, multicast)
parsed, data = header_parser(data)
if isinstance(parsed, SOMEIPTPHeader):
parsed = self._assemble_msg(parsed)
if parsed:
self.message_received(parsed, addr, multicast)
else:
self.message_received(parsed, addr, multicast)
except someip.header.ParseError as exc:
self.log.error(
"failed to parse SOME/IP datagram from %s: %r",
Expand Down Expand Up @@ -135,6 +226,14 @@ def send(self, buf: bytes, remote: _T_OPT_SOCKADDR = None):
remote = self.default_addr
self.transport.sendto(buf, remote)

def send_msg(self, msg: someip.header.SOMEIPHeader, remote: _T_OPT_SOCKADDR = None):
"""Send a SOME/IP message or split it into multiple SOME/IP-TP messages."""
if len(msg.payload) <= MAX_PAYLOAD_SIZE:
self.send(msg.build(), remote)
else:
for msg in segment_msg(msg):
self.send(msg.build(), remote)


class DatagramProtocolAdapter(asyncio.DatagramProtocol):
def __init__(self, protocol: SOMEIPDatagramProtocol, is_multicast: bool):
Expand Down
Loading
Loading