fix pystack exhaust during resume_process_key.

Instead of handling resumed key events in a deep stack, buffer them
until the next main loop iteration. New resume events that may be emitted
during handling of old resumes are prepended to that buffer, i.e. take
precedence over events that happen deeper into the buffer/event stack.
Logical replay order is thus preserved.
This commit is contained in:
xs5871 2022-10-08 19:56:22 +00:00 committed by Kyle Brown
parent 0fbba96026
commit 17f2961c0b
9 changed files with 94 additions and 61 deletions

View File

@ -5,6 +5,7 @@ except ImportError:
from supervisor import ticks_ms
from collections import namedtuple
from keypad import Event as KeyEvent
from kmk.consts import UnicodeMode
@ -17,6 +18,10 @@ from kmk.utils import Debug
debug = Debug(__name__)
KeyBufferFrame = namedtuple(
'KeyBufferFrame', ('key', 'is_pressed', 'int_coord', 'index')
)
class Sandbox:
matrix_update = None
@ -59,6 +64,8 @@ class KMKKeyboard:
i2c_deinit_count = 0
_go_args = None
_processing_timeouts = False
_resume_buffer = []
_resume_buffer_x = []
# this should almost always be PREpended to, replaces
# former use of reversed_active_layers which had pointless
@ -158,6 +165,47 @@ class KMKKeyboard:
self.pre_process_key(key, is_pressed, int_coord)
def _process_resume_buffer(self):
'''
Resume the processing of buffered, delayed, deferred, etc. key events
emitted by modules.
We use a copy of the `_resume_buffer` as a working buffer. The working
buffer holds all key events in the correct order for processing. If
during processing new events are pushed to the `_resume_buffer`, they
are prepended to the working buffer (which may not be emptied), in
order to preserve key event order.
We also double-buffer `_resume_buffer` with `_resume_buffer_x`, only
copying the reference to hopefully safe some time on allocations.
'''
buffer, self._resume_buffer = self._resume_buffer, self._resume_buffer_x
while buffer:
ksf = buffer.pop(0)
key = ksf.key
# Handle any unaccounted-for layer shifts by looking up the key resolution again.
if ksf.int_coord in self._coordkeys_pressed.keys():
key = self._find_key_in_map(ksf.int_coord)
# Resume the processing of the key event and update the HID report
# when applicable.
self.pre_process_key(key, ksf.is_pressed, ksf.int_coord, ksf.index)
if self.hid_pending:
self._send_hid()
self.hid_pending = False
# Any newly buffered key events must be prepended to the working
# buffer.
if self._resume_buffer:
self._resume_buffer.extend(buffer)
buffer.clear()
buffer, self._resume_buffer = self._resume_buffer, buffer
self._resume_buffer_x = buffer
@property
def debug_enabled(self) -> bool:
return debug.enabled
@ -211,7 +259,10 @@ class KMKKeyboard:
int_coord: Optional[int] = None,
) -> None:
index = self.modules.index(module) + 1
self.pre_process_key(key, is_pressed, int_coord, index)
ksf = KeyBufferFrame(
key=key, is_pressed=is_pressed, int_coord=int_coord, index=index
)
self._resume_buffer.append(ksf)
def remove_key(self, keycode: Key) -> None:
self.keys_pressed.discard(keycode)
@ -476,6 +527,8 @@ class KMKKeyboard:
self.before_matrix_scan()
self._process_resume_buffer()
for matrix in self.matrix:
update = matrix.scan_for_changes()
if update:

View File

@ -191,10 +191,11 @@ class Combos(Module):
)
else:
# There's no matching combo: send and reset key buffer
self.send_key_buffer(keyboard)
self._key_buffer = []
if int_coord is not None:
key = keyboard._find_key_in_map(int_coord)
if self._key_buffer:
self._key_buffer.append((int_coord, key, True))
self.send_key_buffer(keyboard)
self._key_buffer = []
key = None
return key
@ -251,8 +252,10 @@ class Combos(Module):
elif len(combo._remaining) == len(combo.match) - 1:
self.reset_combo(keyboard, combo)
if not self.count_matching():
self._key_buffer.append((int_coord, key, False))
self.send_key_buffer(keyboard)
self._key_buffer = []
key = None
# Anything between first and last key released.
else:
@ -295,17 +298,7 @@ class Combos(Module):
def send_key_buffer(self, keyboard):
for (int_coord, key, is_pressed) in self._key_buffer:
new_key = None
if not is_pressed:
try:
new_key = keyboard._coordkeys_pressed[int_coord]
except KeyError:
new_key = None
if new_key is None:
new_key = keyboard._find_key_in_map(int_coord)
keyboard.resume_process_key(self, new_key, is_pressed, int_coord)
keyboard._send_hid()
keyboard.resume_process_key(self, key, is_pressed, int_coord)
def activate(self, keyboard, combo):
combo.result.on_press(keyboard)

