ffa81bcf43
This does a bunch of crazy stuff: - The ability to set a unicode mode (right now only Linux+ibus or MacOS-RALT) in the keymap. This will be changeable at runtime soon, to allow a single keyboard to be able to send table flips and whatever other crazy stuff on any OS the board is plugged into (something that's not currently doable on QMK, so yay us?) - As part of the above, there is now just one user-facing macro for unicode codepoint submission, `kmk.common.macros.unicode.unicode_sequence`. Users should never use the platform-specific macros, partly because they just outright won't work. There's all sorts of fun stuff in these methods now, thank goodness MicroPython supports the `yield from` construct. - Keycode (these should really be renamed Keysym or something) objects that are intended to not be pressed, or not be released. Right now these properties are completely ignored if not part of a macro, and it's probably sane to keep it that way. This was necessary to support MacOS's "hold RALT while typing the codepoint characters" flow. - Other refactor-y bits, like moving macro support to `kmk/common` rather than sitting at the top level of the tree. One day `kmk/common` may make sense to surface at top level `kmk/`, but that's a discussion for another day.
241 lines
7.2 KiB
Python
241 lines
7.2 KiB
Python
import logging
|
|
import sys
|
|
|
|
from kmk.common.consts import DiodeOrientation, UnicodeModes
|
|
from kmk.common.event_defs import (HID_REPORT_EVENT, INIT_FIRMWARE_EVENT,
|
|
KEY_DOWN_EVENT, KEY_UP_EVENT,
|
|
KEYCODE_DOWN_EVENT, KEYCODE_UP_EVENT,
|
|
MACRO_COMPLETE_EVENT, NEW_MATRIX_EVENT)
|
|
from kmk.common.internal_keycodes import process_internal_key_event
|
|
from kmk.common.keycodes import FIRST_KMK_INTERNAL_KEYCODE, Keycodes
|
|
from kmk.common.macros import KMKMacro
|
|
|
|
|
|
class ReduxStore:
|
|
def __init__(self, reducer, log_level=logging.NOTSET):
|
|
self.reducer = reducer
|
|
self.logger = logging.getLogger(__name__)
|
|
self.logger.setLevel(log_level)
|
|
self.state = self.reducer(logger=self.logger)
|
|
self.callbacks = []
|
|
|
|
def dispatch(self, action):
|
|
if callable(action):
|
|
self.logger.debug('Received thunk')
|
|
action(self.dispatch, self.get_state)
|
|
self.logger.debug('Finished thunk')
|
|
return None
|
|
|
|
self.logger.debug('Dispatching action: Type {} >> {}'.format(action['type'], action))
|
|
self.state = self.reducer(self.state, action, logger=self.logger)
|
|
self.logger.debug('Dispatching complete: Type {}'.format(action['type']))
|
|
|
|
self.logger.debug('New state: {}'.format(self.state))
|
|
|
|
for cb in self.callbacks:
|
|
if cb is not None:
|
|
try:
|
|
cb(self.state, action)
|
|
except Exception as e:
|
|
self.logger.error('Callback failed, moving on')
|
|
print(sys.print_exception(e), file=sys.stderr)
|
|
|
|
def get_state(self):
|
|
return self.state
|
|
|
|
def subscribe(self, callback):
|
|
self.callbacks.append(callback)
|
|
return len(self.callbacks) - 1
|
|
|
|
def unsubscribe(self, idx):
|
|
self.callbacks[idx] = None
|
|
|
|
|
|
class InternalState:
|
|
modifiers_pressed = frozenset()
|
|
keys_pressed = frozenset()
|
|
macro_pending = None
|
|
unicode_mode = UnicodeModes.NOOP
|
|
keymap = []
|
|
row_pins = []
|
|
col_pins = []
|
|
matrix = []
|
|
diode_orientation = DiodeOrientation.COLUMNS
|
|
active_layers = [0]
|
|
_oldstates = []
|
|
|
|
def __init__(self, preserve_intermediate_states=False):
|
|
self.preserve_intermediate_states = preserve_intermediate_states
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
pass
|
|
|
|
def to_dict(self, verbose=False):
|
|
ret = {
|
|
'keys_pressed': self.keys_pressed,
|
|
'modifiers_pressed': self.modifiers_pressed,
|
|
'active_layers': self.active_layers,
|
|
'unicode_mode': self.unicode_mode,
|
|
}
|
|
|
|
if verbose:
|
|
ret.update({
|
|
'keymap': self.keymap,
|
|
'matrix': self.matrix,
|
|
'col_pins': self.col_pins,
|
|
'row_pins': self.row_pins,
|
|
'diode_orientation': self.diode_orientation,
|
|
})
|
|
|
|
return ret
|
|
|
|
def __repr__(self):
|
|
return 'InternalState({})'.format(self.to_dict())
|
|
|
|
def update(self, **kwargs):
|
|
if self.preserve_intermediate_states:
|
|
self._oldstates.append(repr(self.to_dict(verbose=True)))
|
|
|
|
for k, v in kwargs.items():
|
|
setattr(self, k, v)
|
|
|
|
return self
|
|
|
|
|
|
def find_key_in_map(state, row, col):
|
|
# Later-added layers have priority. Sift through the layers
|
|
# in reverse order until we find a valid keycode object
|
|
for layer in reversed(state.active_layers):
|
|
layer_key = state.keymap[layer][row][col]
|
|
|
|
if not layer_key or layer_key == Keycodes.KMK.KC_TRNS:
|
|
continue
|
|
|
|
if layer_key == Keycodes.KMK.KC_NO:
|
|
break
|
|
|
|
return layer_key
|
|
|
|
|
|
def kmk_reducer(state=None, action=None, logger=None):
|
|
if state is None:
|
|
state = InternalState()
|
|
|
|
if logger is not None:
|
|
logger.debug('Reducer received state of None, creating new')
|
|
|
|
if action is None:
|
|
if logger is not None:
|
|
logger.debug('No action received, returning state unmodified')
|
|
|
|
return state
|
|
|
|
if action['type'] == NEW_MATRIX_EVENT:
|
|
return state.update(
|
|
matrix=action['matrix'],
|
|
)
|
|
|
|
if action['type'] == KEYCODE_UP_EVENT:
|
|
return state.update(
|
|
keys_pressed=frozenset(
|
|
key for key in state.keys_pressed if key != action['keycode']
|
|
),
|
|
)
|
|
|
|
if action['type'] == KEYCODE_DOWN_EVENT:
|
|
return state.update(
|
|
keys_pressed=(
|
|
state.keys_pressed | {action['keycode']}
|
|
),
|
|
)
|
|
|
|
if action['type'] == KEY_UP_EVENT:
|
|
row = action['row']
|
|
col = action['col']
|
|
|
|
changed_key = find_key_in_map(state, row, col)
|
|
|
|
logger.debug('Detected change to key: {}'.format(changed_key))
|
|
|
|
if not changed_key:
|
|
return state
|
|
|
|
if isinstance(changed_key, KMKMacro):
|
|
if changed_key.keyup:
|
|
return state.update(
|
|
macro_pending=changed_key.keyup,
|
|
)
|
|
|
|
return state
|
|
|
|
newstate = state.update(
|
|
keys_pressed=frozenset(
|
|
key for key in state.keys_pressed if key != changed_key
|
|
),
|
|
)
|
|
|
|
if changed_key.code >= FIRST_KMK_INTERNAL_KEYCODE:
|
|
return process_internal_key_event(newstate, action, changed_key, logger=logger)
|
|
|
|
return newstate
|
|
|
|
if action['type'] == KEY_DOWN_EVENT:
|
|
row = action['row']
|
|
col = action['col']
|
|
|
|
changed_key = find_key_in_map(state, row, col)
|
|
|
|
logger.debug('Detected change to key: {}'.format(changed_key))
|
|
|
|
if not changed_key:
|
|
return state
|
|
|
|
if isinstance(changed_key, KMKMacro):
|
|
if changed_key.keydown:
|
|
return state.update(
|
|
macro_pending=changed_key.keydown,
|
|
)
|
|
|
|
return state
|
|
|
|
newstate = state.update(
|
|
keys_pressed=(
|
|
state.keys_pressed | {changed_key}
|
|
),
|
|
)
|
|
|
|
if changed_key.code >= FIRST_KMK_INTERNAL_KEYCODE:
|
|
return process_internal_key_event(newstate, action, changed_key, logger=logger)
|
|
|
|
return newstate
|
|
|
|
if action['type'] == INIT_FIRMWARE_EVENT:
|
|
return state.update(
|
|
keymap=action['keymap'],
|
|
row_pins=action['row_pins'],
|
|
col_pins=action['col_pins'],
|
|
diode_orientation=action['diode_orientation'],
|
|
unicode_mode=action['unicode_mode'],
|
|
matrix=[
|
|
[False for c in action['col_pins']]
|
|
for r in action['row_pins']
|
|
],
|
|
)
|
|
|
|
# HID events are non-mutating, used exclusively for listeners to know
|
|
# they should be doing things. This could/should arguably be folded back
|
|
# into KEY_UP_EVENT and KEY_DOWN_EVENT, but for now it's nice to separate
|
|
# this out for debugging's sake.
|
|
if action['type'] == HID_REPORT_EVENT:
|
|
return state
|
|
|
|
if action['type'] == MACRO_COMPLETE_EVENT:
|
|
return state.update(macro_pending=None)
|
|
|
|
# On unhandled events, log and do not mutate state
|
|
logger.warning('Unhandled event! Returning state unmodified.')
|
|
return state
|