prelim module support

This commit is contained in:
Kyle Brown
2020-11-12 14:33:39 -08:00
parent a85ec2cc3f
commit 8839c1c7ec
13 changed files with 235 additions and 159 deletions

40
kmk/modules/__init__.py Normal file
View File

@@ -0,0 +1,40 @@
class InvalidExtensionEnvironment(Exception):
pass
class Module:
'''
Modules differ from extensions in that they not only can read the state, but
are allowed to modify the state. The will be loaded on boot, and are not
allowed to be unloaded as they are required to continue functioning in a
consistant manner.
'''
# The below methods should be implemented by subclasses
def during_bootup(self, keyboard):
raise NotImplementedError
def before_matrix_scan(self, keyboard):
'''
Return value will be injected as an extra matrix update
'''
raise NotImplementedError
def after_matrix_scan(self, keyboard):
'''
Return value will be replace matrix update if supplied
'''
raise NotImplementedError
def before_hid_send(self, keyboard):
raise NotImplementedError
def after_hid_send(self, keyboard):
raise NotImplementedError
def on_powersave_enable(self, keyboard):
raise NotImplementedError
def on_powersave_disable(self, keyboard):
raise NotImplementedError

199
kmk/modules/ble_split.py Normal file
View File

@@ -0,0 +1,199 @@
'''Enables splitting keyboards wirelessly'''
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.services.nordic import UARTService
from kmk.hid import HIDModes
from kmk.kmktime import ticks_diff, ticks_ms
from kmk.matrix import intify_coordinate
from kmk.modules import Module
from storage import getmount
class BLE_Split(Module):
'''Enables splitting keyboards wirelessly'''
def __init__(
self, split_flip=True, split_side=None, uart_interval=30, hid_type=HIDModes.BLE
):
self._is_target = True
self._uart_buffer = []
self.hid_type = hid_type
self.split_flip = split_flip
self.split_side = split_side
self.split_offset = None
self._ble = BLERadio()
self._ble_last_scan = ticks_ms() - 5000
self._is_target = True
self._connection_count = 0
self._uart = None
self._uart_connection = None
self._advertisment = None
self._advertising = False
self._uart_interval = uart_interval
self._psave_enable = False
self._debug_enabled = False
def __repr__(self):
return f'BLE_SPLIT({self._to_dict()})'
def _to_dict(self):
return {
'_ble': self._ble,
'_ble_last_scan': self._ble_last_scan,
'_is_target': self._is_target,
'uart_buffer': self._uart_buffer,
'_split_flip': self.split_flip,
'_split_side': self.split_side,
}
def during_bootup(self, keyboard):
self._debug_enabled = keyboard.debug_enabled
self._ble.name = str(getmount('/').label)
if self.split_side is None:
if self._ble.name.endswith('L'):
# If name ends in 'L' assume left and strip from name
self._is_target = True
elif self._ble.name.endswith('R'):
# If name ends in 'R' assume right and strip from name
self._is_target = False
else:
self._is_target = bool(self.split_side == 0)
if self.split_flip and not self._is_target:
keyboard.col_pins = list(reversed(keyboard.col_pins))
self.split_offset = len(keyboard.col_pins)
# Attempt to sanely guess a coord_mapping if one is not provided.
if not keyboard.coord_mapping:
keyboard.coord_mapping = []
rows_to_calc = len(keyboard.row_pins) * 2
cols_to_calc = len(keyboard.col_pins) * 2
for ridx in range(rows_to_calc):
for cidx in range(cols_to_calc):
keyboard.coord_mapping.append(intify_coordinate(ridx, cidx))
def before_matrix_scan(self, keyboard):
self._check_all_connections()
return self._receive(keyboard)
def after_matrix_scan(self, keyboard):
if keyboard.matrix_update:
keyboard.matrix_update = self._send(keyboard.matrix_update)
return
def before_hid_send(self, keyboard):
return
def after_hid_send(self, keyboard):
return
def on_powersave_enable(self, keyboard):
if self._uart_connection and not self._psave_enable:
self._uart_connection.connection_interval = self._uart_interval
self._psave_enable = True
def on_powersave_disable(self, keyboard):
if self._uart_connection and self._psave_enable:
self._uart_connection.connection_interval = 11.25
self._psave_enable = False
def _check_all_connections(self):
'''Validates the correct number of BLE connections'''
self._connection_count = len(self._ble.connections)
if self._is_target and self._connection_count < 2:
self._target_advertise()
elif not self._is_target and self._connection_count < 1:
self._initiator_scan()
def _initiator_scan(self):
'''Scans for target device'''
self._uart = None
self._uart_connection = None
# See if any existing connections are providing UARTService.
self._connection_count = len(self._ble.connections)
if self._connection_count > 0 and not self._uart:
for connection in self._ble.connections:
if UARTService in connection:
self._uart_connection = connection
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[UARTService]
break
if not self._uart:
if self._debug_enabled:
print('Scanning')
self._ble.stop_scan()
for adv in self._ble.start_scan(ProvideServicesAdvertisement, timeout=20):
if self._debug_enabled:
print('Scanning')
if UARTService in adv.services and adv.rssi > -70:
self._uart_connection = self._ble.connect(adv)
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[UARTService]
self._ble.stop_scan()
if self._debug_enabled:
print('Scan complete')
break
self._ble.stop_scan()
def _target_advertise(self):
'''Advertises the target for the initiator to find'''
self._ble.stop_advertising()
if self._debug_enabled:
print('Advertising')
# Uart must not change on this connection if reconnecting
if not self._uart:
self._uart = UARTService()
advertisement = ProvideServicesAdvertisement(self._uart)
self._ble.start_advertising(advertisement)
self.ble_time_reset()
while not self.ble_rescan_timer():
self._connection_count = len(self._ble.connections)
if self._connection_count > 1:
self.ble_time_reset()
if self._debug_enabled:
print('Advertising complete')
break
self._ble.stop_advertising()
def ble_rescan_timer(self):
'''If true, the rescan timer is up'''
return bool(ticks_diff(ticks_ms(), self._ble_last_scan) > 5000)
def ble_time_reset(self):
'''Resets the rescan timer'''
self._ble_last_scan = ticks_ms()
def _send(self, update):
if self._uart:
try:
if not self._is_target:
update[1] += self.split_offset
self._uart.write(update)
except OSError:
try:
self._uart.disconnect()
except: # noqa: E722
if self._debug_enabled:
print('UART disconnect failed')
if self._debug_enabled:
print('Connection error')
self._uart_connection = None
self._uart = None
return update
def _receive(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer:
while self._uart.in_waiting >= 3:
self._uart_buffer.append(self._uart.read(3))
if self._uart_buffer:
keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0))
return
return None

