diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 21492b127..b85a9a518 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -15,11 +15,16 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# Handles incoming events from the receiver/devices, updating the object as appropriate. +"""Handles incoming events from the receiver/devices, updating the +object as appropriate. +""" + +from __future__ import annotations import logging import struct import threading +import typing from solaar.i18n import _ @@ -36,6 +41,11 @@ from .common import Notification from .hidpp10_constants import Registers +if typing.TYPE_CHECKING: + from .base import HIDPPNotification + from .receiver import Receiver + + logger = logging.getLogger(__name__) _hidpp10 = hidpp10.Hidpp10() @@ -55,19 +65,31 @@ def process(device, notification): return _process_device_notification(device, notification) -def _process_receiver_notification(receiver, n): +def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None: # supposedly only 0x4x notifications arrive for the receiver - assert n.sub_id & 0x40 == 0x40 - - if n.sub_id == Notification.PAIRING_LOCK: - receiver.pairing.lock_open = bool(n.address & 0x01) + assert hidpp_notification.sub_id in [ + Notification.CONNECT_DISCONNECT, + Notification.DJ_PAIRING, + Notification.CONNECTED, + Notification.RAW_INPUT, + Notification.PAIRING_LOCK, + Notification.POWER, + Registers.DEVICE_DISCOVERY_NOTIFICATION, + Registers.DISCOVERY_STATUS_NOTIFICATION, + Registers.PAIRING_STATUS_NOTIFICATION, + Registers.PASSKEY_PRESSED_NOTIFICATION, + Registers.PASSKEY_REQUEST_NOTIFICATION, + ] + + if hidpp_notification.sub_id == Notification.PAIRING_LOCK: + receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01) reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) receiver.pairing.error = None if receiver.pairing.lock_open: receiver.pairing.new_device = None - pair_error = ord(n.data[:1]) + pair_error = ord(hidpp_notification.data[:1]) if pair_error: receiver.pairing.error = error_string = hidpp10_constants.PAIRING_ERRORS[pair_error] receiver.pairing.new_device = None @@ -75,9 +97,9 @@ def _process_receiver_notification(receiver, n): receiver.changed(reason=reason) return True - elif n.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing + elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing with notification_lock: - receiver.pairing.discovering = n.address == 0x00 + receiver.pairing.discovering = hidpp_notification.address == 0x00 reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) @@ -86,33 +108,33 @@ def _process_receiver_notification(receiver, n): receiver.pairing.counter = receiver.pairing.device_address = None receiver.pairing.device_authentication = receiver.pairing.device_name = None receiver.pairing.device_passkey = None - discover_error = ord(n.data[:1]) + discover_error = ord(hidpp_notification.data[:1]) if discover_error: receiver.pairing.error = discover_string = hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error] logger.warning("bolt discovering error %d: %s", discover_error, discover_string) receiver.changed(reason=reason) return True - elif n.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing + elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing with notification_lock: - counter = n.address + n.data[0] * 256 # notification counter + counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter if receiver.pairing.counter is None: receiver.pairing.counter = counter else: if not receiver.pairing.counter == counter: return None - if n.data[1] == 0: - receiver.pairing.device_kind = n.data[3] - receiver.pairing.device_address = n.data[6:12] - receiver.pairing.device_authentication = n.data[14] - elif n.data[1] == 1: - receiver.pairing.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8") + if hidpp_notification.data[1] == 0: + receiver.pairing.device_kind = hidpp_notification.data[3] + receiver.pairing.device_address = hidpp_notification.data[6:12] + receiver.pairing.device_authentication = hidpp_notification.data[14] + elif hidpp_notification.data[1] == 1: + receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8") return True - elif n.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing + elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing with notification_lock: receiver.pairing.device_passkey = None - receiver.pairing.lock_open = n.address == 0x00 + receiver.pairing.lock_open = hidpp_notification.address == 0x00 reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) @@ -122,11 +144,11 @@ def _process_receiver_notification(receiver, n): receiver.pairing.device_address = None receiver.pairing.device_authentication = None receiver.pairing.device_name = None - pair_error = n.data[0] + pair_error = hidpp_notification.data[0] if receiver.pairing.lock_open: receiver.pairing.new_device = None - elif n.address == 0x02 and not pair_error: - receiver.pairing.new_device = receiver.register_new_device(n.data[7]) + elif hidpp_notification.address == 0x02 and not pair_error: + receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7]) if pair_error: receiver.pairing.error = error_string = hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error] receiver.pairing.new_device = None @@ -134,15 +156,15 @@ def _process_receiver_notification(receiver, n): receiver.changed(reason=reason) return True - elif n.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing + elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing with notification_lock: - receiver.pairing.device_passkey = n.data[0:6].decode("utf-8") + receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8") return True - elif n.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing + elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing return True - logger.warning("%s: unhandled notification %s", receiver, n) + logger.warning("%s: unhandled notification %s", receiver, hidpp_notification) def _process_device_notification(device, n): diff --git a/tests/logitech_receiver/test_notifications.py b/tests/logitech_receiver/test_notifications.py new file mode 100644 index 000000000..84badb7bc --- /dev/null +++ b/tests/logitech_receiver/test_notifications.py @@ -0,0 +1,39 @@ +import pytest + +from logitech_receiver import hidpp10_constants +from logitech_receiver import notifications +from logitech_receiver.base import HIDPPNotification +from logitech_receiver.hidpp10_constants import Registers +from logitech_receiver.receiver import Receiver + + +class MockLowLevelInterface: + def open_path(self, path): + pass + + def find_paired_node_wpid(self, receiver_path: str, index: int): + pass + + def ping(self, handle, number, long_message=False): + pass + + def request(self, handle, devnumber, request_id, *params, **kwargs): + pass + + +@pytest.mark.parametrize( + "sub_id, notification_data, expected_error, expected_new_device", + [ + (Registers.DISCOVERY_STATUS_NOTIFICATION, b"\x01", "device_timeout", None), + (Registers.PAIRING_STATUS_NOTIFICATION, b"\x02", "failed", None), + ], +) +def test_process_receiver_notification(sub_id, notification_data, expected_error, expected_new_device): + receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None) + notification = HIDPPNotification(0, 0, sub_id, 0, notification_data) + + result = notifications._process_receiver_notification(receiver, notification) + + assert result + assert receiver.pairing.error == hidpp10_constants.BOLT_PAIRING_ERRORS[expected_error] + assert receiver.pairing.new_device is expected_new_device