View File

@ -86,12 +86,8 @@ class HoldTap(Module):
self.ht_activate_on_interrupt(
key, keyboard, *state.args, **state.kwargs
)
keyboard._send_hid()
send_buffer = True
if state.activated == ActivationType.INTERRUPTED:
current_key = keyboard._find_key_in_map(int_coord)
# if interrupt on release: store interrupting keys until one of them
# is released.
if (
@ -104,10 +100,13 @@ class HoldTap(Module):
# apply changes with 'side-effects' on key_states or the loop behaviour
# outside the loop.
if append_buffer:
self.key_buffer.append((int_coord, current_key))
self.key_buffer.append((int_coord, current_key, is_pressed))
current_key = None
elif send_buffer:
self.send_key_buffer(keyboard)
keyboard.resume_process_key(self, current_key, is_pressed, int_coord)
current_key = None
return current_key
@ -166,7 +165,7 @@ class HoldTap(Module):
elif state.activated == ActivationType.PRESSED:
# press and release tap because key released within tap time
self.ht_activate_tap(key, keyboard, *args, **kwargs)
keyboard._send_hid()
self.send_key_buffer(keyboard)
self.ht_deactivate_tap(key, keyboard, *args, **kwargs)
state.activated = ActivationType.RELEASED
self.send_key_buffer(keyboard)
@ -210,11 +209,9 @@ class HoldTap(Module):
if not self.key_buffer:
return
for (int_coord, key) in self.key_buffer:
new_key = keyboard._find_key_in_map(int_coord)
keyboard.resume_process_key(self, new_key, True, int_coord)
for (int_coord, key, is_pressed) in self.key_buffer:
keyboard.resume_process_key(self, key, is_pressed, int_coord)
keyboard._send_hid()
self.key_buffer.clear()
def ht_activate_hold(self, key, keyboard, *args, **kwargs):
@ -225,24 +222,17 @@ class HoldTap(Module):
def ht_deactivate_hold(self, key, keyboard, *args, **kwargs):
if debug.enabled:
debug('ht_deactivate_hold')
keyboard.set_timeout(
False, lambda: keyboard.resume_process_key(self, key.meta.hold, False)
)
keyboard.resume_process_key(self, key.meta.hold, False)
def ht_activate_tap(self, key, keyboard, *args, **kwargs):
if debug.enabled:
debug('ht_activate_tap')
keyboard.resume_process_key(self, key.meta.tap, True)
def ht_deactivate_tap(self, key, keyboard, *args, delayed=True, **kwargs):
def ht_deactivate_tap(self, key, keyboard, *args, **kwargs):
if debug.enabled:
debug('ht_deactivate_tap')
if delayed:
keyboard.set_timeout(
False, lambda: keyboard.resume_process_key(self, key.meta.tap, False)
)
else:
keyboard.resume_process_key(self, key.meta.tap, False)
keyboard.resume_process_key(self, key.meta.tap, False)
def ht_activate_on_interrupt(self, key, keyboard, *args, **kwargs):
if debug.enabled:
@ -258,4 +248,4 @@ class HoldTap(Module):
if key.meta.prefer_hold:
self.ht_deactivate_hold(key, keyboard, *args, **kwargs)
else:
self.ht_deactivate_tap(key, keyboard, *args, delayed=False, **kwargs)
self.ht_deactivate_tap(key, keyboard, *args, **kwargs)

View File

@ -1,6 +1,6 @@
'''One layer isn't enough. Adds keys to get to more of them'''
from kmk.keys import KC, make_argumented_key
from kmk.modules.holdtap import ActivationType, HoldTap, HoldTapKeyMeta
from kmk.modules.holdtap import HoldTap, HoldTapKeyMeta
from kmk.utils import Debug
debug = Debug(__name__)
@ -73,20 +73,6 @@ class Layers(HoldTap):
on_release=self.ht_released,
)
def process_key(self, keyboard, key, is_pressed, int_coord):
current_key = super().process_key(keyboard, key, is_pressed, int_coord)
for key, state in self.key_states.items():
if key == current_key:
continue
# on interrupt: key must be translated here, because it was asigned
# before the layer shift happend.
if state.activated == ActivationType.INTERRUPTED:
current_key = keyboard._find_key_in_map(int_coord)
return current_key
def _df_pressed(self, key, keyboard, *args, **kwargs):
'''
Switches the default layer

View File

@ -29,7 +29,13 @@ class OneShot(HoldTap):
elif state.activated == ActivationType.RELEASED and is_pressed:
state.activated = ActivationType.INTERRUPTED
elif state.activated == ActivationType.INTERRUPTED:
self.ht_released(key, keyboard)
if is_pressed:
keyboard.remove_key(key.meta.tap)
self.key_buffer.append((int_coord, current_key, is_pressed))
keyboard.set_timeout(False, lambda: self.send_key_buffer(keyboard))
current_key = None
else:
self.ht_released(key, keyboard)
return current_key
@ -37,6 +43,7 @@ class OneShot(HoldTap):
'''Register HoldTap mechanism and activate os key.'''
self.ht_pressed(key, keyboard, *args, **kwargs)
self.ht_activate_tap(key, keyboard, *args, **kwargs)
self.send_key_buffer(keyboard)
return keyboard
def osk_released(self, key, keyboard, *args, **kwargs):

View File

@ -48,8 +48,11 @@ class TapDance(HoldTap):
if state.activated == ActivationType.RELEASED:
keyboard.cancel_timeout(state.timeout_key)
self.ht_activate_tap(_key, keyboard)
keyboard._send_hid()
self.ht_deactivate_tap(_key, keyboard, delayed=False)
self.send_key_buffer(keyboard)
self.ht_deactivate_tap(_key, keyboard)
keyboard.resume_process_key(self, key, is_pressed, int_coord)
key = None
del self.key_states[_key]
del self.td_counts[state.tap_dance]
@ -114,6 +117,6 @@ class TapDance(HoldTap):
state = self.key_states[key]
if state.activated == ActivationType.RELEASED:
self.ht_activate_tap(key, keyboard, *args, **kwargs)
keyboard._send_hid()
self.send_key_buffer(keyboard)
del self.td_counts[state.tap_dance]
super().on_tap_time_expired(key, keyboard, *args, **kwargs)

View File

@ -74,6 +74,7 @@ class KeyboardTest:
is_pressed = e[1]
self.pins[key_pos].value = is_pressed
self.do_main_loop()
self.keyboard._main_loop()
matching = True
for i in range(max(len(hid_reports), len(assert_reports))):

View File

@ -300,25 +300,25 @@ class TestHoldTap(unittest.TestCase):
keyboard.test(
'OS interrupt within tap time',
[(4, True), (4, False), t_within, (3, True), (3, False)],
[{KC.E}, {KC.D, KC.E}, {}],
[{KC.E}, {KC.D, KC.E}, {KC.E}, {}],
)
keyboard.test(
'OS interrupt, multiple within tap time',
[(4, True), (4, False), (3, True), (3, False), (2, True), (2, False)],
[{KC.E}, {KC.D, KC.E}, {}, {KC.C}, {}],
[{KC.E}, {KC.D, KC.E}, {KC.E}, {}, {KC.C}, {}],
)
keyboard.test(
'OS interrupt, multiple interleaved',
[(4, True), (4, False), (3, True), (2, True), (2, False), (3, False)],
[{KC.E}, {KC.D, KC.E}, {KC.C, KC.D}, {KC.D}, {}],
[{KC.E}, {KC.D, KC.E}, {KC.D}, {KC.C, KC.D}, {KC.D}, {}],
)
keyboard.test(
'OS interrupt, multiple interleaved',
[(4, True), (4, False), (3, True), (2, True), (3, False), (2, False)],
[{KC.E}, {KC.D, KC.E}, {KC.C, KC.D}, {KC.C}, {}],
[{KC.E}, {KC.D, KC.E}, {KC.D}, {KC.C, KC.D}, {KC.C}, {}],
)
keyboard.test(

View File

@ -71,7 +71,7 @@ class TestTapDance(unittest.TestCase):
keyboard.test(
'Tap x1 interrupted',
[(0, True), (0, False), (4, True), (4, False)],
[{KC.N0}, {KC.N4}, {}],
[{KC.N0}, {}, {KC.N4}, {}],
)
keyboard.test(