196
kmk/modules/layers.py Normal file
View File

@@ -0,0 +1,196 @@
'''One layer isn't enough. Adds keys to get to more of them'''
from micropython import const
from kmk.key_validators import layer_key_validator
from kmk.keys import make_argumented_key
from kmk.kmktime import accurate_ticks, accurate_ticks_diff
from kmk.modules import Module
class LayerType:
'''Defines layer type values for readability'''
MO = const(0)
DF = const(1)
LM = const(2)
LT = const(3)
TG = const(4)
TT = const(5)
class Layers(Module):
'''Gives access to the keys used to enable the layer system'''
def __init__(self):
# Layers
self.start_time = {
LayerType.LT: None,
LayerType.TG: None,
LayerType.TT: None,
LayerType.LM: None,
}
make_argumented_key(
validator=layer_key_validator,
names=('MO',),
on_press=self._mo_pressed,
on_release=self._mo_released,
)
make_argumented_key(
validator=layer_key_validator, names=('DF',), on_press=self._df_pressed
)
make_argumented_key(
validator=layer_key_validator,
names=('LM',),
on_press=self._lm_pressed,
on_release=self._lm_released,
)
make_argumented_key(
validator=layer_key_validator,
names=('LT',),
on_press=self._lt_pressed,
on_release=self._lt_released,
)
make_argumented_key(
validator=layer_key_validator, names=('TG',), on_press=self._tg_pressed
)
make_argumented_key(
validator=layer_key_validator, names=('TO',), on_press=self._to_pressed
)
make_argumented_key(
validator=layer_key_validator,
names=('TT',),
on_press=self._tt_pressed,
on_release=self._tt_released,
)
def during_bootup(self, keyboard):
return
def before_matrix_scan(self, keyboard):
return
def after_matrix_scan(self, keyboard):
return
def before_hid_send(self, keyboard):
return
def after_hid_send(self, keyboard):
return
def on_powersave_enable(self, keyboard):
return
def on_powersave_disable(self, keyboard):
return
@staticmethod
def _df_pressed(key, keyboard, *args, **kwargs):
'''
Switches the default layer
'''
keyboard.active_layers[-1] = key.meta.layer
@staticmethod
def _mo_pressed(key, keyboard, *args, **kwargs):
'''
Momentarily activates layer, switches off when you let go
'''
keyboard.active_layers.insert(0, key.meta.layer)
@staticmethod
def _mo_released(key, keyboard, KC, *args, **kwargs):
# remove the first instance of the target layer
# from the active list
# under almost all normal use cases, this will
# disable the layer (but preserve it if it was triggered
# as a default layer, etc.)
# this also resolves an issue where using DF() on a layer
# triggered by MO() and then defaulting to the MO()'s layer
# would result in no layers active
try:
del_idx = keyboard.active_layers.index(key.meta.layer)
del keyboard.active_layers[del_idx]
except ValueError:
pass
def _lm_pressed(self, key, keyboard, *args, **kwargs):
'''
As MO(layer) but with mod active
'''
keyboard.hid_pending = True
# Sets the timer start and acts like MO otherwise
keyboard.keys_pressed.add(key.meta.kc)
self._mo_pressed(key, keyboard, *args, **kwargs)
def _lm_released(self, key, keyboard, *args, **kwargs):
'''
As MO(layer) but with mod active
'''
keyboard.hid_pending = True
keyboard.keys_pressed.discard(key.meta.kc)
self._mo_released(key, keyboard, *args, **kwargs)
def _lt_pressed(self, key, keyboard, *args, **kwargs):
# Sets the timer start and acts like MO otherwise
self.start_time[LayerType.LT] = accurate_ticks()
self._mo_pressed(key, keyboard, *args, **kwargs)
def _lt_released(self, key, keyboard, *args, **kwargs):
# On keyup, check timer, and press key if needed.
if self.start_time[LayerType.LT] and (
accurate_ticks_diff(
accurate_ticks(), self.start_time[LayerType.LT], keyboard.tap_time
)
):
keyboard.hid_pending = True
keyboard.tap_key(key.meta.kc)
self._mo_released(key, keyboard, *args, **kwargs)
self.start_time[LayerType.LT] = None
@staticmethod
def _tg_pressed(key, keyboard, *args, **kwargs):
'''
Toggles the layer (enables it if not active, and vise versa)
'''
# See mo_released for implementation details around this
try:
del_idx = keyboard.active_layers.index(key.meta.layer)
del keyboard.active_layers[del_idx]
except ValueError:
keyboard.active_layers.insert(0, key.meta.layer)
@staticmethod
def _to_pressed(key, keyboard, *args, **kwargs):
'''
Activates layer and deactivates all other layers
'''
keyboard.active_layers.clear()
keyboard.active_layers.insert(0, key.meta.layer)
def _tt_pressed(self, key, keyboard, *args, **kwargs):
'''
Momentarily activates layer if held, toggles it if tapped repeatedly
'''
if self.start_time[LayerType.TT] is None:
# Sets the timer start and acts like MO otherwise
self.start_time[LayerType.TT] = accurate_ticks()
self._mo_pressed(key, keyboard, *args, **kwargs)
return
elif accurate_ticks_diff(
accurate_ticks(), self.start_time[LayerType.TT], keyboard.tap_time
):
self.start_time[LayerType.TT] = None
self._tg_pressed(key, keyboard, *args, **kwargs)
return
return None
def _tt_released(self, key, keyboard, *args, **kwargs):
if self.start_time[LayerType.TT] is None or not accurate_ticks_diff(
accurate_ticks(), self.start_time[LayerType.TT], keyboard.tap_time
):
# On first press, works like MO. On second press, does nothing unless let up within
# time window, then acts like TG.
self.start_time[LayerType.TT] = None
self._mo_released(key, keyboard, *args, **kwargs)

57
kmk/modules/modtap.py Normal file
View File

@@ -0,0 +1,57 @@
from kmk.key_validators import mod_tap_validator
from kmk.keys import make_argumented_key
from kmk.kmktime import accurate_ticks, accurate_ticks_diff
from kmk.modules import Module
class ModTap(Module):
def __init__(self):
self._mod_tap_timer = None
make_argumented_key(
validator=mod_tap_validator,
names=('MT',),
on_press=self.mt_pressed,
on_release=self.mt_released,
)
def during_bootup(self, keyboard):
return
def before_matrix_scan(self, keyboard):
return
def after_matrix_scan(self, keyboard):
return
def before_hid_send(self, keyboard):
return
def after_hid_send(self, keyboard):
return
def on_powersave_enable(self, keyboard):
return
def on_powersave_disable(self, keyboard):
return
def mt_pressed(self, key, keyboard, *args, **kwargs):
'''Sets the timer start and acts like a modifier otherwise'''
keyboard.keys_pressed.add(key.meta.mods)
self._mod_tap_timer = accurate_ticks()
return keyboard
def mt_released(self, key, keyboard, *args, **kwargs):
''' On keyup, check timer, and press key if needed.'''
keyboard.keys_pressed.discard(key.meta.mods)
if self._mod_tap_timer and (
accurate_ticks_diff(
accurate_ticks(), self._mod_tap_timer, keyboard.tap_time
)
):
keyboard.hid_pending = True
keyboard.tap_key(key.meta.kc)
self._mod_tap_timer = None
return keyboard

145
kmk/modules/power.py Normal file
View File

@@ -0,0 +1,145 @@
import board
import digitalio
from kmk.handlers.stock import passthrough as handler_passthrough
from kmk.keys import make_key
from kmk.kmktime import sleep_ms, ticks_diff, ticks_ms
from kmk.modules import Module
class Power(Module):
def __init__(self, powersave_pin=None):
self.enable = False
self.powersave_pin = powersave_pin # Powersave pin board object
self._powersave_start = ticks_ms()
self._usb_last_scan = ticks_ms() - 5000
self._psp = None # Powersave pin object
self._i2c = None
self._loopcounter = 0
make_key(
names=('PS_TOG',), on_press=self._ps_tog, on_release=handler_passthrough
)
make_key(
names=('PS_ON',), on_press=self._ps_enable, on_release=handler_passthrough
)
make_key(
names=('PS_OFF',), on_press=self._ps_disable, on_release=handler_passthrough
)
def __repr__(self):
return f'Power({self._to_dict()})'
def _to_dict(self):
return {
'enable': self.enable,
'powersave_pin': self.powersave_pin,
'_powersave_start': self._powersave_start,
'_usb_last_scan': self._usb_last_scan,
'_psp': self._psp,
}
def during_bootup(self, keyboard):
self._i2c_scan()
def before_matrix_scan(self, keyboard):
return
def after_matrix_scan(self, keyboard, matrix_update):
if matrix_update or keyboard.secondary_matrix_update:
self.psave_time_reset()
def before_hid_send(self, keyboard):
return
def after_hid_send(self, keyboard):
if self.enable:
self.psleep()
def on_powersave_enable(self, keyboard):
'''Gives 10 cycles to allow other extentions to clean up before powersave'''
if keyboard._trigger_powersave_enable:
if self._loopcounter > 10:
self._loopcounter += 1
return
self._loopcounter = 0
keyboard._trigger_powersave_enable = False
self.enable_powersave(keyboard)
return
def on_powersave_disable(self, keyboard):
keyboard._trigger_powersave_disable = False
self.disable_powersave()
return
def enable_powersave(self, keyboard):
'''Enables power saving features'''
if keyboard.i2c_deinit_count >= self._i2c and self.powersave_pin:
# Allows power save to prevent RGB drain.
# Example here https://docs.nicekeyboards.com/#/nice!nano/pinout_schematic
if not self._psp:
self._psp = digitalio.DigitalInOut(self.powersave_pin)
self._psp.direction = digitalio.Direction.OUTPUT
self._psp.value = True
self.enable = True
def disable_powersave(self):
'''Disables power saving features'''
if self.powersave_pin:
# Allows power save to prevent RGB drain.
# Example here https://docs.nicekeyboards.com/#/nice!nano/pinout_schematic
if not self._psp:
self._psp = digitalio.DigitalInOut(self.powersave_pin)
self._psp.direction = digitalio.Direction.OUTPUT
self._psp.value = False
self.enable = False
def psleep(self):
'''
Sleeps longer and longer to save power the more time in between updates.
'''
if ticks_diff(ticks_ms(), self._powersave_start) <= 60000:
sleep_ms(8)
elif ticks_diff(ticks_ms(), self._powersave_start) >= 240000:
sleep_ms(180)
def psave_time_reset(self):
self._powersave_start = ticks_ms()
def _i2c_scan(self):
i2c = board.I2C()
while not i2c.try_lock():
pass
try:
self._i2c = len(i2c.scan())
finally:
i2c.unlock()
def usb_rescan_timer(self):
return bool(ticks_diff(ticks_ms(), self._usb_last_scan) > 5000)
def usb_time_reset(self):
self._usb_last_scan = ticks_ms()
def usb_scan(self):
# TODO Add USB detection here. Currently lies that it's connected
# https://github.com/adafruit/circuitpython/pull/3513
return True
def _ps_tog(self, key, keyboard, *args, **kwargs):
if self.enable:
keyboard._trigger_powersave_disable = True
else:
keyboard._trigger_powersave_enable = True
def _ps_enable(self, key, keyboard, *args, **kwargs):
if not self.enable:
keyboard._trigger_powersave_enable = True
def _ps_disable(self, key, keyboard, *args, **kwargs):
if self.enable:
keyboard._trigger_powersave_disable = True

