Skip to content

Commit

Permalink
Refactor: Introduce Feature enum
Browse files Browse the repository at this point in the history
Convert Feature NamedInts to SupportedFeature integer enum.

Related pwr-Solaar#2273
  • Loading branch information
rloutrel authored and MattHag committed Oct 14, 2024
1 parent 0cd9c0c commit afe4d85
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 27 deletions.
76 changes: 49 additions & 27 deletions lib/logitech_receiver/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

# Handles incoming events from the receiver/devices, updating the object as appropriate.
"""Handles incoming events from the receiver/devices, updating the
object as appropriate.
"""

from __future__ import annotations

import logging
import struct
import threading
import typing

from solaar.i18n import _

Expand All @@ -36,6 +41,11 @@
from .common import Notification
from .hidpp10_constants import Registers

if typing.TYPE_CHECKING:
from .base import HIDPPNotification
from .receiver import Receiver


logger = logging.getLogger(__name__)

_hidpp10 = hidpp10.Hidpp10()
Expand All @@ -55,29 +65,41 @@ def process(device, notification):
return _process_device_notification(device, notification)


def _process_receiver_notification(receiver, n):
def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None:
# supposedly only 0x4x notifications arrive for the receiver
assert n.sub_id & 0x40 == 0x40

if n.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(n.address & 0x01)
assert hidpp_notification.sub_id in [
Notification.CONNECT_DISCONNECT,
Notification.DJ_PAIRING,
Notification.CONNECTED,
Notification.RAW_INPUT,
Notification.PAIRING_LOCK,
Notification.POWER,
Registers.DEVICE_DISCOVERY_NOTIFICATION,
Registers.DISCOVERY_STATUS_NOTIFICATION,
Registers.PAIRING_STATUS_NOTIFICATION,
Registers.PASSKEY_PRESSED_NOTIFICATION,
Registers.PASSKEY_REQUEST_NOTIFICATION,
]

if hidpp_notification.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(n.data[:1])
pair_error = ord(hidpp_notification.data[:1])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.discovering = n.address == 0x00
receiver.pairing.discovering = hidpp_notification.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
Expand All @@ -86,33 +108,33 @@ def _process_receiver_notification(receiver, n):
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(n.data[:1])
discover_error = ord(hidpp_notification.data[:1])
if discover_error:
receiver.pairing.error = discover_string = hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
with notification_lock:
counter = n.address + n.data[0] * 256 # notification counter
counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not receiver.pairing.counter == counter:
return None
if n.data[1] == 0:
receiver.pairing.device_kind = n.data[3]
receiver.pairing.device_address = n.data[6:12]
receiver.pairing.device_authentication = n.data[14]
elif n.data[1] == 1:
receiver.pairing.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8")
if hidpp_notification.data[1] == 0:
receiver.pairing.device_kind = hidpp_notification.data[3]
receiver.pairing.device_address = hidpp_notification.data[6:12]
receiver.pairing.device_authentication = hidpp_notification.data[14]
elif hidpp_notification.data[1] == 1:
receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8")
return True

elif n.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = n.address == 0x00
receiver.pairing.lock_open = hidpp_notification.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
Expand All @@ -122,27 +144,27 @@ def _process_receiver_notification(receiver, n):
receiver.pairing.device_address = None
receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = n.data[0]
pair_error = hidpp_notification.data[0]
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif n.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(n.data[7])
elif hidpp_notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = n.data[0:6].decode("utf-8")
receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8")
return True

elif n.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
return True

logger.warning("%s: unhandled notification %s", receiver, n)
logger.warning("%s: unhandled notification %s", receiver, hidpp_notification)


def _process_device_notification(device, n):
Expand Down
39 changes: 39 additions & 0 deletions tests/logitech_receiver/test_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pytest

from logitech_receiver import hidpp10_constants
from logitech_receiver import notifications
from logitech_receiver.base import HIDPPNotification
from logitech_receiver.hidpp10_constants import Registers
from logitech_receiver.receiver import Receiver


class MockLowLevelInterface:
def open_path(self, path):
pass

def find_paired_node_wpid(self, receiver_path: str, index: int):
pass

def ping(self, handle, number, long_message=False):
pass

def request(self, handle, devnumber, request_id, *params, **kwargs):
pass


@pytest.mark.parametrize(
"sub_id, notification_data, expected_error, expected_new_device",
[
(Registers.DISCOVERY_STATUS_NOTIFICATION, b"\x01", "device_timeout", None),
(Registers.PAIRING_STATUS_NOTIFICATION, b"\x02", "failed", None),
],
)
def test_process_receiver_notification(sub_id, notification_data, expected_error, expected_new_device):
receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
notification = HIDPPNotification(0, 0, sub_id, 0, notification_data)

result = notifications._process_receiver_notification(receiver, notification)

assert result
assert receiver.pairing.error == hidpp10_constants.BOLT_PAIRING_ERRORS[expected_error]
assert receiver.pairing.new_device is expected_new_device

0 comments on commit afe4d85

Please sign in to comment.