try: from typing import Callable, Optional, Tuple except ImportError: pass from supervisor import ticks_ms from keypad import Event as KeyEvent from kmk.consts import UnicodeMode from kmk.hid import BLEHID, USBHID, AbstractHID, HIDModes from kmk.keys import KC, Key from kmk.kmktime import ticks_add, ticks_diff from kmk.modules import Module from kmk.scanners.keypad import MatrixScanner from kmk.utils import Debug debug = Debug(__name__) class Sandbox: matrix_update = None secondary_matrix_update = None active_layers = None class KMKKeyboard: ##### # User-configurable keymap = [] coord_mapping = None row_pins = None col_pins = None diode_orientation = None matrix = None unicode_mode = UnicodeMode.NOOP modules = [] extensions = [] sandbox = Sandbox() ##### # Internal State keys_pressed = set() _coordkeys_pressed = {} hid_type = HIDModes.USB secondary_hid_type = None _hid_helper = None _hid_send_enabled = False hid_pending = False matrix_update = None secondary_matrix_update = None matrix_update_queue = [] state_changed = False _trigger_powersave_enable = False _trigger_powersave_disable = False i2c_deinit_count = 0 _go_args = None _processing_timeouts = False # this should almost always be PREpended to, replaces # former use of reversed_active_layers which had pointless # overhead (the underlying list was never used anyway) active_layers = [0] _timeouts = {} # on some M4 setups (such as klardotsh/klarank_feather_m4, CircuitPython # 6.0rc1) this runs out of RAM every cycle and takes down the board. no # real known fix yet other than turning off debug, but M4s have always been # tight on RAM so.... def __repr__(self) -> str: return ''.join( [ 'KMKKeyboard(\n', f' debug_enabled={self.debug_enabled}, ', f'diode_orientation={self.diode_orientation}, ', f'matrix={self.matrix},\n', f' unicode_mode={self.unicode_mode}, ', f'_hid_helper={self._hid_helper},\n', f' keys_pressed={self.keys_pressed},\n', f' _coordkeys_pressed={self._coordkeys_pressed},\n', f' hid_pending={self.hid_pending}, ', f'active_layers={self.active_layers}, ', f'_timeouts={self._timeouts}\n', ')', ] ) def _print_debug_cycle(self, init: bool = False) -> None: if debug.enabled: debug(f'coordkeys_pressed={self._coordkeys_pressed}') debug(f'keys_pressed={self.keys_pressed}') def _send_hid(self) -> None: if self._hid_send_enabled: hid_report = self._hid_helper.create_report(self.keys_pressed) try: hid_report.send() except KeyError as e: if debug.enabled: debug(f'HidNotFound(HIDReportType={e})') self.hid_pending = False def _handle_matrix_report(self, kevent: KeyEvent) -> None: if kevent is not None: self._on_matrix_changed(kevent) self.state_changed = True def _find_key_in_map(self, int_coord: int) -> Key: try: idx = self.coord_mapping.index(int_coord) except ValueError: if debug.enabled: debug(f'CoordMappingNotFound(ic={int_coord})') return None for layer in self.active_layers: try: layer_key = self.keymap[layer][idx] except IndexError: layer_key = None if debug.enabled: debug(f'KeymapIndexError(idx={idx}, layer={layer})') if not layer_key or layer_key == KC.TRNS: continue return layer_key def _on_matrix_changed(self, kevent: KeyEvent) -> None: int_coord = kevent.key_number is_pressed = kevent.pressed if debug.enabled: debug(f'MatrixChange(ic={int_coord}, pressed={is_pressed})') key = None if not is_pressed: try: key = self._coordkeys_pressed[int_coord] except KeyError: if debug.enabled: debug(f'KeyNotPressed(ic={int_coord})') if key is None: key = self._find_key_in_map(int_coord) if key is None: if debug.enabled: debug(f'MatrixUndefinedCoordinate(ic={int_coord})') return self if debug.enabled: debug(f'KeyResolution(key={key})') self.pre_process_key(key, is_pressed, int_coord) @property def debug_enabled(self) -> bool: return debug.enabled @debug_enabled.setter def debug_enabled(self, enabled: bool): debug.enabled = enabled def pre_process_key( self, key: Key, is_pressed: bool, int_coord: Optional[int] = None, index: int = 0, ) -> None: for module in self.modules[index:]: try: key = module.process_key(self, key, is_pressed, int_coord) if key is None: break except Exception as err: if debug.enabled: debug(f'Error in {module}.process_key: {err}') if int_coord is not None: if is_pressed: self._coordkeys_pressed[int_coord] = key else: try: del self._coordkeys_pressed[int_coord] except KeyError: if debug.enabled: debug(f'ReleaseKeyError(ic={int_coord})') if key: self.process_key(key, is_pressed, int_coord) def process_key( self, key: Key, is_pressed: bool, coord_int: Optional[int] = None ) -> None: if is_pressed: key.on_press(self, coord_int) else: key.on_release(self, coord_int) def resume_process_key( self, module: Module, key: Key, is_pressed: bool, int_coord: Optional[int] = None, ) -> None: index = self.modules.index(module) + 1 self.pre_process_key(key, is_pressed, int_coord, index) def remove_key(self, keycode: Key) -> None: self.keys_pressed.discard(keycode) self.process_key(keycode, False) def add_key(self, keycode: Key) -> None: self.keys_pressed.add(keycode) self.process_key(keycode, True) def tap_key(self, keycode: Key) -> None: self.add_key(keycode) # On the next cycle, we'll remove the key. self.set_timeout(False, lambda: self.remove_key(keycode)) def set_timeout( self, after_ticks: int, callback: Callable[[None], None] ) -> Tuple[int, int]: # We allow passing False as an implicit "run this on the next process timeouts cycle" if after_ticks is False: after_ticks = 0 if after_ticks == 0 and self._processing_timeouts: after_ticks += 1 timeout_key = ticks_add(ticks_ms(), after_ticks) if timeout_key not in self._timeouts: self._timeouts[timeout_key] = [] idx = len(self._timeouts[timeout_key]) self._timeouts[timeout_key].append(callback) return (timeout_key, idx) def cancel_timeout(self, timeout_key: int) -> None: try: self._timeouts[timeout_key[0]][timeout_key[1]] = None except (KeyError, IndexError): if debug.enabled: debug(f'no such timeout: {timeout_key}') def _process_timeouts(self) -> None: if not self._timeouts: return # Copy timeout keys to a temporary list to allow sorting. # Prevent net timeouts set during handling from running on the current # cycle by setting a flag `_processing_timeouts`. current_time = ticks_ms() timeout_keys = [] self._processing_timeouts = True for k in self._timeouts.keys(): if ticks_diff(k, current_time) <= 0: timeout_keys.append(k) if timeout_keys and debug.enabled: debug('processing timeouts') for k in sorted(timeout_keys): for callback in self._timeouts[k]: if callback: callback() del self._timeouts[k] self._processing_timeouts = False def _init_sanity_check(self) -> None: ''' Ensure the provided configuration is *probably* bootable ''' assert self.keymap, 'must define a keymap with at least one row' assert ( self.hid_type in HIDModes.ALL_MODES ), 'hid_type must be a value from kmk.consts.HIDModes' if not self.matrix: assert self.row_pins, 'no GPIO pins defined for matrix rows' assert self.col_pins, 'no GPIO pins defined for matrix columns' assert ( self.diode_orientation is not None ), 'diode orientation must be defined' def _init_coord_mapping(self) -> None: ''' Attempt to sanely guess a coord_mapping if one is not provided. No-op if `kmk.extensions.split.Split` is used, it provides equivalent functionality in `on_bootup` To save RAM on boards that don't use Split, we don't import Split and do an isinstance check, but instead do string detection ''' if any(x.__class__.__module__ == 'kmk.modules.split' for x in self.modules): return if not self.coord_mapping: cm = [] for m in self.matrix: cm.extend(m.coord_mapping) self.coord_mapping = tuple(cm) def _init_hid(self) -> None: if self.hid_type == HIDModes.NOOP: self._hid_helper = AbstractHID elif self.hid_type == HIDModes.USB: self._hid_helper = USBHID elif self.hid_type == HIDModes.BLE: self._hid_helper = BLEHID else: self._hid_helper = AbstractHID self._hid_helper = self._hid_helper(**self._go_args) self._hid_send_enabled = True def _init_matrix(self) -> None: if self.matrix is None: if debug.enabled: debug('Initialising default matrix scanner.') self.matrix = MatrixScanner( column_pins=self.col_pins, row_pins=self.row_pins, columns_to_anodes=self.diode_orientation, ) try: self.matrix = tuple(iter(self.matrix)) offset = 0 for matrix in self.matrix: matrix.offset = offset offset += matrix.key_count except TypeError: self.matrix = (self.matrix,) def before_matrix_scan(self) -> None: for module in self.modules: try: module.before_matrix_scan(self) except Exception as err: if debug.enabled: debug(f'Error in {module}.before_matrix_scan: {err}') for ext in self.extensions: try: ext.before_matrix_scan(self.sandbox) except Exception as err: if debug.enabled: debug(f'Error in {ext}.before_matrix_scan: {err}') def after_matrix_scan(self) -> None: for module in self.modules: try: module.after_matrix_scan(self) except Exception as err: if debug.enabled: debug(f'Error in {module}.after_matrix_scan: {err}') for ext in self.extensions: try: ext.after_matrix_scan(self.sandbox) except Exception as err: if debug.enabled: debug(f'Error in {ext}.after_matrix_scan: {err}') def before_hid_send(self) -> None: for module in self.modules: try: module.before_hid_send(self) except Exception as err: if debug.enabled: debug(f'Error in {module}.before_hid_send: {err}') for ext in self.extensions: try: ext.before_hid_send(self.sandbox) except Exception as err: if debug.enabled: debug( f'Error in {ext}.before_hid_send: {err}', ) def after_hid_send(self) -> None: for module in self.modules: try: module.after_hid_send(self) except Exception as err: if debug.enabled: debug(f'Error in {module}.after_hid_send: {err}') for ext in self.extensions: try: ext.after_hid_send(self.sandbox) except Exception as err: if debug.enabled: debug(f'Error in {ext}.after_hid_send: {err}') def powersave_enable(self) -> None: for module in self.modules: try: module.on_powersave_enable(self) except Exception as err: if debug.enabled: debug(f'Error in {module}.on_powersave: {err}') for ext in self.extensions: try: ext.on_powersave_enable(self.sandbox) except Exception as err: if debug.enabled: debug(f'Error in {ext}.powersave_enable: {err}') def powersave_disable(self) -> None: for module in self.modules: try: module.on_powersave_disable(self) except Exception as err: if debug.enabled: debug(f'Error in {module}.powersave_disable: {err}') for ext in self.extensions: try: ext.on_powersave_disable(self.sandbox) except Exception as err: if debug.enabled: debug(f'Error in {ext}.powersave_disable: {err}') def go(self, hid_type=HIDModes.USB, secondary_hid_type=None, **kwargs) -> None: self._init(hid_type=hid_type, secondary_hid_type=secondary_hid_type, **kwargs) while True: self._main_loop() def _init( self, hid_type: HIDModes = HIDModes.USB, secondary_hid_type: Optional[HIDModes] = None, **kwargs, ) -> None: self._go_args = kwargs self.hid_type = hid_type self.secondary_hid_type = secondary_hid_type self._init_sanity_check() self._init_hid() self._init_matrix() self._init_coord_mapping() for module in self.modules: try: module.during_bootup(self) except Exception as err: if debug.enabled: debug(f'Failed to load module {module}: {err}') for ext in self.extensions: try: ext.during_bootup(self) except Exception as err: if debug.enabled: debug(f'Failed to load extensions {module}: {err}') if debug.enabled: debug(f'init: {self}') def _main_loop(self) -> None: self.state_changed = False self.sandbox.active_layers = self.active_layers.copy() self.before_matrix_scan() for matrix in self.matrix: update = matrix.scan_for_changes() if update: self.matrix_update = update break self.sandbox.matrix_update = self.matrix_update self.sandbox.secondary_matrix_update = self.secondary_matrix_update self.after_matrix_scan() if self.secondary_matrix_update: self.matrix_update_queue.append(self.secondary_matrix_update) self.secondary_matrix_update = None if self.matrix_update: self.matrix_update_queue.append(self.matrix_update) self.matrix_update = None # only handle one key per cycle. if self.matrix_update_queue: self._handle_matrix_report(self.matrix_update_queue.pop(0)) self.before_hid_send() if self.hid_pending: self._send_hid() self._process_timeouts() if self.hid_pending: self._send_hid() self.state_changed = True self.after_hid_send() if self._trigger_powersave_enable: self.powersave_enable() if self._trigger_powersave_disable: self.powersave_disable() if self.state_changed: self._print_debug_cycle()