133
kmk/modules/split.py Normal file
View File

@@ -0,0 +1,133 @@
import busio
from kmk.matrix import intify_coordinate
from kmk.modules import Module
from storage import getmount
class SplitType:
UART = 1
I2C = 2 # unused
ONEWIRE = 3 # unused
class Split(Module):
def __init__(
self,
is_target=True,
extra_data_pin=None,
split_offset=None,
split_flip=True,
split_side=None,
split_type=SplitType.UART,
target_left=True,
uart_flip=True,
uart_pin=None,
uart_pin2=None,
uart_timeout=20,
):
self._is_target = is_target
self.extra_data_pin = extra_data_pin
self.split_offsets = split_offset
self.split_flip = split_flip
self.split_side = split_side
self.split_type = split_type
self.split_target_left = target_left
self._uart = None
self._uart_buffer = []
self.uart_flip = uart_flip
self.uart_pin = uart_pin
self.uart_pin2 = uart_pin2
self.uart_timeout = uart_timeout
def during_bootup(self, keyboard):
try:
# Working around https://github.com/adafruit/circuitpython/issues/1769
keyboard._hid_helper_inst.create_report([]).send()
# Line above is broken and needs fixed for aut detection
self._is_target = True
except OSError:
self._is_target = False
if self.split_side is None:
l_or_r = str(getmount('/').label)
if l_or_r.endswith('L'):
# If name ends in 'L' assume left and strip from name
self.split_side = 0
elif l_or_r.endswith('R'):
# If name ends in 'R' assume right and strip from name
self.split_side = 1
if self.split_flip and not self._is_target:
keyboard.col_pins = list(reversed(keyboard.col_pins))
if self.split_side == 0:
self.split_target_left = self._is_target
elif self.split_side == 1:
self.split_target_left = not self._is_target
if self.uart_pin is not None:
if self._is_target:
self._uart = busio.UART(
tx=self.uart_pin2, rx=self.uart_pin, timeout=self.uart_timeout
)
else:
self._uart = busio.UART(
tx=self.uart_pin, rx=self.uart_pin2, timeout=self.uart_timeout
)
# Attempt to sanely guess a coord_mapping if one is not provided.
if not keyboard.coord_mapping:
keyboard.coord_mapping = []
self.split_offset = len(keyboard.col_pins)
rows_to_calc = len(keyboard.row_pins) * 2
cols_to_calc = len(keyboard.col_pins) * 2
for ridx in range(rows_to_calc):
for cidx in range(cols_to_calc):
keyboard.coord_mapping.append(intify_coordinate(ridx, cidx))
def before_matrix_scan(self, keyboard):
if self._is_target or self.uart_pin2:
return self._receive(keyboard)
return None
def after_matrix_scan(self, keyboard):
if keyboard.matrix_update is not None and not self._is_target:
self._send(keyboard.matrix_update)
def before_hid_send(self, keyboard):
return
def after_hid_send(self, keyboard):
return
def on_powersave_enable(self, keyboard):
return
def on_powersave_disable(self, keyboard):
return
def _send(self, update):
if self.split_target_left:
update[1] += self.split_offset
else:
update[1] -= self.split_offsets
if self._uart is not None:
self._uart.write(update)
def _receive(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer:
if self._uart.in_waiting >= 60:
# This is a dirty hack to prevent crashes in unrealistic cases
import microcontroller
microcontroller.reset()
while self._uart.in_waiting >= 3:
self._uart_buffer.append(self._uart.read(3))
if self._uart_buffer:
keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0))
return
return None