#!/usr/bin/env python3 from datetime import datetime import inspect from pathlib import Path import subprocess from typing import Dict, List, Optional, Union import click import pyudev # type: ignore class KnownDevice(): """Base class for implementing any device Any class inheriting from this should implement some handle_{action} methods to take any actions. The {action} part is the device action the class wants to handle. """ # a map of udev-provided attributes and their values this device should match to UDEV_PROPS: Dict[str, str] = {} HANDLERS: Dict[str, List['KnownDevice']] = {} def __init_subclass__(cls, /, **kwargs): super().__init_subclass__(**kwargs) for name, meth in inspect.getmembers(cls, lambda x: inspect.isfunction(x) or inspect.ismethod(x)): if not name.startswith('handle_'): continue if getattr(KnownDevice, name, None) == meth: # We do not implement anything that's inheritable _and_ usable in the base class continue action = name.split('_', 1)[1] KnownDevice.HANDLERS.setdefault(action, []).append(cls) @staticmethod def check_available() -> bool: """Check if that device is connected. This method will be run on startup to check if we need to handle the device. """ raise NotImplementedError @staticmethod def handle_startup() -> None: """Special, non-udev-action method called on startup if check_available returns True""" raise NotImplementedError class UsbHidKeyboardMouse(KnownDevice): """Represent the mechanical thinkpad-like keyboard's trackpoint""" UDEV_PROPS = {'ID_MODEL': 'USB-HID_Keyboard'} @staticmethod def check_available() -> bool: cmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse'" cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) return cp.returncode == 0 @classmethod def handle_startup(cls) -> None: cls.act() @classmethod def handle_bind(cls, device: pyudev.Device) -> None: cls.act() @staticmethod def act() -> None: subcmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse' | " \ r"sed -r 's/.*[[:space:]]id=([[:digit:]]+)[[:space:]].*/\1/'" cmd: Union[str, List[str]] = f"xinput set-prop $({subcmd}) 'libinput Middle Emulation Enabled' 1" subprocess.run(cmd, shell=True) cmd = ['xset', 'r', 'rate', '195', '25'] subprocess.run(cmd) print("Handled USB-HID_Keyboard") class UsbLogitechG930(KnownDevice): """Represent the Logitech G930 headset's keys""" UDEV_PROPS = {'ID_MODEL': 'Logitech_G930_Headset'} @staticmethod def check_available() -> bool: cmd = "xinput | grep keyboard | grep 'Logitech G930 Headset'" cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) return cp.returncode == 0 @classmethod def handle_startup(cls) -> None: cls.act() @classmethod def handle_bind(cls, device: pyudev.Device) -> None: cls.act() @staticmethod def act() -> None: cmd = ['/home/joker/scripting/remap_headset_keys.sh'] subprocess.run(cmd) print("Handled Logitech G930 Headset") class CanonLIDE400(KnownDevice): """Represents our 2023-bought Canon flatbed scanner""" UDEV_PROPS = {'PRODUCT': '4a9/1912/100'} __process = None @staticmethod def check_available() -> bool: cmd = "lsusb | grep 'LiDE 400'" cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) return cp.returncode == 0 @staticmethod def handle_bind(device: Optional[pyudev.Device] = None) -> None: if CanonLIDE400.__process is not None: # in theory, uplugging the Scanner should have terminated/killed the process, but if something went wrong, # we just try to kill it and proceed CanonLIDE400.__process.kill() now = datetime.now() directory = Path('/srv/paperless/later/scanned/') / now.strftime('%Y-%m-%d') # directory = Path('/home/joker/tmp/scanned/') / now.strftime('%Y-%m-%d') directory.mkdir(exist_ok=True) # TODO read config from ~/.config/udev-device-plug-handler/ # How should that look like? Directory list of arguments? Would be easiest and we just default to something. # Could be nice to read the directory from there, too. cmd = [ 'scanimage', '-d', 'pixma:04A91912_4B9054', '-l', '0mm', '-t', '0mm', '-x', '210mm', '-y', '297mm', '--mode=Color', '--resolution', '300dpi', '--button-controlled=yes', # '-p', # progress f"--batch=scan_{now.strftime('%H%M%S')}_%04d.pnm" ] CanonLIDE400.__process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=directory) print("Handled Canon LiDE 400") @staticmethod def handle_remove(device: pyudev.Device) -> None: if CanonLIDE400.__process is not None: p = CanonLIDE400.__process p.terminate() try: p.wait(timeout=3) except subprocess.TimeoutExpired: p.kill() print("Handled Canon LiDE 400 unplug") @click.command() @click.option('--debug', is_flag=True, help='Show all device changes instead of only handled.') def main(debug: bool) -> None: # do all actions at startup for devices we might have missed for known_device in KnownDevice.HANDLERS.get('startup', []): if not known_device.check_available(): continue known_device.handle_startup() context = pyudev.Context() monitor = pyudev.Monitor.from_netlink(context) monitor.filter_by(subsystem='usb') monitor.start() device: pyudev.Device for device in iter(monitor.poll, None): for known_device in KnownDevice.HANDLERS.get(device.action, []): if not all(device.properties.get(prop) == prop_value for prop, prop_value in known_device.UDEV_PROPS.items()): continue getattr(known_device, f"handle_{device.action}")(device=device) break else: if debug: print(dict(device)) if __name__ == '__main__': import sys try: sys.exit(main()) except KeyboardInterrupt: pass