Compare commits
	
		
			2 Commits
		
	
	
		
			d17af01e2f
			...
			28901649dc
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 28901649dc | |
|  | 0c2dd51ac1 | 
|  | @ -1,4 +1,7 @@ | |||
| #!/usr/bin/env python3 | ||||
| from datetime import datetime | ||||
| import inspect | ||||
| from pathlib import Path | ||||
| import subprocess | ||||
| from typing import Dict, List, Optional, Union | ||||
| 
 | ||||
|  | @ -7,9 +10,25 @@ import pyudev  # type: ignore | |||
| 
 | ||||
| 
 | ||||
| class KnownDevice(): | ||||
|     """Base class for implementing any device""" | ||||
|     """Base class for implementing any device | ||||
| 
 | ||||
|     Any class inheriting from this should implement some handle_{action} methods to take any actions. The {action} part | ||||
|     is the device action the class wants to handle. | ||||
|     """ | ||||
|     # a map of udev-provided attributes and their values this device should match to | ||||
|     UDEV_PROPS: Dict[str, str] = {} | ||||
|     HANDLERS: Dict[str, List['KnownDevice']] = {} | ||||
| 
 | ||||
|     def __init_subclass__(cls, /, **kwargs): | ||||
|         super().__init_subclass__(**kwargs) | ||||
|         for name, meth in inspect.getmembers(cls, lambda x: inspect.isfunction(x) or inspect.ismethod(x)): | ||||
|             if not name.startswith('handle_'): | ||||
|                 continue | ||||
|             if getattr(KnownDevice, name, None) == meth: | ||||
|                 # We do not implement anything that's inheritable _and_ usable in the base class | ||||
|                 continue | ||||
|             action = name.split('_', 1)[1] | ||||
|             KnownDevice.HANDLERS.setdefault(action, []).append(cls) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def check_available() -> bool: | ||||
|  | @ -20,17 +39,13 @@ class KnownDevice(): | |||
|         raise NotImplementedError | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def handle(device: Optional[pyudev.Device] = None) -> None: | ||||
|         """Handle a device being plugged or available | ||||
| 
 | ||||
|         This method will be called on finding an device to be bound by udev with the `device` argument. It will also be | ||||
|         called on startup without the device argument. | ||||
|         """ | ||||
|     def handle_startup() -> None: | ||||
|         """Special, non-udev-action method called on startup if check_available returns True""" | ||||
|         raise NotImplementedError | ||||
| 
 | ||||
| 
 | ||||
| class UsbHidKeyboardMouse(KnownDevice): | ||||
|     """Represent the mechanical thinkpack-like keyboard's trackpoint""" | ||||
|     """Represent the mechanical thinkpad-like keyboard's trackpoint""" | ||||
|     UDEV_PROPS = {'ID_MODEL': 'USB-HID_Keyboard'} | ||||
| 
 | ||||
|     @staticmethod | ||||
|  | @ -39,8 +54,16 @@ class UsbHidKeyboardMouse(KnownDevice): | |||
|         cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) | ||||
|         return cp.returncode == 0 | ||||
| 
 | ||||
|     @classmethod | ||||
|     def handle_startup(cls) -> None: | ||||
|         cls.act() | ||||
| 
 | ||||
|     @classmethod | ||||
|     def handle_bind(cls, device: pyudev.Device) -> None: | ||||
|         cls.act() | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def handle(device: Optional[pyudev.Device] = None) -> None: | ||||
|     def act() -> None: | ||||
|         subcmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse' | " \ | ||||
|                  r"sed -r 's/.*[[:space:]]id=([[:digit:]]+)[[:space:]].*/\1/'" | ||||
|         cmd: Union[str, List[str]] = f"xinput set-prop $({subcmd}) 'libinput Middle Emulation Enabled' 1" | ||||
|  | @ -60,21 +83,84 @@ class UsbLogitechG930(KnownDevice): | |||
|         cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) | ||||
|         return cp.returncode == 0 | ||||
| 
 | ||||
|     @classmethod | ||||
|     def handle_startup(cls) -> None: | ||||
|         cls.act() | ||||
| 
 | ||||
