Compare commits
2 Commits
d17af01e2f
...
28901649dc
Author | SHA1 | Date |
---|---|---|
MasterofJOKers | 28901649dc | |
MasterofJOKers | 0c2dd51ac1 |
|
@ -1,4 +1,7 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
from datetime import datetime
|
||||||
|
import inspect
|
||||||
|
from pathlib import Path
|
||||||
import subprocess
|
import subprocess
|
||||||
from typing import Dict, List, Optional, Union
|
from typing import Dict, List, Optional, Union
|
||||||
|
|
||||||
|
@ -7,9 +10,25 @@ import pyudev # type: ignore
|
||||||
|
|
||||||
|
|
||||||
class KnownDevice():
|
class KnownDevice():
|
||||||
"""Base class for implementing any device"""
|
"""Base class for implementing any device
|
||||||
|
|
||||||
|
Any class inheriting from this should implement some handle_{action} methods to take any actions. The {action} part
|
||||||
|
is the device action the class wants to handle.
|
||||||
|
"""
|
||||||
# a map of udev-provided attributes and their values this device should match to
|
# a map of udev-provided attributes and their values this device should match to
|
||||||
UDEV_PROPS: Dict[str, str] = {}
|
UDEV_PROPS: Dict[str, str] = {}
|
||||||
|
HANDLERS: Dict[str, List['KnownDevice']] = {}
|
||||||
|
|
||||||
|
def __init_subclass__(cls, /, **kwargs):
|
||||||
|
super().__init_subclass__(**kwargs)
|
||||||
|
for name, meth in inspect.getmembers(cls, lambda x: inspect.isfunction(x) or inspect.ismethod(x)):
|
||||||
|
if not name.startswith('handle_'):
|
||||||
|
continue
|
||||||
|
if getattr(KnownDevice, name, None) == meth:
|
||||||
|
# We do not implement anything that's inheritable _and_ usable in the base class
|
||||||
|
continue
|
||||||
|
action = name.split('_', 1)[1]
|
||||||
|
KnownDevice.HANDLERS.setdefault(action, []).append(cls)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def check_available() -> bool:
|
def check_available() -> bool:
|
||||||
|
@ -20,17 +39,13 @@ class KnownDevice():
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def handle(device: Optional[pyudev.Device] = None) -> None:
|
def handle_startup() -> None:
|
||||||
"""Handle a device being plugged or available
|
"""Special, non-udev-action method called on startup if check_available returns True"""
|
||||||
|
|
||||||
This method will be called on finding an device to be bound by udev with the `device` argument. It will also be
|
|
||||||
called on startup without the device argument.
|
|
||||||
"""
|
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
class UsbHidKeyboardMouse(KnownDevice):
|
class UsbHidKeyboardMouse(KnownDevice):
|
||||||
"""Represent the mechanical thinkpack-like keyboard's trackpoint"""
|
"""Represent the mechanical thinkpad-like keyboard's trackpoint"""
|
||||||
UDEV_PROPS = {'ID_MODEL': 'USB-HID_Keyboard'}
|
UDEV_PROPS = {'ID_MODEL': 'USB-HID_Keyboard'}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -39,8 +54,16 @@ class UsbHidKeyboardMouse(KnownDevice):
|
||||||
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
|
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
|
||||||
return cp.returncode == 0
|
return cp.returncode == 0
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def handle_startup(cls) -> None:
|
||||||
|
cls.act()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def handle_bind(cls, device: pyudev.Device) -> None:
|
||||||
|
cls.act()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def handle(device: Optional[pyudev.Device] = None) -> None:
|
def act() -> None:
|
||||||
subcmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse' | " \
|
subcmd = "xinput | grep pointer | grep 'USB-HID Keyboard Mouse' | " \
|
||||||
r"sed -r 's/.*[[:space:]]id=([[:digit:]]+)[[:space:]].*/\1/'"
|
r"sed -r 's/.*[[:space:]]id=([[:digit:]]+)[[:space:]].*/\1/'"
|
||||||
cmd: Union[str, List[str]] = f"xinput set-prop $({subcmd}) 'libinput Middle Emulation Enabled' 1"
|
cmd: Union[str, List[str]] = f"xinput set-prop $({subcmd}) 'libinput Middle Emulation Enabled' 1"
|
||||||
|
@ -60,21 +83,84 @@ class UsbLogitechG930(KnownDevice):
|
||||||
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
|
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
|
||||||
return cp.returncode == 0
|
return cp.returncode == 0
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def handle_startup(cls) -> None:
|
||||||
|
cls.act()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def handle_bind(cls, device: pyudev.Device) -> None:
|
||||||
|
cls.act()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def handle(device: Optional[pyudev.Device] = None) -> None:
|
def act() -> None:
|
||||||
cmd = ['/home/joker/scripting/remap_headset_keys.sh']
|
cmd = ['/home/joker/scripting/remap_headset_keys.sh']
|
||||||
subprocess.run(cmd)
|
subprocess.run(cmd)
|
||||||
print("Handled Logitech G930 Headset")
|
print("Handled Logitech G930 Headset")
|
||||||
|
|
||||||
|
|
||||||
|
class CanonLIDE400(KnownDevice):
|
||||||
|
"""Represents our 2023-bought Canon flatbed scanner"""
|
||||||
|
UDEV_PROPS = {'PRODUCT': '4a9/1912/100'}
|
||||||
|
|
||||||
|
__process = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def check_available() -> bool:
|
||||||
|
cmd = "lsusb | grep 'LiDE 400'"
|
||||||
|
cp = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL)
|
||||||
|
return cp.returncode == 0
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def handle_bind(device: Optional[pyudev.Device] = None) -> None:
|
||||||
|
if CanonLIDE400.__process is not None:
|
||||||
|
# in theory, uplugging the Scanner should have terminated/killed the process, but if something went wrong,
|
||||||
|
# we just try to kill it and proceed
|
||||||
|
CanonLIDE400.__process.kill()
|
||||||
|
|
||||||
|
now = datetime.now()
|
||||||
|
directory = Path('/srv/paperless/later/scanned/') / now.strftime('%Y-%m-%d')
|
||||||
|
# directory = Path('/home/joker/tmp/scanned/') / now.strftime('%Y-%m-%d')
|
||||||
|
directory.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
# TODO read config from ~/.config/udev-device-plug-handler/
|
||||||
|
# How should that look like? Directory list of arguments? Would be easiest and we just default to something.
|
||||||
|
# Could be nice to read the directory from there, too.
|
||||||
|
cmd = [
|
||||||
|
'scanimage',
|
||||||
|
'-d', 'pixma:04A91912_4B9054',
|
||||||
|
'-l', '0mm',
|
||||||
|
'-t', '0mm',
|
||||||
|
'-x', '210mm',
|
||||||
|
'-y', '297mm',
|
||||||
|
'--mode=Color',
|
||||||
|
'--resolution', '300dpi',
|
||||||
|
'--button-controlled=yes',
|
||||||
|
# '-p', # progress
|
||||||
|
f"--batch=scan_{now.strftime('%H%M%S')}_%04d.pnm"
|
||||||
|
]
|
||||||
|
CanonLIDE400.__process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=directory)
|
||||||
|
print("Handled Canon LiDE 400")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def handle_remove(device: pyudev.Device) -> None:
|
||||||
|
if CanonLIDE400.__process is not None:
|
||||||
|
p = CanonLIDE400.__process
|
||||||
|
p.terminate()
|
||||||
|
try:
|
||||||
|
p.wait(timeout=3)
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
p.kill()
|
||||||
|
print("Handled Canon LiDE 400 unplug")
|
||||||
|
|
||||||
|
|
||||||
@click.command()
|
@click.command()
|
||||||
@click.option('--debug', is_flag=True, help='Show all device changes instead of only handled.')
|
@click.option('--debug', is_flag=True, help='Show all device changes instead of only handled.')
|
||||||
def main(debug: bool) -> None:
|
def main(debug: bool) -> None:
|
||||||
# do all actions at startup for devices we might have missed
|
# do all actions at startup for devices we might have missed
|
||||||
for known_device in KnownDevice.__subclasses__():
|
for known_device in KnownDevice.HANDLERS.get('startup', []):
|
||||||
if not known_device.check_available():
|
if not known_device.check_available():
|
||||||
continue
|
continue
|
||||||
known_device.handle()
|
known_device.handle_startup()
|
||||||
|
|
||||||
context = pyudev.Context()
|
context = pyudev.Context()
|
||||||
monitor = pyudev.Monitor.from_netlink(context)
|
monitor = pyudev.Monitor.from_netlink(context)
|
||||||
|
@ -83,14 +169,11 @@ def main(debug: bool) -> None:
|
||||||
|
|
||||||
device: pyudev.Device
|
device: pyudev.Device
|
||||||
for device in iter(monitor.poll, None):
|
for device in iter(monitor.poll, None):
|
||||||
if device.action != 'bind':
|
for known_device in KnownDevice.HANDLERS.get(device.action, []):
|
||||||
continue
|
|
||||||
|
|
||||||
for known_device in KnownDevice.__subclasses__():
|
|
||||||
if not all(device.properties.get(prop) == prop_value
|
if not all(device.properties.get(prop) == prop_value
|
||||||
for prop, prop_value in known_device.UDEV_PROPS.items()):
|
for prop, prop_value in known_device.UDEV_PROPS.items()):
|
||||||
continue
|
continue
|
||||||
known_device.handle(device=device)
|
getattr(known_device, f"handle_{device.action}")(device=device)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
if debug:
|
if debug:
|
||||||
|
|
Loading…
Reference in New Issue