17f2961c0b
Instead of handling resumed key events in a deep stack, buffer them until the next main loop iteration. New resume events that may be emitted during handling of old resumes are prepended to that buffer, i.e. take precedence over events that happen deeper into the buffer/event stack. Logical replay order is thus preserved.
575 lines
18 KiB
Python
575 lines
18 KiB
Python
try:
|
|
from typing import Callable, Optional, Tuple
|
|
except ImportError:
|
|
pass
|
|
|
|
from supervisor import ticks_ms
|
|
|
|
from collections import namedtuple
|
|
from keypad import Event as KeyEvent
|
|
|
|
from kmk.consts import UnicodeMode
|
|
from kmk.hid import BLEHID, USBHID, AbstractHID, HIDModes
|
|
from kmk.keys import KC, Key
|
|
from kmk.kmktime import ticks_add, ticks_diff
|
|
from kmk.modules import Module
|
|
from kmk.scanners.keypad import MatrixScanner
|
|
from kmk.utils import Debug
|
|
|
|
debug = Debug(__name__)
|
|
|
|
KeyBufferFrame = namedtuple(
|
|
'KeyBufferFrame', ('key', 'is_pressed', 'int_coord', 'index')
|
|
)
|
|
|
|
|
|
class Sandbox:
|
|
matrix_update = None
|
|
secondary_matrix_update = None
|
|
active_layers = None
|
|
|
|
|
|
class KMKKeyboard:
|
|
#####
|
|
# User-configurable
|
|
keymap = []
|
|
coord_mapping = None
|
|
|
|
row_pins = None
|
|
col_pins = None
|
|
diode_orientation = None
|
|
matrix = None
|
|
|
|
unicode_mode = UnicodeMode.NOOP
|
|
|
|
modules = []
|
|
extensions = []
|
|
sandbox = Sandbox()
|
|
|
|
#####
|
|
# Internal State
|
|
keys_pressed = set()
|
|
_coordkeys_pressed = {}
|
|
hid_type = HIDModes.USB
|
|
secondary_hid_type = None
|
|
_hid_helper = None
|
|
_hid_send_enabled = False
|
|
hid_pending = False
|
|
matrix_update = None
|
|
secondary_matrix_update = None
|
|
matrix_update_queue = []
|
|
state_changed = False
|
|
_trigger_powersave_enable = False
|
|
_trigger_powersave_disable = False
|
|
i2c_deinit_count = 0
|
|
_go_args = None
|
|
_processing_timeouts = False
|
|
_resume_buffer = []
|
|
_resume_buffer_x = []
|
|
|
|
# this should almost always be PREpended to, replaces
|
|
# former use of reversed_active_layers which had pointless
|
|
# overhead (the underlying list was never used anyway)
|
|
active_layers = [0]
|
|
|
|
_timeouts = {}
|
|
|
|
# on some M4 setups (such as klardotsh/klarank_feather_m4, CircuitPython
|
|
# 6.0rc1) this runs out of RAM every cycle and takes down the board. no
|
|
# real known fix yet other than turning off debug, but M4s have always been
|
|
# tight on RAM so....
|
|
def __repr__(self) -> str:
|
|
return ''.join(
|
|
[
|
|
'KMKKeyboard(\n',
|
|
f' debug_enabled={self.debug_enabled}, ',
|
|
f'diode_orientation={self.diode_orientation}, ',
|
|
f'matrix={self.matrix},\n',
|
|
f' unicode_mode={self.unicode_mode}, ',
|
|
f'_hid_helper={self._hid_helper},\n',
|
|
f' keys_pressed={self.keys_pressed},\n',
|
|
f' _coordkeys_pressed={self._coordkeys_pressed},\n',
|
|
f' hid_pending={self.hid_pending}, ',
|
|
f'active_layers={self.active_layers}, ',
|
|
f'_timeouts={self._timeouts}\n',
|
|
')',
|
|
]
|
|
)
|
|
|
|
def _print_debug_cycle(self, init: bool = False) -> None:
|
|
if debug.enabled:
|
|
debug(f'coordkeys_pressed={self._coordkeys_pressed}')
|
|
debug(f'keys_pressed={self.keys_pressed}')
|
|
|
|
def _send_hid(self) -> None:
|
|
if self._hid_send_enabled:
|
|
hid_report = self._hid_helper.create_report(self.keys_pressed)
|
|
try:
|
|
hid_report.send()
|
|
except KeyError as e:
|
|
if debug.enabled:
|
|
debug(f'HidNotFound(HIDReportType={e})')
|
|
self.hid_pending = False
|
|
|
|
def _handle_matrix_report(self, kevent: KeyEvent) -> None:
|
|
if kevent is not None:
|
|
self._on_matrix_changed(kevent)
|
|
self.state_changed = True
|
|
|
|
def _find_key_in_map(self, int_coord: int) -> Key:
|
|
try:
|
|
idx = self.coord_mapping.index(int_coord)
|
|
except ValueError:
|
|
if debug.enabled:
|
|
debug(f'CoordMappingNotFound(ic={int_coord})')
|
|
|
|
return None
|
|
|
|
for layer in self.active_layers:
|
|
try:
|
|
layer_key = self.keymap[layer][idx]
|
|
except IndexError:
|
|
layer_key = None
|
|
if debug.enabled:
|
|
debug(f'KeymapIndexError(idx={idx}, layer={layer})')
|
|
|
|
if not layer_key or layer_key == KC.TRNS:
|
|
continue
|
|
|
|
return layer_key
|
|
|
|
def _on_matrix_changed(self, kevent: KeyEvent) -> None:
|
|
int_coord = kevent.key_number
|
|
is_pressed = kevent.pressed
|
|
if debug.enabled:
|
|
debug(f'MatrixChange(ic={int_coord}, pressed={is_pressed})')
|
|
|
|
key = None
|
|
if not is_pressed:
|
|
try:
|
|
key = self._coordkeys_pressed[int_coord]
|
|
except KeyError:
|
|
if debug.enabled:
|
|
debug(f'KeyNotPressed(ic={int_coord})')
|
|
|
|
if key is None:
|
|
key = self._find_key_in_map(int_coord)
|
|
|
|
if key is None:
|
|
if debug.enabled:
|
|
debug(f'MatrixUndefinedCoordinate(ic={int_coord})')
|
|
return self
|
|
|
|
if debug.enabled:
|
|
debug(f'KeyResolution(key={key})')
|
|
|
|
self.pre_process_key(key, is_pressed, int_coord)
|
|
|
|
def _process_resume_buffer(self):
|
|
'''
|
|
Resume the processing of buffered, delayed, deferred, etc. key events
|
|
emitted by modules.
|
|
|
|
We use a copy of the `_resume_buffer` as a working buffer. The working
|
|
buffer holds all key events in the correct order for processing. If
|
|
during processing new events are pushed to the `_resume_buffer`, they
|
|
are prepended to the working buffer (which may not be emptied), in
|
|
order to preserve key event order.
|
|
We also double-buffer `_resume_buffer` with `_resume_buffer_x`, only
|
|
copying the reference to hopefully safe some time on allocations.
|
|
'''
|
|
|
|
buffer, self._resume_buffer = self._resume_buffer, self._resume_buffer_x
|
|
|
|
while buffer:
|
|
ksf = buffer.pop(0)
|
|
key = ksf.key
|
|
|
|
# Handle any unaccounted-for layer shifts by looking up the key resolution again.
|
|
if ksf.int_coord in self._coordkeys_pressed.keys():
|
|
key = self._find_key_in_map(ksf.int_coord)
|
|
|
|
# Resume the processing of the key event and update the HID report
|
|
# when applicable.
|
|
self.pre_process_key(key, ksf.is_pressed, ksf.int_coord, ksf.index)
|
|
|
|
if self.hid_pending:
|
|
self._send_hid()
|
|
self.hid_pending = False
|
|
|
|
# Any newly buffered key events must be prepended to the working
|
|
# buffer.
|
|
if self._resume_buffer:
|
|
self._resume_buffer.extend(buffer)
|
|
buffer.clear()
|
|
buffer, self._resume_buffer = self._resume_buffer, buffer
|
|
|
|
self._resume_buffer_x = buffer
|
|
|
|
@property
|
|
def debug_enabled(self) -> bool:
|
|
return debug.enabled
|
|
|
|
@debug_enabled.setter
|
|
def debug_enabled(self, enabled: bool):
|
|
debug.enabled = enabled
|
|
|
|
def pre_process_key(
|
|
self,
|
|
key: Key,
|
|
is_pressed: bool,
|
|
int_coord: Optional[int] = None,
|
|
index: int = 0,
|
|
) -> None:
|
|
for module in self.modules[index:]:
|
|
try:
|
|
key = module.process_key(self, key, is_pressed, int_coord)
|
|
if key is None:
|
|
break
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {module}.process_key: {err}')
|
|
|
|
if int_coord is not None:
|
|
if is_pressed:
|
|
self._coordkeys_pressed[int_coord] = key
|
|
else:
|
|
try:
|
|
del self._coordkeys_pressed[int_coord]
|
|
except KeyError:
|
|
if debug.enabled:
|
|
debug(f'ReleaseKeyError(ic={int_coord})')
|
|
|
|
if key:
|
|
self.process_key(key, is_pressed, int_coord)
|
|
|
|
def process_key(
|
|
self, key: Key, is_pressed: bool, coord_int: Optional[int] = None
|
|
) -> None:
|
|
if is_pressed:
|
|
key.on_press(self, coord_int)
|
|
else:
|
|
key.on_release(self, coord_int)
|
|
|
|
def resume_process_key(
|
|
self,
|
|
module: Module,
|
|
key: Key,
|
|
is_pressed: bool,
|
|
int_coord: Optional[int] = None,
|
|
) -> None:
|
|
index = self.modules.index(module) + 1
|
|
ksf = KeyBufferFrame(
|
|
key=key, is_pressed=is_pressed, int_coord=int_coord, index=index
|
|
)
|
|
self._resume_buffer.append(ksf)
|
|
|
|
def remove_key(self, keycode: Key) -> None:
|
|
self.keys_pressed.discard(keycode)
|
|
self.process_key(keycode, False)
|
|
|
|
def add_key(self, keycode: Key) -> None:
|
|
self.keys_pressed.add(keycode)
|
|
self.process_key(keycode, True)
|
|
|
|
def tap_key(self, keycode: Key) -> None:
|
|
self.add_key(keycode)
|
|
# On the next cycle, we'll remove the key.
|
|
self.set_timeout(False, lambda: self.remove_key(keycode))
|
|
|
|
def set_timeout(
|
|
self, after_ticks: int, callback: Callable[[None], None]
|
|
) -> Tuple[int, int]:
|
|
# We allow passing False as an implicit "run this on the next process timeouts cycle"
|
|
if after_ticks is False:
|
|
after_ticks = 0
|
|
|
|
if after_ticks == 0 and self._processing_timeouts:
|
|
after_ticks += 1
|
|
|
|
timeout_key = ticks_add(ticks_ms(), after_ticks)
|
|
|
|
if timeout_key not in self._timeouts:
|
|
self._timeouts[timeout_key] = []
|
|
|
|
idx = len(self._timeouts[timeout_key])
|
|
self._timeouts[timeout_key].append(callback)
|
|
|
|
return (timeout_key, idx)
|
|
|
|
def cancel_timeout(self, timeout_key: int) -> None:
|
|
try:
|
|
self._timeouts[timeout_key[0]][timeout_key[1]] = None
|
|
except (KeyError, IndexError):
|
|
if debug.enabled:
|
|
debug(f'no such timeout: {timeout_key}')
|
|
|
|
def _process_timeouts(self) -> None:
|
|
if not self._timeouts:
|
|
return
|
|
|
|
# Copy timeout keys to a temporary list to allow sorting.
|
|
# Prevent net timeouts set during handling from running on the current
|
|
# cycle by setting a flag `_processing_timeouts`.
|
|
current_time = ticks_ms()
|
|
timeout_keys = []
|
|
self._processing_timeouts = True
|
|
|
|
for k in self._timeouts.keys():
|
|
if ticks_diff(k, current_time) <= 0:
|
|
timeout_keys.append(k)
|
|
|
|
if timeout_keys and debug.enabled:
|
|
debug('processing timeouts')
|
|
|
|
for k in sorted(timeout_keys):
|
|
for callback in self._timeouts[k]:
|
|
if callback:
|
|
callback()
|
|
del self._timeouts[k]
|
|
|
|
self._processing_timeouts = False
|
|
|
|
def _init_sanity_check(self) -> None:
|
|
'''
|
|
Ensure the provided configuration is *probably* bootable
|
|
'''
|
|
assert self.keymap, 'must define a keymap with at least one row'
|
|
assert (
|
|
self.hid_type in HIDModes.ALL_MODES
|
|
), 'hid_type must be a value from kmk.consts.HIDModes'
|
|
if not self.matrix:
|
|
assert self.row_pins, 'no GPIO pins defined for matrix rows'
|
|
assert self.col_pins, 'no GPIO pins defined for matrix columns'
|
|
assert (
|
|
self.diode_orientation is not None
|
|
), 'diode orientation must be defined'
|
|
|
|
def _init_coord_mapping(self) -> None:
|
|
'''
|
|
Attempt to sanely guess a coord_mapping if one is not provided. No-op
|
|
if `kmk.extensions.split.Split` is used, it provides equivalent
|
|
functionality in `on_bootup`
|
|
|
|
To save RAM on boards that don't use Split, we don't import Split
|
|
and do an isinstance check, but instead do string detection
|
|
'''
|
|
if any(x.__class__.__module__ == 'kmk.modules.split' for x in self.modules):
|
|
return
|
|
|
|
if not self.coord_mapping:
|
|
cm = []
|
|
for m in self.matrix:
|
|
cm.extend(m.coord_mapping)
|
|
self.coord_mapping = tuple(cm)
|
|
|
|
def _init_hid(self) -> None:
|
|
if self.hid_type == HIDModes.NOOP:
|
|
self._hid_helper = AbstractHID
|
|
elif self.hid_type == HIDModes.USB:
|
|
self._hid_helper = USBHID
|
|
elif self.hid_type == HIDModes.BLE:
|
|
self._hid_helper = BLEHID
|
|
else:
|
|
self._hid_helper = AbstractHID
|
|
self._hid_helper = self._hid_helper(**self._go_args)
|
|
self._hid_send_enabled = True
|
|
|
|
def _init_matrix(self) -> None:
|
|
if self.matrix is None:
|
|
if debug.enabled:
|
|
debug('Initialising default matrix scanner.')
|
|
self.matrix = MatrixScanner(
|
|
column_pins=self.col_pins,
|
|
row_pins=self.row_pins,
|
|
columns_to_anodes=self.diode_orientation,
|
|
)
|
|
|
|
try:
|
|
self.matrix = tuple(iter(self.matrix))
|
|
offset = 0
|
|
for matrix in self.matrix:
|
|
matrix.offset = offset
|
|
offset += matrix.key_count
|
|
except TypeError:
|
|
self.matrix = (self.matrix,)
|
|
|
|
def before_matrix_scan(self) -> None:
|
|
for module in self.modules:
|
|
try:
|
|
module.before_matrix_scan(self)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {module}.before_matrix_scan: {err}')
|
|
|
|
for ext in self.extensions:
|
|
try:
|
|
ext.before_matrix_scan(self.sandbox)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {ext}.before_matrix_scan: {err}')
|
|
|
|
def after_matrix_scan(self) -> None:
|
|
for module in self.modules:
|
|
try:
|
|
module.after_matrix_scan(self)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {module}.after_matrix_scan: {err}')
|
|
|
|
for ext in self.extensions:
|
|
try:
|
|
ext.after_matrix_scan(self.sandbox)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {ext}.after_matrix_scan: {err}')
|
|
|
|
def before_hid_send(self) -> None:
|
|
for module in self.modules:
|
|
try:
|
|
module.before_hid_send(self)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {module}.before_hid_send: {err}')
|
|
|
|
for ext in self.extensions:
|
|
try:
|
|
ext.before_hid_send(self.sandbox)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(
|
|
f'Error in {ext}.before_hid_send: {err}',
|
|
)
|
|
|
|
def after_hid_send(self) -> None:
|
|
for module in self.modules:
|
|
try:
|
|
module.after_hid_send(self)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {module}.after_hid_send: {err}')
|
|
|
|
for ext in self.extensions:
|
|
try:
|
|
ext.after_hid_send(self.sandbox)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {ext}.after_hid_send: {err}')
|
|
|
|
def powersave_enable(self) -> None:
|
|
for module in self.modules:
|
|
try:
|
|
module.on_powersave_enable(self)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {module}.on_powersave: {err}')
|
|
|
|
for ext in self.extensions:
|
|
try:
|
|
ext.on_powersave_enable(self.sandbox)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {ext}.powersave_enable: {err}')
|
|
|
|
def powersave_disable(self) -> None:
|
|
for module in self.modules:
|
|
try:
|
|
module.on_powersave_disable(self)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {module}.powersave_disable: {err}')
|
|
for ext in self.extensions:
|
|
try:
|
|
ext.on_powersave_disable(self.sandbox)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Error in {ext}.powersave_disable: {err}')
|
|
|
|
def go(self, hid_type=HIDModes.USB, secondary_hid_type=None, **kwargs) -> None:
|
|
self._init(hid_type=hid_type, secondary_hid_type=secondary_hid_type, **kwargs)
|
|
while True:
|
|
self._main_loop()
|
|
|
|
def _init(
|
|
self,
|
|
hid_type: HIDModes = HIDModes.USB,
|
|
secondary_hid_type: Optional[HIDModes] = None,
|
|
**kwargs,
|
|
) -> None:
|
|
self._go_args = kwargs
|
|
self.hid_type = hid_type
|
|
self.secondary_hid_type = secondary_hid_type
|
|
|
|
self._init_sanity_check()
|
|
self._init_hid()
|
|
self._init_matrix()
|
|
self._init_coord_mapping()
|
|
|
|
for module in self.modules:
|
|
try:
|
|
module.during_bootup(self)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Failed to load module {module}: {err}')
|
|
for ext in self.extensions:
|
|
try:
|
|
ext.during_bootup(self)
|
|
except Exception as err:
|
|
if debug.enabled:
|
|
debug(f'Failed to load extensions {module}: {err}')
|
|
|
|
if debug.enabled:
|
|
debug(f'init: {self}')
|
|
|
|
def _main_loop(self) -> None:
|
|
self.state_changed = False
|
|
self.sandbox.active_layers = self.active_layers.copy()
|
|
|
|
self.before_matrix_scan()
|
|
|
|
self._process_resume_buffer()
|
|
|
|
for matrix in self.matrix:
|
|
update = matrix.scan_for_changes()
|
|
if update:
|
|
self.matrix_update = update
|
|
break
|
|
self.sandbox.matrix_update = self.matrix_update
|
|
self.sandbox.secondary_matrix_update = self.secondary_matrix_update
|
|
|
|
self.after_matrix_scan()
|
|
|
|
if self.secondary_matrix_update:
|
|
self.matrix_update_queue.append(self.secondary_matrix_update)
|
|
self.secondary_matrix_update = None
|
|
|
|
if self.matrix_update:
|
|
self.matrix_update_queue.append(self.matrix_update)
|
|
self.matrix_update = None
|
|
|
|
# only handle one key per cycle.
|
|
if self.matrix_update_queue:
|
|
self._handle_matrix_report(self.matrix_update_queue.pop(0))
|
|
|
|
self.before_hid_send()
|
|
|
|
if self.hid_pending:
|
|
self._send_hid()
|
|
|
|
self._process_timeouts()
|
|
|
|
if self.hid_pending:
|
|
self._send_hid()
|
|
self.state_changed = True
|
|
|
|
self.after_hid_send()
|
|
|
|
if self._trigger_powersave_enable:
|
|
self.powersave_enable()
|
|
|
|
if self._trigger_powersave_disable:
|
|
self.powersave_disable()
|
|
|
|
if self.state_changed:
|
|
self._print_debug_cycle()
|