udev-device-plug-handler: Support more than bind
There can be cases where we want to support more than "bind" events - e.g. "remove". Therefore, we rewrite how things work in the spirit of the STLs `SimpleHTTPRequestHandler`: any "handle_{action}" method of a `KnownDevice` subclass allows the class to receive events for that action. To not use `getattr()` on every `KnownDevice` on every event, we use `__init_subclass__()` to build a list of `KnownDevice` registered for each `action`.
This commit is contained in:
parent
d17af01e2f
commit
0c2dd51ac1
|
@ -1,4 +1,5 @@
|
|||
#!/usr/bin/env python3
|
||||
import inspect
|
||||
import subprocess
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
|
@ -7,9 +8,25 @@ import pyudev # type: ignore
|
|||
|
||||
|
||||
class KnownDevice():
|
||||
"""Base class for implementing any device"""
|
||||
"""Base class for implementing any device
|
||||
|
||||
Any class inheriting from this should implement some handle_{action} methods to take any actions. The {action} part
|
||||
is the device action the class wants to handle.
|
||||
"""
|
||||
# a map of udev-provided attributes and their values this device should match to
|
||||
UDEV_PROPS: Dict[str, str] = {}
|
||||
HANDLERS: Dict[str, List['KnownDevice']] = {}
|
||||
|
||||
def __init_subclass__(cls, /, **kwargs):
|
||||
super().__init_subclass__(**kwargs)
|
||||
for name, meth in inspect.getmembers(cls, lambda x: inspect.isfunction(x) or inspect.ismethod(x)):
|
||||
if not name.startswith('handle_'):
|
||||
continue
|
||||
if getattr(KnownDevice, name, None) == meth:
|
||||
# We do not implement anything that's inheritable _and_ usable in the base class
|
||||
continue
|
||||
action = name.split('_', 1)[1]
|
||||
KnownDevice.HANDLERS.setdefault(action, []).append(cls)
|
||||
|
||||
@staticmethod
|
||||
def check_available() -> bool:
|
||||
|
@ -20,17 +37,13 @@ class KnownDevice():
|
|||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def handle(device: Optional[pyudev.Device] = None) -> None:
|
||||
"""Handle a device being plugged or available
|
||||
|
||||
This method will be called on finding an device to be bound by udev with the `device` argument. It will also be
|
||||
called on startup without the device argument.
|
||||
"""
|
||||
def handle_startup() -> None:
|
||||
"""Special, non-udev-action method called on startup if check_available returns True"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class UsbHidKeyboardMouse(KnownDevice):
|
||||
"""Represent the mechanical thinkpack-like keyboard's trackpoint"""
|
||||
"""Represent the mechanical thinkpad-like keyboard's trackpoint"""
|
||||
UDEV_PROPS = {'ID_MODEL': 'USB-HID_Keyboard'}
|
||||
|
||||
@staticmethod
|
||||
|
@ -39,8 +52,16 @@ class UsbHidKeyboardMouse(KnownDevice):
|
|||
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
|
||||
return cp.returncode == 0
|
||||
|
||||
@classmethod
|
||||
def handle_startup(cls) -> None:
|
||||
cls.act()
|
||||
|
||||
@classmethod
|
||||
def handle_bind(cls, device: pyudev.Device) -> None:
|
||||
cls.act()
|
||||
|
||||
@staticmethod
|
||||
def handle(device: Optional[pyudev.Device] = None) -> None:
|
||||
def act() -> None:
|
||||
subcmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse' | " \
|
||||
r"sed -r 's/.*[[:space:]]id=([[:digit:]]+)[[:space:]].*/\1/'"
|
||||
cmd: Union[str, List[str]] = f"xinput set-prop $({subcmd}) 'libinput Middle Emulation Enabled' 1"
|
||||
|
@ -60,8 +81,16 @@ class UsbLogitechG930(KnownDevice):
|
|||
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
|
||||
return cp.returncode == 0
|
||||
|
||||
@classmethod
|
||||
def handle_startup(cls) -> None:
|
||||
cls.act()
|
||||
|
||||
@classmethod
|
||||
def handle_bind(cls, device: pyudev.Device) -> None:
|
||||
cls.act()
|
||||
|
||||
@staticmethod
|
||||
def handle(device: Optional[pyudev.Device] = None) -> None:
|
||||
def act() -> None:
|
||||
cmd = ['/home/joker/scripting/remap_headset_keys.sh']
|
||||
subprocess.run(cmd)
|
||||
print("Handled Logitech G930 Headset")
|
||||
|
@ -71,10 +100,10 @@ class UsbLogitechG930(KnownDevice):
|
|||
@click.option('--debug', is_flag=True, help='Show all device changes instead of only handled.')
|
||||
def main(debug: bool) -> None:
|
||||
# do all actions at startup for devices we might have missed
|
||||
for known_device in KnownDevice.__subclasses__():
|
||||
for known_device in KnownDevice.HANDLERS.get('startup', []):
|
||||
if not known_device.check_available():
|
||||
continue
|
||||
known_device.handle()
|
||||
known_device.handle_startup()
|
||||
|
||||
context = pyudev.Context()
|
||||
monitor = pyudev.Monitor.from_netlink(context)
|
||||
|
@ -83,14 +112,11 @@ def main(debug: bool) -> None:
|
|||
|
||||
device: pyudev.Device
|
||||
for device in iter(monitor.poll, None):
|
||||
if device.action != 'bind':
|
||||
continue
|
||||
|
||||
for known_device in KnownDevice.__subclasses__():
|
||||
for known_device in KnownDevice.HANDLERS.get(device.action, []):
|
||||
if not all(device.properties.get(prop) == prop_value
|
||||
for prop, prop_value in known_device.UDEV_PROPS.items()):
|
||||
continue
|
||||
known_device.handle(device=device)
|
||||
getattr(known_device, f"handle_{device.action}")(device=device)
|
||||
break
|
||||
else:
|
||||
if debug:
|
||||
|
|
Loading…
Reference in New Issue