|     @classmethod | ||||
|     def handle_bind(cls, device: pyudev.Device) -> None: | ||||
|         cls.act() | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def handle(device: Optional[pyudev.Device] = None) -> None: | ||||
|     def act() -> None: | ||||
|         cmd = ['/home/joker/scripting/remap_headset_keys.sh'] | ||||
|         subprocess.run(cmd) | ||||
|         print("Handled Logitech G930 Headset") | ||||
| 
 | ||||
| 
 | ||||
| class CanonLIDE400(KnownDevice): | ||||
|     """Represents our 2023-bought Canon flatbed scanner""" | ||||
|     UDEV_PROPS = {'PRODUCT': '4a9/1912/100'} | ||||
| 
 | ||||
|     __process = None | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def check_available() -> bool: | ||||
|         cmd = "lsusb | grep 'LiDE 400'" | ||||
|         cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) | ||||
|         return cp.returncode == 0 | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def handle_bind(device: Optional[pyudev.Device] = None) -> None: | ||||
|         if CanonLIDE400.__process is not None: | ||||
|             # in theory, uplugging the Scanner should have terminated/killed the process, but if something went wrong, | ||||
|             # we just try to kill it and proceed | ||||
|             CanonLIDE400.__process.kill() | ||||
| 
 | ||||
|         now = datetime.now() | ||||
|         directory = Path('/srv/paperless/later/scanned/') / now.strftime('%Y-%m-%d') | ||||
|         # directory = Path('/home/joker/tmp/scanned/') / now.strftime('%Y-%m-%d') | ||||
|         directory.mkdir(exist_ok=True) | ||||
| 
 | ||||
|         # TODO read config from ~/.config/udev-device-plug-handler/ | ||||
|         # How should that look like? Directory list of arguments? Would be easiest and we just default to something. | ||||
|         # Could be nice to read the directory from there, too. | ||||
|         cmd = [ | ||||
|             'scanimage', | ||||
|             '-d', 'pixma:04A91912_4B9054', | ||||
|             '-l', '0mm', | ||||
|             '-t', '0mm', | ||||
|             '-x', '210mm', | ||||
|             '-y', '297mm', | ||||
|             '--mode=Color', | ||||
|             '--resolution', '300dpi', | ||||
|             '--button-controlled=yes', | ||||
|             # '-p',  # progress | ||||
|             f"--batch=scan_{now.strftime('%H%M%S')}_%04d.pnm" | ||||
|         ] | ||||
|         CanonLIDE400.__process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=directory) | ||||
|         print("Handled Canon LiDE 400") | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def handle_remove(device: pyudev.Device) -> None: | ||||
|         if CanonLIDE400.__process is not None: | ||||
|             p = CanonLIDE400.__process | ||||
|             p.terminate() | ||||
|             try: | ||||
|                 p.wait(timeout=3) | ||||
|             except subprocess.TimeoutExpired: | ||||
|                 p.kill() | ||||
|         print("Handled Canon LiDE 400 unplug") | ||||
| 
 | ||||
| 
 | ||||
| @click.command() | ||||
| @click.option('--debug', is_flag=True, help='Show all device changes instead of only handled.') | ||||
| def main(debug: bool) -> None: | ||||
|     # do all actions at startup for devices we might have missed | ||||
|     for known_device in KnownDevice.__subclasses__(): | ||||
|     for known_device in KnownDevice.HANDLERS.get('startup', []): | ||||
|         if not known_device.check_available(): | ||||
|             continue | ||||
|         known_device.handle() | ||||
|         known_device.handle_startup() | ||||
| 
 | ||||
|     context = pyudev.Context() | ||||
|     monitor = pyudev.Monitor.from_netlink(context) | ||||
|  | @ -83,14 +169,11 @@ def main(debug: bool) -> None: | |||
| 
 | ||||
|     device: pyudev.Device | ||||
|     for device in iter(monitor.poll, None): | ||||
|         if device.action != 'bind': | ||||
|             continue | ||||
| 
 | ||||
|         for known_device in KnownDevice.__subclasses__(): | ||||
|         for known_device in KnownDevice.HANDLERS.get(device.action, []): | ||||
|             if not all(device.properties.get(prop) == prop_value | ||||
|                        for prop, prop_value in known_device.UDEV_PROPS.items()): | ||||
|                 continue | ||||
|             known_device.handle(device=device) | ||||
|             getattr(known_device, f"handle_{device.action}")(device=device) | ||||
|             break | ||||
|         else: | ||||
|             if debug: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue