Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

notifications: Introduce unit tests #2634

Merged
merged 1 commit into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 49 additions & 27 deletions lib/logitech_receiver/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

# Handles incoming events from the receiver/devices, updating the object as appropriate.
"""Handles incoming events from the receiver/devices, updating the
object as appropriate.
"""

from __future__ import annotations

import logging
import struct
import threading
import typing

from solaar.i18n import _

Expand All @@ -36,6 +41,11 @@
from .common import Notification
from .hidpp10_constants import Registers

if typing.TYPE_CHECKING:
from .base import HIDPPNotification
from .receiver import Receiver


logger = logging.getLogger(__name__)

_hidpp10 = hidpp10.Hidpp10()
Expand All @@ -55,29 +65,41 @@ def process(device, notification):
return _process_device_notification(device, notification)


def _process_receiver_notification(receiver, n):
def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None:
# supposedly only 0x4x notifications arrive for the receiver
assert n.sub_id & 0x40 == 0x40

if n.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(n.address & 0x01)
assert hidpp_notification.sub_id in [
Notification.CONNECT_DISCONNECT,
Notification.DJ_PAIRING,
Notification.CONNECTED,
Notification.RAW_INPUT,
Notification.PAIRING_LOCK,
Notification.POWER,
Registers.DEVICE_DISCOVERY_NOTIFICATION,
Registers.DISCOVERY_STATUS_NOTIFICATION,
Registers.PAIRING_STATUS_NOTIFICATION,
Registers.PASSKEY_PRESSED_NOTIFICATION,
Registers.PASSKEY_REQUEST_NOTIFICATION,
]

if hidpp_notification.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(n.data[:1])
pair_error = ord(hidpp_notification.data[:1])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.discovering = n.address == 0x00
receiver.pairing.discovering = hidpp_notification.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
Expand All @@ -86,33 +108,33 @@ def _process_receiver_notification(receiver, n):
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(n.data[:1])
discover_error = ord(hidpp_notification.data[:1])
if discover_error:
receiver.pairing.error = discover_string = hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
with notification_lock:
counter = n.address + n.data[0] * 256 # notification counter
counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not receiver.pairing.counter == counter:
return None
if n.data[1] == 0:
receiver.pairing.device_kind = n.data[3]
receiver.pairing.device_address = n.data[6:12]
receiver.pairing.device_authentication = n.data[14]
elif n.data[1] == 1:
receiver.pairing.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8")
if hidpp_notification.data[1] == 0:
receiver.pairing.device_kind = hidpp_notification.data[3]
receiver.pairing.device_address = hidpp_notification.data[6:12]
receiver.pairing.device_authentication = hidpp_notification.data[14]
elif hidpp_notification.data[1] == 1:
receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8")
return True

elif n.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = n.address == 0x00
receiver.pairing.lock_open = hidpp_notification.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
Expand All @@ -122,27 +144,27 @@ def _process_receiver_notification(receiver, n):
receiver.pairing.device_address = None
receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = n.data[0]
pair_error = hidpp_notification.data[0]
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif n.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(n.data[7])
elif hidpp_notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = n.data[0:6].decode("utf-8")
receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8")
return True

elif n.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
return True

logger.warning("%s: unhandled notification %s", receiver, n)
logger.warning("%s: unhandled notification %s", receiver, hidpp_notification)


def _process_device_notification(device, n):
Expand Down
39 changes: 39 additions & 0 deletions tests/logitech_receiver/test_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pytest

from logitech_receiver import hidpp10_constants
from logitech_receiver import notifications
from logitech_receiver.base import HIDPPNotification
from logitech_receiver.hidpp10_constants import Registers
from logitech_receiver.receiver import Receiver


class MockLowLevelInterface:
def open_path(self, path):
pass

def find_paired_node_wpid(self, receiver_path: str, index: int):
pass

def ping(self, handle, number, long_message=False):
pass

def request(self, handle, devnumber, request_id, *params, **kwargs):
pass


@pytest.mark.parametrize(
"sub_id, notification_data, expected_error, expected_new_device",
[
(Registers.DISCOVERY_STATUS_NOTIFICATION, b"\x01", "device_timeout", None),
(Registers.PAIRING_STATUS_NOTIFICATION, b"\x02", "failed", None),
],
)
def test_process_receiver_notification(sub_id, notification_data, expected_error, expected_new_device):
receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
notification = HIDPPNotification(0, 0, sub_id, 0, notification_data)

result = notifications._process_receiver_notification(receiver, notification)

assert result
assert receiver.pairing.error == hidpp10_constants.BOLT_PAIRING_ERRORS[expected_error]
assert receiver.pairing.new_device is expected_new_device
Loading