OMEGA REFACTOR! Perf grind basically complete.

Resolves #70, Resolves #67

Still needs some regression testing in general, and a definite
regression is that rotary encoders are no longer (for the immediate time
being) supported.

Moves to a much simpler internal state tracking system, and FAR lighter
matrix scan.

Removes MicroPython support entirely.
This commit is contained in:
Josh Klar
2018-10-16 04:04:39 -07:00
parent 0c72554773
commit 16c82b1c0c
35 changed files with 797 additions and 1526 deletions

View File

@@ -1,13 +1,5 @@
import logging
import sys
from kmk import kmktime
from kmk.consts import DiodeOrientation, LeaderMode, UnicodeModes
from kmk.event_defs import (HID_REPORT_EVENT, INIT_FIRMWARE_EVENT,
KEY_DOWN_EVENT, KEY_UP_EVENT, KEYCODE_DOWN_EVENT,
KEYCODE_UP_EVENT, MACRO_COMPLETE_EVENT,
NEW_MATRIX_EVENT, PENDING_KEYCODE_POP_EVENT)
from kmk.keycodes import FIRST_KMK_INTERNAL_KEYCODE, Keycodes, RawKeycodes
from kmk.kmktime import sleep_ms, ticks_diff, ticks_ms
GESC_TRIGGERS = {
Keycodes.Modifiers.KC_LSHIFT, Keycodes.Modifiers.KC_RSHIFT,
@@ -15,57 +7,6 @@ GESC_TRIGGERS = {
}
class Store:
'''
A data store very loosely inspired by Redux, but with most of the fancy
functional and immutable abilities unavailable because microcontrollers.
This serves as the event dispatcher at the heart of KMK. All changes to the
state of the keyboard should be triggered by events (see event_defs.py)
dispatched through this store, and listened to (for side-effects or other
handling) by subscription functions.
'''
def __init__(self, reducer, log_level=logging.NOTSET):
self.reducer = reducer
self.logger = logging.getLogger(__name__)
self.logger.setLevel(log_level)
self.state = self.reducer(logger=self.logger)
self.callbacks = []
def dispatch(self, action):
if self.state.preserve_intermediate_states:
self.state._oldstates.append(repr(self.state.to_dict(verbose=True)))
if callable(action):
self.logger.debug('Received thunk')
action(self.dispatch, self.get_state)
self.logger.debug('Finished thunk')
return None
self.logger.debug('Dispatching action: Type {} >> {}'.format(action.type, action))
self.state = self.reducer(self.state, action, logger=self.logger)
self.logger.debug('Dispatching complete: Type {}'.format(action.type))
self.logger.debug('New state: {}'.format(self.state))
for cb in self.callbacks:
if cb is not None:
try:
cb(self.state, action)
except Exception as e:
self.logger.error('Callback failed, moving on')
sys.print_exception(e)
def get_state(self):
return self.state
def subscribe(self, callback):
self.callbacks.append(callback)
return len(self.callbacks) - 1
def unsubscribe(self, idx):
self.callbacks[idx] = None
class InternalState:
keys_pressed = set()
pending_keys = set()
@@ -73,13 +14,9 @@ class InternalState:
leader_pending = None
leader_last_len = 0
hid_pending = False
keymap = []
row_pins = []
col_pins = []
matrix = []
diode_orientation = DiodeOrientation.COLUMNS
leader_mode_history = []
active_layers = [0]
reversed_active_layers = list(reversed(active_layers))
start_time = {
'lt': None,
'tg': None,
@@ -87,23 +24,31 @@ class InternalState:
'lm': None,
'leader': None,
}
_oldstates = []
def __init__(self, preserve_intermediate_states=False):
import kmk_keyboard
self.unicode_mode = getattr(kmk_keyboard, 'unicode_mode', UnicodeModes.NOOP)
self.tap_time = getattr(kmk_keyboard, 'tap_time', 300)
self.leader_mode = getattr(kmk_keyboard, 'leader_mode', LeaderMode.ENTER)
self.leader_dictionary = getattr(kmk_keyboard, 'leader_dictionary', {})
self.preserve_intermediate_states = preserve_intermediate_states
def __init__(self, config):
self.config = config
def __enter__(self):
return self
self.leader_mode = config.leader_mode
def __exit__(self, type, value, traceback):
pass
self.internal_key_handlers = {
RawKeycodes.KC_DF: self._layer_df,
RawKeycodes.KC_MO: self._layer_mo,
RawKeycodes.KC_LM: self._layer_lm,
RawKeycodes.KC_LT: self._layer_lt,
RawKeycodes.KC_TG: self._layer_tg,
RawKeycodes.KC_TO: self._layer_to,
RawKeycodes.KC_TT: self._layer_tt,
Keycodes.KMK.KC_GESC.code: self._kc_gesc,
RawKeycodes.KC_UC_MODE: self._kc_uc_mode,
RawKeycodes.KC_MACRO: self._kc_macro,
Keycodes.KMK.KC_LEAD.code: self._kc_lead,
Keycodes.KMK.KC_NO.code: self._kc_no,
}
def to_dict(self, verbose=False):
def __repr__(self):
return 'InternalState({})'.format(self._to_dict())
def _to_dict(self):
ret = {
'keys_pressed': self.keys_pressed,
'active_layers': self.active_layers,
@@ -113,307 +58,280 @@ class InternalState:
'start_time': self.start_time,
}
if verbose:
ret.update({
'keymap': self.keymap,
'matrix': self.matrix,
'col_pins': self.col_pins,
'row_pins': self.row_pins,
'diode_orientation': self.diode_orientation,
})
return ret
def __repr__(self):
return 'InternalState({})'.format(self.to_dict())
def _find_key_in_map(self, row, col):
# Later-added layers have priority. Sift through the layers
# in reverse order until we find a valid keycode object
for layer in self.reversed_active_layers:
layer_key = self.config.keymap[layer][row][col]
def find_key_in_map(state, row, col):
# Later-added layers have priority. Sift through the layers
# in reverse order until we find a valid keycode object
for layer in reversed(state.active_layers):
layer_key = state.keymap[layer][row][col]
if not layer_key or layer_key == Keycodes.KMK.KC_TRNS:
continue
if layer_key == Keycodes.KMK.KC_NO:
break
return layer_key
def kmk_reducer(state=None, action=None, logger=None):
if state is None:
state = InternalState()
if logger is not None:
logger.debug('Reducer received state of None, creating new')
if action is None:
if logger is not None:
logger.debug('No action received, returning state unmodified')
return state
if action.type == NEW_MATRIX_EVENT:
matrix_keys_pressed = {
find_key_in_map(state, row, col)
for row, col, in action.matrix
}
pressed = matrix_keys_pressed - state.keys_pressed
released = state.keys_pressed - matrix_keys_pressed
if not pressed and not released:
return state
for changed_key in released:
if not changed_key:
if not layer_key or layer_key == Keycodes.KMK.KC_TRNS:
continue
elif changed_key.code >= FIRST_KMK_INTERNAL_KEYCODE:
state = process_internal_key_event(
state,
KEY_UP_EVENT,
changed_key,
logger=logger,
)
for changed_key in pressed:
if not changed_key:
continue
elif changed_key.code >= FIRST_KMK_INTERNAL_KEYCODE:
state = process_internal_key_event(
state,
KEY_DOWN_EVENT,
changed_key,
logger=logger,
)
if layer_key == Keycodes.KMK.KC_NO:
return layer_key
state.matrix = action.matrix
state.keys_pressed |= pressed
state.keys_pressed -= released
if state.leader_mode % 2 == 1:
state.hid_pending = False
return layer_key
def matrix_changed(self, row, col, is_pressed):
if self.config.debug_enabled:
print('Matrix changed (col, row, pressed?): {}, {}, {}'.format(
row, col, is_pressed,
))
kc_changed = self._find_key_in_map(row, col)
if kc_changed is None:
print('No key accessible for col, row: {}, {}'.format(row, col))
return self
if kc_changed.code >= FIRST_KMK_INTERNAL_KEYCODE:
self._process_internal_key_event(
kc_changed,
is_pressed,
)
else:
state.hid_pending = True
if is_pressed:
self.keys_pressed.add(kc_changed)
else:
self.keys_pressed.discard(kc_changed)
return state
self.hid_pending = True
if action.type == KEYCODE_UP_EVENT:
state.keys_pressed.discard(action.keycode)
return state
if self.leader_mode % 2 == 1:
self._process_leader_mode()
if action.type == KEYCODE_DOWN_EVENT:
state.keys_pressed.add(action.keycode)
return state
return self
if action.type == INIT_FIRMWARE_EVENT:
state.keymap = action.keymap
state.row_pins = action.row_pins
state.col_pins = action.col_pins
state.diode_orientation = action.diode_orientation
return state
def force_keycode_up(self, keycode):
self.keys_pressed.discard(keycode)
self.hid_pending = True
return self
# HID events are non-mutating, used exclusively for listeners to know
# they should be doing things. This could/should arguably be folded back
# into KEY_UP_EVENT and KEY_DOWN_EVENT, but for now it's nice to separate
# this out for debugging's sake.
if action.type == HID_REPORT_EVENT:
return state
def force_keycode_down(self, keycode):
if keycode.code == Keycodes.KMK.KC_MACRO_SLEEP_MS:
sleep_ms(keycode.ms)
else:
self.keys_pressed.add(keycode)
self.hid_pending = True
return self
if action.type == MACRO_COMPLETE_EVENT:
state.macro_pending = None
return state
def pending_key_handled(self):
popped = self.pending_keys.pop()
if action.type == PENDING_KEYCODE_POP_EVENT:
state.pending_keys.pop()
return state
if self.config.debug_enabled:
print('Popped pending key: {}'.format(popped))
# On unhandled events, log and do not mutate state
logger.warning('Unhandled event! Returning state unmodified.')
return state
return self
def resolve_hid(self):
self.hid_pending = False
return self
def process_internal_key_event(state, action_type, changed_key, logger=None):
if logger is None:
logger = logging.getLogger(__name__)
def resolve_macro(self):
if self.config.debug_enabled:
print('Macro complete!')
# Since the key objects can be chained into new objects
# with, for example, no_press set, always check against
# the underlying code rather than comparing Keycode
# objects
self.macro_pending = None
return self
if changed_key.code == RawKeycodes.KC_DF:
return df(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_MO:
return mo(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_LM:
return lm(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_LT:
return lt(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_TG:
return tg(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_TO:
return to(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_TT:
return tt(state, action_type, changed_key, logger=logger)
elif changed_key.code == Keycodes.KMK.KC_GESC.code:
return grave_escape(state, action_type, logger=logger)
elif changed_key.code == RawKeycodes.KC_UC_MODE:
return unicode_mode(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_MACRO:
return macro(state, action_type, changed_key, logger=logger)
elif changed_key.code == Keycodes.KMK.KC_LEAD.code:
return leader(state)
else:
return state
def _process_internal_key_event(self, changed_key, is_pressed):
# Since the key objects can be chained into new objects
# with, for example, no_press set, always check against
# the underlying code rather than comparing Keycode
# objects
return self.internal_key_handlers[changed_key.code](
changed_key, is_pressed,
)
def grave_escape(state, action_type, logger):
if action_type == KEY_DOWN_EVENT:
if any(key in GESC_TRIGGERS for key in state.keys_pressed):
# if Shift is held, KC_GRAVE will become KC_TILDE on OS level
state.keys_pressed.add(Keycodes.Common.KC_GRAVE)
return state
def _layer_df(self, changed_key, is_pressed):
"""Switches the default layer"""
if is_pressed:
self.active_layers[0] = changed_key.layer
self.reversed_active_layers = list(reversed(self.active_layers))
# else return KC_ESC
state.keys_pressed.add(Keycodes.Common.KC_ESCAPE)
return state
return self
elif action_type == KEY_UP_EVENT:
state.keys_pressed.discard(Keycodes.Common.KC_ESCAPE)
state.keys_pressed.discard(Keycodes.Common.KC_GRAVE)
return state
return state
def df(state, action_type, changed_key, logger):
"""Switches the default layer"""
if action_type == KEY_DOWN_EVENT:
state.active_layers[0] = changed_key.layer
return state
def mo(state, action_type, changed_key, logger):
"""Momentarily activates layer, switches off when you let go"""
if action_type == KEY_UP_EVENT:
state.active_layers = [
layer for layer in state.active_layers
if layer != changed_key.layer
]
elif action_type == KEY_DOWN_EVENT:
state.active_layers.append(changed_key.layer)
return state
def lm(state, action_type, changed_key, logger):
"""As MO(layer) but with mod active"""
if action_type == KEY_DOWN_EVENT:
# Sets the timer start and acts like MO otherwise
state.start_time['lm'] = kmktime.ticks_ms()
state.keys_pressed.add(changed_key.kc)
state = mo(state, action_type, changed_key, logger)
elif action_type == KEY_UP_EVENT:
state.keys_pressed.discard(changed_key.kc)
state.start_time['lm'] = None
state = mo(state, action_type, changed_key)
return state
def lt(state, action_type, changed_key, logger):
"""Momentarily activates layer if held, sends kc if tapped"""
if action_type == KEY_DOWN_EVENT:
# Sets the timer start and acts like MO otherwise
state.start_time['lt'] = kmktime.ticks_ms()
state = mo(state, action_type, changed_key, logger)
elif action_type == KEY_UP_EVENT:
# On keyup, check timer, and press key if needed.
if state.start_time['lt'] and (
kmktime.ticks_diff(kmktime.ticks_ms(), state.start_time['lt']) < state.tap_time
):
state.pending_keys.add(changed_key.kc)
state.start_time['lt'] = None
state = mo(state, action_type, changed_key, logger)
return state
def tg(state, action_type, changed_key, logger):
"""Toggles the layer (enables it if not active, and vise versa)"""
if action_type == KEY_DOWN_EVENT:
if changed_key.layer in state.active_layers:
state.active_layers = [
layer for layer in state.active_layers
def _layer_mo(self, changed_key, is_pressed):
"""Momentarily activates layer, switches off when you let go"""
if is_pressed:
self.active_layers.append(changed_key.layer)
else:
self.active_layers = [
layer for layer in self.active_layers
if layer != changed_key.layer
]
else:
state.active_layers.append(changed_key.layer)
return state
self.reversed_active_layers = list(reversed(self.active_layers))
return self
def to(state, action_type, changed_key, logger):
"""Activates layer and deactivates all other layers"""
if action_type == KEY_DOWN_EVENT:
state.active_layers = [changed_key.layer]
def _layer_lm(self, changed_key, is_pressed):
"""As MO(layer) but with mod active"""
self.hid_pending = True
return state
def tt(state, action_type, changed_key, logger):
"""Momentarily activates layer if held, toggles it if tapped repeatedly"""
# TODO Make this work with tap dance to function more correctly, but technically works.
if action_type == KEY_DOWN_EVENT:
if state.start_time['tt'] is None:
if is_pressed:
# Sets the timer start and acts like MO otherwise
state.start_time['tt'] = kmktime.ticks_ms()
state = mo(state, action_type, changed_key, logger)
elif kmktime.ticks_diff(kmktime.ticks_ms(), state.start_time['tt']) < state.tap_time:
state.start_time['tt'] = None
state = tg(state, action_type, changed_key, logger)
elif action_type == KEY_UP_EVENT and (
state.start_time['tt'] is None or
kmktime.ticks_diff(kmktime.ticks_ms(), state.start_time['tt']) >= state.tap_time
):
# On first press, works like MO. On second press, does nothing unless let up within
# time window, then acts like TG.
state.start_time['tt'] = None
state = mo(state, action_type, changed_key, logger)
self.start_time['lm'] = ticks_ms()
self.keys_pressed.add(changed_key.kc)
return self.mo(changed_key, is_pressed)
return state
self.keys_pressed.discard(changed_key.kc)
self.start_time['lm'] = None
return self.mo(changed_key, is_pressed)
def _layer_lt(self, changed_key, is_pressed):
"""Momentarily activates layer if held, sends kc if tapped"""
if is_pressed:
# Sets the timer start and acts like MO otherwise
self.start_time['lt'] = ticks_ms()
return self.mo(changed_key, is_pressed)
def unicode_mode(state, action_type, changed_key, logger):
if action_type == KEY_DOWN_EVENT:
state.unicode_mode = changed_key.mode
# On keyup, check timer, and press key if needed.
if self.start_time['lt'] and (
ticks_diff(ticks_ms(), self.start_time['lt']) < self.tap_time
):
self.hid_pending = True
self.pending_keys.add(changed_key.kc)
return state
self.start_time['lt'] = None
return self.mo(changed_key, is_pressed)
def _layer_tg(self, changed_key, is_pressed):
"""Toggles the layer (enables it if not active, and vise versa)"""
if is_pressed:
if changed_key.layer in self.active_layers:
self.active_layers = [
layer for layer in self.active_layers
if layer != changed_key.layer
]
else:
self.active_layers.append(changed_key.layer)
def macro(state, action_type, changed_key, logger):
if action_type == KEY_UP_EVENT:
if changed_key.keyup:
state.macro_pending = changed_key.keyup
return state
self.reversed_active_layers = list(reversed(self.active_layers))
elif action_type == KEY_DOWN_EVENT:
if changed_key.keydown:
state.macro_pending = changed_key.keydown
return state
return self
return state
def _layer_to(self, changed_key, is_pressed):
"""Activates layer and deactivates all other layers"""
if is_pressed:
self.active_layers = [changed_key.layer]
self.reversed_active_layers = list(reversed(self.active_layers))
return self
def leader(state):
if state.leader_mode % 2 == 0:
state.keys_pressed.discard(Keycodes.KMK.KC_LEAD)
# All leader modes are one number higher when activating
state.leader_mode += 1
def _layer_tt(self, changed_key, is_pressed):
"""Momentarily activates layer if held, toggles it if tapped repeatedly"""
# TODO Make this work with tap dance to function more correctly, but technically works.
if is_pressed:
if self.start_time['tt'] is None:
# Sets the timer start and acts like MO otherwise
self.start_time['tt'] = ticks_ms()
return self.mo(changed_key, is_pressed)
elif ticks_diff(ticks_ms(), self.start_time['tt']) < self.tap_time:
self.start_time['tt'] = None
return self.tg(changed_key, is_pressed)
elif (
self.start_time['tt'] is None or
ticks_diff(ticks_ms(), self.start_time['tt']) >= self.tap_time
):
# On first press, works like MO. On second press, does nothing unless let up within
# time window, then acts like TG.
self.start_time['tt'] = None
return self.mo(changed_key, is_pressed)
return state
return self
def _kc_uc_mode(self, changed_key, is_pressed):
if is_pressed:
self.config.unicode_mode = changed_key.mode
return self
def _kc_macro(self, changed_key, is_pressed):
if is_pressed:
if changed_key.keyup:
self.macro_pending = changed_key.keyup
else:
if changed_key.keydown:
self.macro_pending = changed_key.keydown
return self
def _kc_lead(self, changed_key, is_pressed):
if is_pressed:
self._begin_leader_mode()
return self
def _kc_gesc(self, changed_key, is_pressed):
self.hid_pending = True
if is_pressed:
if GESC_TRIGGERS.intersection(self.keys_pressed):
# if Shift is held, KC_GRAVE will become KC_TILDE on OS level
self.keys_pressed.add(Keycodes.Common.KC_GRAVE)
return self
# else return KC_ESC
self.keys_pressed.add(Keycodes.Common.KC_ESCAPE)
return self
self.keys_pressed.discard(Keycodes.Common.KC_ESCAPE)
self.keys_pressed.discard(Keycodes.Common.KC_GRAVE)
return self
def _kc_no(self, changed_key, is_pressed):
return self
def _begin_leader_mode(self):
if self.leader_mode % 2 == 0:
self.keys_pressed.discard(Keycodes.KMK.KC_LEAD)
# All leader modes are one number higher when activating
self.leader_mode += 1
return self
def _process_leader_mode(self):
keys_pressed = self.keys_pressed
if self.leader_last_len and self.leader_mode_history:
history_set = set(self.leader_mode_history)
keys_pressed = keys_pressed - history_set
self.leader_last_len = len(self.keys_pressed)
for key in keys_pressed:
if key == Keycodes.Common.KC_ENT:
# Process the action and remove the extra KC.ENT that was added to get here
lmh = tuple(self.leader_mode_history)
if lmh in self.config.leader_dictionary:
self.macro_pending = self.config.leader_dictionary[lmh].keydown
self._exit_leader_mode()
break
elif key == Keycodes.Common.KC_ESC or key == Keycodes.KMK.KC_GESC:
# Clean self and turn leader mode off.
self._exit_leader_mode()
break
elif key == Keycodes.KMK.KC_LEAD:
break
else:
# Add key if not needing to escape
# This needs replaced later with a proper debounce
self.leader_mode_history.append(key)
self.hid_pending = False
return self
def _exit_leader_mode(self):
self.leader_mode_history.clear()
self.leader_mode -= 1
self.leader_last_len = 0
self.keys_pressed.clear()
return self