From 0c2dd51ac111f03e2b5f3b04903ed40dc40d30fe Mon Sep 17 00:00:00 2001 From: MasterofJOKers Date: Sat, 3 Feb 2024 13:55:52 +0100 Subject: [PATCH] udev-device-plug-handler: Support more than bind There can be cases where we want to support more than "bind" events - e.g. "remove". Therefore, we rewrite how things work in the spirit of the STLs `SimpleHTTPRequestHandler`: any "handle_{action}" method of a `KnownDevice` subclass allows the class to receive events for that action. To not use `getattr()` on every `KnownDevice` on every event, we use `__init_subclass__()` to build a list of `KnownDevice` registered for each `action`. --- .../udev-device-plug-handler | 60 +++++++++++++------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/udev-device-plug-handler/udev-device-plug-handler b/udev-device-plug-handler/udev-device-plug-handler index f1a5860..2a84f1d 100644 --- a/udev-device-plug-handler/udev-device-plug-handler +++ b/udev-device-plug-handler/udev-device-plug-handler @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import inspect import subprocess from typing import Dict, List, Optional, Union @@ -7,9 +8,25 @@ import pyudev # type: ignore class KnownDevice(): - """Base class for implementing any device""" + """Base class for implementing any device + + Any class inheriting from this should implement some handle_{action} methods to take any actions. The {action} part + is the device action the class wants to handle. + """ # a map of udev-provided attributes and their values this device should match to UDEV_PROPS: Dict[str, str] = {} + HANDLERS: Dict[str, List['KnownDevice']] = {} + + def __init_subclass__(cls, /, **kwargs): + super().__init_subclass__(**kwargs) + for name, meth in inspect.getmembers(cls, lambda x: inspect.isfunction(x) or inspect.ismethod(x)): + if not name.startswith('handle_'): + continue + if getattr(KnownDevice, name, None) == meth: + # We do not implement anything that's inheritable _and_ usable in the base class + continue + action = name.split('_', 1)[1] + KnownDevice.HANDLERS.setdefault(action, []).append(cls) @staticmethod def check_available() -> bool: @@ -20,17 +37,13 @@ class KnownDevice(): raise NotImplementedError @staticmethod - def handle(device: Optional[pyudev.Device] = None) -> None: - """Handle a device being plugged or available - - This method will be called on finding an device to be bound by udev with the `device` argument. It will also be - called on startup without the device argument. - """ + def handle_startup() -> None: + """Special, non-udev-action method called on startup if check_available returns True""" raise NotImplementedError class UsbHidKeyboardMouse(KnownDevice): - """Represent the mechanical thinkpack-like keyboard's trackpoint""" + """Represent the mechanical thinkpad-like keyboard's trackpoint""" UDEV_PROPS = {'ID_MODEL': 'USB-HID_Keyboard'} @staticmethod @@ -39,8 +52,16 @@ class UsbHidKeyboardMouse(KnownDevice): cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) return cp.returncode == 0 + @classmethod + def handle_startup(cls) -> None: + cls.act() + + @classmethod + def handle_bind(cls, device: pyudev.Device) -> None: + cls.act() + @staticmethod - def handle(device: Optional[pyudev.Device] = None) -> None: + def act() -> None: subcmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse' | " \ r"sed -r 's/.*[[:space:]]id=([[:digit:]]+)[[:space:]].*/\1/'" cmd: Union[str, List[str]] = f"xinput set-prop $({subcmd}) 'libinput Middle Emulation Enabled' 1" @@ -60,8 +81,16 @@ class UsbLogitechG930(KnownDevice): cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) return cp.returncode == 0 + @classmethod + def handle_startup(cls) -> None: + cls.act() + + @classmethod + def handle_bind(cls, device: pyudev.Device) -> None: + cls.act() + @staticmethod - def handle(device: Optional[pyudev.Device] = None) -> None: + def act() -> None: cmd = ['/home/joker/scripting/remap_headset_keys.sh'] subprocess.run(cmd) print("Handled Logitech G930 Headset") @@ -71,10 +100,10 @@ class UsbLogitechG930(KnownDevice): @click.option('--debug', is_flag=True, help='Show all device changes instead of only handled.') def main(debug: bool) -> None: # do all actions at startup for devices we might have missed - for known_device in KnownDevice.__subclasses__(): + for known_device in KnownDevice.HANDLERS.get('startup', []): if not known_device.check_available(): continue - known_device.handle() + known_device.handle_startup() context = pyudev.Context() monitor = pyudev.Monitor.from_netlink(context) @@ -83,14 +112,11 @@ def main(debug: bool) -> None: device: pyudev.Device for device in iter(monitor.poll, None): - if device.action != 'bind': - continue - - for known_device in KnownDevice.__subclasses__(): + for known_device in KnownDevice.HANDLERS.get(device.action, []): if not all(device.properties.get(prop) == prop_value for prop, prop_value in known_device.UDEV_PROPS.items()): continue - known_device.handle(device=device) + getattr(known_device, f"handle_{device.action}")(device=device) break else: if debug: