Compare commits

...

2 Commits

Author SHA1 Message Date
MasterofJOKers 28901649dc udev-device-plug-handler: Handle Canon LiDE400
When this scanner is plugged in, we want to start `scanimage` in batch
mode with button support, scanning into a pre-defined directory. Since
plug and unplug might happen multiple times a day and batch mode
overwrites files with the same name, we use date and time to define
directory and filename.
2024-02-03 13:59:22 +01:00
MasterofJOKers 0c2dd51ac1 udev-device-plug-handler: Support more than bind
There can be cases where we want to support more than "bind" events -
e.g. "remove". Therefore, we rewrite how things work in the spirit of
the STLs `SimpleHTTPRequestHandler`: any "handle_{action}" method of a
`KnownDevice` subclass allows the class to receive events for that
action.

To not use `getattr()` on every `KnownDevice` on every event, we use
`__init_subclass__()` to build a list of `KnownDevice` registered for
each `action`.
2024-02-03 13:55:52 +01:00
1 changed files with 100 additions and 17 deletions

View File

@ -1,4 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from datetime import datetime
import inspect
from pathlib import Path
import subprocess import subprocess
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Union
@ -7,9 +10,25 @@ import pyudev # type: ignore
class KnownDevice(): class KnownDevice():
"""Base class for implementing any device""" """Base class for implementing any device
Any class inheriting from this should implement some handle_{action} methods to take any actions. The {action} part
is the device action the class wants to handle.
"""
# a map of udev-provided attributes and their values this device should match to # a map of udev-provided attributes and their values this device should match to
UDEV_PROPS: Dict[str, str] = {} UDEV_PROPS: Dict[str, str] = {}
HANDLERS: Dict[str, List['KnownDevice']] = {}
def __init_subclass__(cls, /, **kwargs):
super().__init_subclass__(**kwargs)
for name, meth in inspect.getmembers(cls, lambda x: inspect.isfunction(x) or inspect.ismethod(x)):
if not name.startswith('handle_'):
continue
if getattr(KnownDevice, name, None) == meth:
# We do not implement anything that's inheritable _and_ usable in the base class
continue
action = name.split('_', 1)[1]
KnownDevice.HANDLERS.setdefault(action, []).append(cls)
@staticmethod @staticmethod
def check_available() -> bool: def check_available() -> bool:
@ -20,17 +39,13 @@ class KnownDevice():
raise NotImplementedError raise NotImplementedError
@staticmethod @staticmethod
def handle(device: Optional[pyudev.Device] = None) -> None: def handle_startup() -> None:
"""Handle a device being plugged or available """Special, non-udev-action method called on startup if check_available returns True"""
This method will be called on finding an device to be bound by udev with the `device` argument. It will also be
called on startup without the device argument.
"""
raise NotImplementedError raise NotImplementedError
class UsbHidKeyboardMouse(KnownDevice): class UsbHidKeyboardMouse(KnownDevice):
"""Represent the mechanical thinkpack-like keyboard's trackpoint""" """Represent the mechanical thinkpad-like keyboard's trackpoint"""
UDEV_PROPS = {'ID_MODEL': 'USB-HID_Keyboard'} UDEV_PROPS = {'ID_MODEL': 'USB-HID_Keyboard'}
@staticmethod @staticmethod
@ -39,8 +54,16 @@ class UsbHidKeyboardMouse(KnownDevice):
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
return cp.returncode == 0 return cp.returncode == 0
@classmethod
def handle_startup(cls) -> None:
cls.act()
@classmethod
def handle_bind(cls, device: pyudev.Device) -> None:
cls.act()
@staticmethod @staticmethod
def handle(device: Optional[pyudev.Device] = None) -> None: def act() -> None:
subcmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse' | " \ subcmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse' | " \
r"sed -r 's/.*[[:space:]]id=([[:digit:]]+)[[:space:]].*/\1/'" r"sed -r 's/.*[[:space:]]id=([[:digit:]]+)[[:space:]].*/\1/'"
cmd: Union[str, List[str]] = f"xinput set-prop $({subcmd}) 'libinput Middle Emulation Enabled' 1" cmd: Union[str, List[str]] = f"xinput set-prop $({subcmd}) 'libinput Middle Emulation Enabled' 1"
@ -60,21 +83,84 @@ class UsbLogitechG930(KnownDevice):
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
return cp.returncode == 0 return cp.returncode == 0
@classmethod
def handle_startup(cls) -> None:
cls.act()
@classmethod
def handle_bind(cls, device: pyudev.Device) -> None:
cls.act()
@staticmethod @staticmethod
def handle(device: Optional[pyudev.Device] = None) -> None: def act() -> None:
cmd = ['/home/joker/scripting/remap_headset_keys.sh'] cmd = ['/home/joker/scripting/remap_headset_keys.sh']
subprocess.run(cmd) subprocess.run(cmd)
print("Handled Logitech G930 Headset") print("Handled Logitech G930 Headset")
class CanonLIDE400(KnownDevice):
"""Represents our 2023-bought Canon flatbed scanner"""
UDEV_PROPS = {'PRODUCT': '4a9/1912/100'}
__process = None
@staticmethod
def check_available() -> bool:
cmd = "lsusb | grep 'LiDE 400'"
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
return cp.returncode == 0
@staticmethod
def handle_bind(device: Optional[pyudev.Device] = None) -> None:
if CanonLIDE400.__process is not None:
# in theory, uplugging the Scanner should have terminated/killed the process, but if something went wrong,
# we just try to kill it and proceed
CanonLIDE400.__process.kill()
now = datetime.now()
directory = Path('/srv/paperless/later/scanned/') / now.strftime('%Y-%m-%d')
# directory = Path('/home/joker/tmp/scanned/') / now.strftime('%Y-%m-%d')
directory.mkdir(exist_ok=True)
# TODO read config from ~/.config/udev-device-plug-handler/
# How should that look like? Directory list of arguments? Would be easiest and we just default to something.
# Could be nice to read the directory from there, too.
cmd = [
'scanimage',
'-d', 'pixma:04A91912_4B9054',
'-l', '0mm',
'-t', '0mm',
'-x', '210mm',
'-y', '297mm',
'--mode=Color',
'--resolution', '300dpi',
'--button-controlled=yes',
# '-p', # progress
f"--batch=scan_{now.strftime('%H%M%S')}_%04d.pnm"
]
CanonLIDE400.__process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=directory)
print("Handled Canon LiDE 400")
@staticmethod
def handle_remove(device: pyudev.Device) -> None:
if CanonLIDE400.__process is not None:
p = CanonLIDE400.__process
p.terminate()
try:
p.wait(timeout=3)
except subprocess.TimeoutExpired:
p.kill()
print("Handled Canon LiDE 400 unplug")
@click.command() @click.command()
@click.option('--debug', is_flag=True, help='Show all device changes instead of only handled.') @click.option('--debug', is_flag=True, help='Show all device changes instead of only handled.')
def main(debug: bool) -> None: def main(debug: bool) -> None:
# do all actions at startup for devices we might have missed # do all actions at startup for devices we might have missed
for known_device in KnownDevice.__subclasses__(): for known_device in KnownDevice.HANDLERS.get('startup', []):
if not known_device.check_available(): if not known_device.check_available():
continue continue
known_device.handle() known_device.handle_startup()
context = pyudev.Context() context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context) monitor = pyudev.Monitor.from_netlink(context)
@ -83,14 +169,11 @@ def main(debug: bool) -> None:
device: pyudev.Device device: pyudev.Device
for device in iter(monitor.poll, None): for device in iter(monitor.poll, None):
if device.action != 'bind': for known_device in KnownDevice.HANDLERS.get(device.action, []):
continue
for known_device in KnownDevice.__subclasses__():
if not all(device.properties.get(prop) == prop_value if not all(device.properties.get(prop) == prop_value
for prop, prop_value in known_device.UDEV_PROPS.items()): for prop, prop_value in known_device.UDEV_PROPS.items()):
continue continue
known_device.handle(device=device) getattr(known_device, f"handle_{device.action}")(device=device)
break break
else: else:
if debug: if debug: