Merge pull request #32 from KMKfw/topic-perf-grind

Grind out as much performance as possible with lower-ish hanging fruit
This commit is contained in:
Josh Klar 2018-10-01 10:34:55 -07:00 committed by GitHub
commit 27d1ee8755
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 187 additions and 260 deletions

View File

@ -5,21 +5,5 @@ class AbstractMatrixScanner():
def __init__(self, cols, rows, active_layers, diode_orientation=DiodeOrientation.COLUMNS): def __init__(self, cols, rows, active_layers, diode_orientation=DiodeOrientation.COLUMNS):
raise NotImplementedError('Abstract implementation') raise NotImplementedError('Abstract implementation')
def _normalize_matrix(self, matrix): def scan_for_pressed(self):
'''
We always want to internally look at a keyboard as a list of rows,
where a "row" is a list of keycodes (columns).
This will convert DiodeOrientation.COLUMNS matrix scans into a
ROWS scan, so we never have to think about these things again.
'''
if self.diode_orientation == DiodeOrientation.ROWS:
return matrix
return [
[col[col_entry] for col in matrix]
for col_entry in range(max(len(col) for col in matrix))
]
def raw_scan(self):
raise NotImplementedError('Abstract implementation') raise NotImplementedError('Abstract implementation')

View File

@ -1,4 +1,5 @@
import logging import logging
from collections import namedtuple
from micropython import const from micropython import const
@ -16,31 +17,46 @@ MACRO_COMPLETE_EVENT = const(8)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
InitFirmware = namedtuple('InitFirmware', (
'type',
'keymap',
'row_pins',
'col_pins',
'diode_orientation',
'unicode_mode',
))
KeyUpDown = namedtuple('KeyUpDown', ('type', 'row', 'col'))
KeycodeUpDown = namedtuple('KeycodeUpDown', ('type', 'keycode'))
NewMatrix = namedtuple('NewMatrix', ('type', 'matrix'))
BareEvent = namedtuple('BareEvent', ('type',))
def init_firmware(keymap, row_pins, col_pins, diode_orientation, unicode_mode): def init_firmware(keymap, row_pins, col_pins, diode_orientation, unicode_mode):
return { return InitFirmware(
'type': INIT_FIRMWARE_EVENT, type=INIT_FIRMWARE_EVENT,
'keymap': keymap, keymap=keymap,
'row_pins': row_pins, row_pins=row_pins,
'col_pins': col_pins, col_pins=col_pins,
'diode_orientation': diode_orientation, diode_orientation=diode_orientation,
'unicode_mode': unicode_mode, unicode_mode=unicode_mode,
} )
def key_up_event(row, col): def key_up_event(row, col):
return { return KeyUpDown(
'type': KEY_UP_EVENT, type=KEY_UP_EVENT,
'row': row, row=row,
'col': col, col=col,
} )
def key_down_event(row, col): def key_down_event(row, col):
return { return KeyUpDown(
'type': KEY_DOWN_EVENT, type=KEY_DOWN_EVENT,
'row': row, row=row,
'col': col, col=col,
} )
def keycode_up_event(keycode): def keycode_up_event(keycode):
@ -48,10 +64,10 @@ def keycode_up_event(keycode):
Press a key by Keycode object, bypassing the keymap. Used mostly for Press a key by Keycode object, bypassing the keymap. Used mostly for
macros. macros.
''' '''
return { return KeycodeUpDown(
'type': KEYCODE_UP_EVENT, type=KEYCODE_UP_EVENT,
'keycode': keycode, keycode=keycode,
} )
def keycode_down_event(keycode): def keycode_down_event(keycode):
@ -59,80 +75,53 @@ def keycode_down_event(keycode):
Release a key by Keycode object, bypassing the keymap. Used mostly for Release a key by Keycode object, bypassing the keymap. Used mostly for
macros. macros.
''' '''
return { return KeycodeUpDown(
'type': KEYCODE_DOWN_EVENT, type=KEYCODE_DOWN_EVENT,
'keycode': keycode, keycode=keycode,
} )
def new_matrix_event(matrix): def new_matrix_event(matrix):
return { return NewMatrix(
'type': NEW_MATRIX_EVENT, type=NEW_MATRIX_EVENT,
'matrix': matrix, matrix=matrix,
} )
def hid_report_event(): def hid_report_event():
return { return BareEvent(
'type': HID_REPORT_EVENT, type=HID_REPORT_EVENT,
} )
def macro_complete_event(): def macro_complete_event():
return { return BareEvent(
'type': MACRO_COMPLETE_EVENT, type=MACRO_COMPLETE_EVENT,
} )
def matrix_changed(new_matrix): def matrix_changed(new_pressed):
def _key_pressed(dispatch, get_state): def _key_pressed(dispatch, get_state):
dispatch(new_matrix_event(new_pressed))
state = get_state() state = get_state()
# Temporarily preserve a reference to the old event
# We do fake Redux around here because microcontrollers
# aren't exactly RAM or CPU powerhouses - the state does
# mutate in place. Unfortunately this makes reasoning
# about code a bit messier and really hurts one of the
# selling points of Redux. Former development versions
# of KMK created new InternalState copies every single
# time the state changed, but it was sometimes slow.
old_matrix = state.matrix
old_keys_pressed = state.keys_pressed
dispatch(new_matrix_event(new_matrix)) if state.hid_pending:
dispatch(hid_report_event())
with get_state() as new_state: if Keycodes.KMK.KC_RESET in state.keys_pressed:
for ridx, row in enumerate(new_state.matrix): try:
for cidx, col in enumerate(row): import machine
if col != old_matrix[ridx][cidx]: machine.bootloader()
if col: except ImportError:
dispatch(key_down_event( logger.warning('Tried to reset to bootloader, but not supported on this chip?')
row=ridx,
col=cidx,
))
else:
dispatch(key_up_event(
row=ridx,
col=cidx,
))
with get_state() as new_state: if state.macro_pending:
if old_keys_pressed != new_state.keys_pressed: macro = state.macro_pending
dispatch(hid_report_event())
if Keycodes.KMK.KC_RESET in new_state.keys_pressed: for event in macro(state):
try: dispatch(event)
import machine
machine.bootloader()
except ImportError:
logger.warning('Tried to reset to bootloader, but not supported on this chip?')
with get_state() as new_state: dispatch(macro_complete_event())
if new_state.macro_pending:
macro = new_state.macro_pending
for event in macro(new_state):
dispatch(event)
dispatch(macro_complete_event())
return _key_pressed return _key_pressed

View File

@ -3,8 +3,13 @@ import logging
from kmk.common.event_defs import KEY_DOWN_EVENT, KEY_UP_EVENT from kmk.common.event_defs import KEY_DOWN_EVENT, KEY_UP_EVENT
from kmk.common.keycodes import Keycodes, RawKeycodes from kmk.common.keycodes import Keycodes, RawKeycodes
GESC_TRIGGERS = {
Keycodes.Modifiers.KC_LSHIFT, Keycodes.Modifiers.KC_RSHIFT,
Keycodes.Modifiers.KC_LGUI, Keycodes.Modifiers.KC_RGUI,
}
def process_internal_key_event(state, action, changed_key, logger=None):
def process_internal_key_event(state, action_type, changed_key, logger=None):
if logger is None: if logger is None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -14,71 +19,54 @@ def process_internal_key_event(state, action, changed_key, logger=None):
# objects # objects
if changed_key.code == RawKeycodes.KC_DF: if changed_key.code == RawKeycodes.KC_DF:
return df(state, action, changed_key, logger=logger) return df(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_MO: elif changed_key.code == RawKeycodes.KC_MO:
return mo(state, action, changed_key, logger=logger) return mo(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_TG: elif changed_key.code == RawKeycodes.KC_TG:
return tg(state, action, changed_key, logger=logger) return tg(state, action_type, changed_key, logger=logger)
elif changed_key.code == RawKeycodes.KC_TO: elif changed_key.code == RawKeycodes.KC_TO:
return to(state, action, changed_key, logger=logger) return to(state, action_type, changed_key, logger=logger)
elif changed_key.code == Keycodes.KMK.KC_GESC.code: elif changed_key.code == Keycodes.KMK.KC_GESC.code:
return grave_escape(state, action, logger=logger) return grave_escape(state, action_type, logger=logger)
elif changed_key.code == RawKeycodes.KC_UC_MODE: elif changed_key.code == RawKeycodes.KC_UC_MODE:
return unicode_mode(state, action, changed_key, logger=logger) return unicode_mode(state, action_type, changed_key, logger=logger)
else: else:
return state return state
def grave_escape(state, action, logger): def grave_escape(state, action_type, logger):
if action['type'] == KEY_DOWN_EVENT: if action_type == KEY_DOWN_EVENT:
for key in state.keys_pressed: if any(key in GESC_TRIGGERS for key in state.keys_pressed):
if key in {Keycodes.Modifiers.KC_LSHIFT, Keycodes.Modifiers.KC_RSHIFT}: # if Shift is held, KC_GRAVE will become KC_TILDE on OS level
# if Shift is held, return KC_GRAVE which will become KC_TILDE on OS level state.keys_pressed.add(Keycodes.Common.KC_GRAVE)
return state.update( return state
keys_pressed=(
state.keys_pressed | {Keycodes.Common.KC_GRAVE}
),
)
elif key in {Keycodes.Modifiers.KC_LGUI, Keycodes.Modifiers.KC_RGUI}:
# if GUI is held, return KC_GRAVE
return state.update(
keys_pressed=(
state.keys_pressed | {Keycodes.Common.KC_GRAVE}
),
)
# else return KC_ESC # else return KC_ESC
return state.update( state.keys_pressed.add(Keycodes.Common.KC_ESCAPE)
keys_pressed=( return state
state.keys_pressed | {Keycodes.Common.KC_ESCAPE}
),
)
elif action['type'] == KEY_UP_EVENT: elif action_type == KEY_UP_EVENT:
return state.update( state.keys_pressed.discard(Keycodes.Common.KC_ESCAPE)
keys_pressed=frozenset( state.keys_pressed.discard(Keycodes.Common.KC_GRAVE)
key for key in state.keys_pressed return state
if key not in {Keycodes.Common.KC_ESCAPE, Keycodes.Common.KC_GRAVE}
),
)
def df(state, action, changed_key, logger): def df(state, action_type, changed_key, logger):
"""Switches the default layer""" """Switches the default layer"""
if action['type'] == KEY_DOWN_EVENT: if action_type == KEY_DOWN_EVENT:
state.active_layers[0] = changed_key.layer state.active_layers[0] = changed_key.layer
return state return state
def mo(state, action, changed_key, logger): def mo(state, action_type, changed_key, logger):
"""Momentarily activates layer, switches off when you let go""" """Momentarily activates layer, switches off when you let go"""
if action['type'] == KEY_UP_EVENT: if action_type == KEY_UP_EVENT:
state.active_layers = [ state.active_layers = [
layer for layer in state.active_layers layer for layer in state.active_layers
if layer != changed_key.layer if layer != changed_key.layer
] ]
elif action['type'] == KEY_DOWN_EVENT: elif action_type == KEY_DOWN_EVENT:
state.active_layers.append(changed_key.layer) state.active_layers.append(changed_key.layer)
return state return state
@ -92,9 +80,9 @@ def lt(layer, kc):
"""Momentarily activates layer if held, sends kc if tapped""" """Momentarily activates layer if held, sends kc if tapped"""
def tg(state, action, changed_key, logger): def tg(state, action_type, changed_key, logger):
"""Toggles the layer (enables it if not active, and vise versa)""" """Toggles the layer (enables it if not active, and vise versa)"""
if action['type'] == KEY_DOWN_EVENT: if action_type == KEY_DOWN_EVENT:
if changed_key.layer in state.active_layers: if changed_key.layer in state.active_layers:
state.active_layers = [ state.active_layers = [
layer for layer in state.active_layers layer for layer in state.active_layers
@ -106,9 +94,9 @@ def tg(state, action, changed_key, logger):
return state return state
def to(state, action, changed_key, logger): def to(state, action_type, changed_key, logger):
"""Activates layer and deactivates all other layers""" """Activates layer and deactivates all other layers"""
if action['type'] == KEY_DOWN_EVENT: if action_type == KEY_DOWN_EVENT:
state.active_layers = [changed_key.layer] state.active_layers = [changed_key.layer]
return state return state
@ -118,8 +106,8 @@ def tt(layer):
"""Momentarily activates layer if held, toggles it if tapped repeatedly""" """Momentarily activates layer if held, toggles it if tapped repeatedly"""
def unicode_mode(state, action, changed_key, logger): def unicode_mode(state, action_type, changed_key, logger):
if action['type'] == KEY_DOWN_EVENT: if action_type == KEY_DOWN_EVENT:
state.unicode_mode = changed_key.mode state.unicode_mode = changed_key.mode
return state return state

View File

@ -26,9 +26,9 @@ class ReduxStore:
self.logger.debug('Finished thunk') self.logger.debug('Finished thunk')
return None return None
self.logger.debug('Dispatching action: Type {} >> {}'.format(action['type'], action)) self.logger.debug('Dispatching action: Type {} >> {}'.format(action.type, action))
self.state = self.reducer(self.state, action, logger=self.logger) self.state = self.reducer(self.state, action, logger=self.logger)
self.logger.debug('Dispatching complete: Type {}'.format(action['type'])) self.logger.debug('Dispatching complete: Type {}'.format(action.type))
self.logger.debug('New state: {}'.format(self.state)) self.logger.debug('New state: {}'.format(self.state))
@ -52,9 +52,9 @@ class ReduxStore:
class InternalState: class InternalState:
modifiers_pressed = frozenset() keys_pressed = set()
keys_pressed = frozenset()
macro_pending = None macro_pending = None
hid_pending = False
unicode_mode = UnicodeModes.NOOP unicode_mode = UnicodeModes.NOOP
keymap = [] keymap = []
row_pins = [] row_pins = []
@ -76,7 +76,6 @@ class InternalState:
def to_dict(self, verbose=False): def to_dict(self, verbose=False):
ret = { ret = {
'keys_pressed': self.keys_pressed, 'keys_pressed': self.keys_pressed,
'modifiers_pressed': self.modifiers_pressed,
'active_layers': self.active_layers, 'active_layers': self.active_layers,
'unicode_mode': self.unicode_mode, 'unicode_mode': self.unicode_mode,
} }
@ -133,106 +132,80 @@ def kmk_reducer(state=None, action=None, logger=None):
return state return state
if action['type'] == NEW_MATRIX_EVENT: if action.type == NEW_MATRIX_EVENT:
return state.update( matrix_keys_pressed = {
matrix=action['matrix'], find_key_in_map(state, row, col)
) for row, col in action.matrix
}
if action['type'] == KEYCODE_UP_EVENT: pressed = matrix_keys_pressed - state.keys_pressed
return state.update( released = state.keys_pressed - matrix_keys_pressed
keys_pressed=frozenset(
key for key in state.keys_pressed if key != action['keycode']
),
)
if action['type'] == KEYCODE_DOWN_EVENT: if not pressed and not released:
return state.update(
keys_pressed=(
state.keys_pressed | {action['keycode']}
),
)
if action['type'] == KEY_UP_EVENT:
row = action['row']
col = action['col']
changed_key = find_key_in_map(state, row, col)
logger.debug('Detected change to key: {}'.format(changed_key))
if not changed_key:
return state return state
if isinstance(changed_key, KMKMacro): for changed_key in released:
if changed_key.keyup: if not changed_key:
return state.update( continue
macro_pending=changed_key.keyup,
elif isinstance(changed_key, KMKMacro):
if changed_key.keyup:
state.macro_pending = changed_key.keyup
elif changed_key.code >= FIRST_KMK_INTERNAL_KEYCODE:
state = process_internal_key_event(state, KEY_UP_EVENT, changed_key, logger=logger)
for changed_key in pressed:
if not changed_key:
continue
elif isinstance(changed_key, KMKMacro):
if changed_key.keydown:
state.macro_pending = changed_key.keydown
elif changed_key.code >= FIRST_KMK_INTERNAL_KEYCODE:
state = process_internal_key_event(
state,
KEY_DOWN_EVENT,
changed_key,
logger=logger,
) )
return state state.matrix = action.matrix
state.keys_pressed |= pressed
state.keys_pressed -= released
state.hid_pending = True
newstate = state.update( return state
keys_pressed=frozenset(
key for key in state.keys_pressed if key != changed_key
),
)
if changed_key.code >= FIRST_KMK_INTERNAL_KEYCODE: if action.type == KEYCODE_UP_EVENT:
return process_internal_key_event(newstate, action, changed_key, logger=logger) state.keys_pressed.discard(action.keycode)
state.hid_pending = True
return state
return newstate if action.type == KEYCODE_DOWN_EVENT:
state.keys_pressed.add(action.keycode)
state.hid_pending = True
return state
if action['type'] == KEY_DOWN_EVENT: if action.type == INIT_FIRMWARE_EVENT:
row = action['row']
col = action['col']
changed_key = find_key_in_map(state, row, col)
logger.debug('Detected change to key: {}'.format(changed_key))
if not changed_key:
return state
if isinstance(changed_key, KMKMacro):
if changed_key.keydown:
return state.update(
macro_pending=changed_key.keydown,
)
return state
newstate = state.update(
keys_pressed=(
state.keys_pressed | {changed_key}
),
)
if changed_key.code >= FIRST_KMK_INTERNAL_KEYCODE:
return process_internal_key_event(newstate, action, changed_key, logger=logger)
return newstate
if action['type'] == INIT_FIRMWARE_EVENT:
return state.update( return state.update(
keymap=action['keymap'], keymap=action.keymap,
row_pins=action['row_pins'], row_pins=action.row_pins,
col_pins=action['col_pins'], col_pins=action.col_pins,
diode_orientation=action['diode_orientation'], diode_orientation=action.diode_orientation,
unicode_mode=action['unicode_mode'], unicode_mode=action.unicode_mode,
matrix=[
[False for c in action['col_pins']]
for r in action['row_pins']
],
) )
# HID events are non-mutating, used exclusively for listeners to know # HID events are non-mutating, used exclusively for listeners to know
# they should be doing things. This could/should arguably be folded back # they should be doing things. This could/should arguably be folded back
# into KEY_UP_EVENT and KEY_DOWN_EVENT, but for now it's nice to separate # into KEY_UP_EVENT and KEY_DOWN_EVENT, but for now it's nice to separate
# this out for debugging's sake. # this out for debugging's sake.
if action['type'] == HID_REPORT_EVENT: if action.type == HID_REPORT_EVENT:
state.hid_pending = False
return state return state
if action['type'] == MACRO_COMPLETE_EVENT: if action.type == MACRO_COMPLETE_EVENT:
return state.update(macro_pending=None) return state.update(macro_pending=None)
# On unhandled events, log and do not mutate state # On unhandled events, log and do not mutate state

View File

@ -18,7 +18,8 @@ class Firmware:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(log_level) logger.setLevel(log_level)
self.cached_state = None self.hydrated = False
self.store = ReduxStore(kmk_reducer, log_level=log_level) self.store = ReduxStore(kmk_reducer, log_level=log_level)
self.store.subscribe( self.store.subscribe(
lambda state, action: self._subscription(state, action), lambda state, action: self._subscription(state, action),
@ -41,20 +42,17 @@ class Firmware:
)) ))
def _subscription(self, state, action): def _subscription(self, state, action):
if self.cached_state is None or any( if not self.hydrated:
getattr(self.cached_state, k) != getattr(state, k)
for k in state.__dict__.keys()
):
self.matrix = MatrixScanner( self.matrix = MatrixScanner(
state.col_pins, state.col_pins,
state.row_pins, state.row_pins,
state.diode_orientation, state.diode_orientation,
) )
self.hydrated = True
def go(self): def go(self):
while True: while True:
state = self.store.get_state() update = self.matrix.scan_for_pressed()
update = self.matrix.scan_for_changes(state.matrix)
if update: if update:
self.store.dispatch(update) self.store.dispatch(update)

View File

@ -21,6 +21,7 @@ class MatrixScanner(AbstractMatrixScanner):
self.rows = [machine.Pin(pin) for pin in rows] self.rows = [machine.Pin(pin) for pin in rows]
self.diode_orientation = diode_orientation self.diode_orientation = diode_orientation
self.active_layers = active_layers self.active_layers = active_layers
self.last_pressed_len = 0
if self.diode_orientation == DiodeOrientation.COLUMNS: if self.diode_orientation == DiodeOrientation.COLUMNS:
self.outputs = self.cols self.outputs = self.cols
@ -41,29 +42,22 @@ class MatrixScanner(AbstractMatrixScanner):
pin.init(machine.Pin.IN, machine.Pin.PULL_DOWN) pin.init(machine.Pin.IN, machine.Pin.PULL_DOWN)
pin.off() pin.off()
def _normalize_matrix(self, matrix): def scan_for_pressed(self):
return super()._normalize_matrix(matrix) pressed = []
def raw_scan(self): for oidx, opin in enumerate(self.outputs):
matrix = []
for opin in self.outputs:
opin.value(1) opin.value(1)
matrix.append([bool(ipin.value()) for ipin in self.inputs])
for iidx, ipin in enumerate(self.inputs):
if ipin.value():
pressed.append(
(oidx, iidx) if self.diode_orientation == DiodeOrientation.ROWS else (iidx, oidx) # noqa
)
opin.value(0) opin.value(0)
return self._normalize_matrix(matrix) if len(pressed) != self.last_pressed_len:
self.last_pressed_len = len(pressed)
def scan_for_changes(self, old_matrix): return matrix_changed(pressed)
matrix = self.raw_scan()
if any(
any(
col != old_matrix[ridx][cidx]
for cidx, col in enumerate(row)
)
for ridx, row in enumerate(matrix)
):
return matrix_changed(matrix)
return None # The default, but for explicitness return None # The default, but for explicitness

View File

@ -6,6 +6,7 @@ from kmk.common.consts import HID_REPORT_STRUCTURE, HIDReportTypes
from kmk.common.event_defs import HID_REPORT_EVENT from kmk.common.event_defs import HID_REPORT_EVENT
from kmk.common.keycodes import (FIRST_KMK_INTERNAL_KEYCODE, ConsumerKeycode, from kmk.common.keycodes import (FIRST_KMK_INTERNAL_KEYCODE, ConsumerKeycode,
ModifierKeycode) ModifierKeycode)
from kmk.common.macros import KMKMacro
def generate_pyb_hid_descriptor(): def generate_pyb_hid_descriptor():
@ -43,7 +44,7 @@ class HIDHelper:
self.report_non_mods = memoryview(self._evt)[3:] self.report_non_mods = memoryview(self._evt)[3:]
def _subscription(self, state, action): def _subscription(self, state, action):
if action['type'] == HID_REPORT_EVENT: if action.type == HID_REPORT_EVENT:
self.clear_all() self.clear_all()
consumer_key = None consumer_key = None
@ -71,7 +72,7 @@ class HIDHelper:
self.add_key(consumer_key) self.add_key(consumer_key)
else: else:
for key in state.keys_pressed: for key in state.keys_pressed:
if key.code >= FIRST_KMK_INTERNAL_KEYCODE: if isinstance(key, KMKMacro) or key.code >= FIRST_KMK_INTERNAL_KEYCODE:
continue continue
if isinstance(key, ModifierKeycode): if isinstance(key, ModifierKeycode):
@ -98,7 +99,7 @@ class HIDHelper:
# It'd be real awesome if pyb.USB_HID.send/recv would support # It'd be real awesome if pyb.USB_HID.send/recv would support
# uselect.poll or uselect.select to more safely determine when # uselect.poll or uselect.select to more safely determine when
# it is safe to write to the host again... # it is safe to write to the host again...
delay(10) delay(5)
return self return self