working modules and sandbox

This commit is contained in:
Kyle Brown
2020-11-13 12:59:22 -08:00
parent f05c9a3732
commit 33e969230f
8 changed files with 290 additions and 617 deletions

View File

@@ -16,7 +16,7 @@ class KMKKeyboard(_KMKKeyboard):
) )
row_pins = (board.P0_22, board.P0_24, board.P1_00, board.P0_11) row_pins = (board.P0_22, board.P0_24, board.P1_00, board.P0_11)
diode_orientation = DiodeOrientation.COLUMNS diode_orientation = DiodeOrientation.COLUMNS
uart_pin = board.P0_08 data_pin = board.P0_08
rgb_pixel_pin = board.P0_06 rgb_pixel_pin = board.P0_06
i2c = board.I2C i2c = board.I2C
powersave_pin = board.P0_13 powersave_pin = board.P0_13

View File

@@ -1,11 +1,21 @@
from kb import KMKKeyboard, rgb_pixel_pin from kb import KMKKeyboard, rgb_pixel_pin
from kmk.extensions.rgb import RGB from kmk.extensions.rgb import RGB
from kmk.keys import KC from kmk.keys import KC
from kmk.modules.ble_split import BLE_Split
from kmk.modules.layers import Layers from kmk.modules.layers import Layers
from kmk.modules.split import Split, SplitType
keyboard = KMKKeyboard() keyboard = KMKKeyboard()
# Adding extentions
rgb = RGB(pixel_pin=rgb_pixel_pin, num_pixels=27, val_limit=100, hue_default=190, sat_default=100, val_default=5)
split = Split(split_type=SplitType.BLE)
layers_ext = Layers()
keyboard.modules = [layers_ext, split]
keyboard.extensions = [rgb]
#
# Cleaner key names # Cleaner key names
_______ = KC.TRNS _______ = KC.TRNS
XXXXXXX = KC.NO XXXXXXX = KC.NO
@@ -22,19 +32,6 @@ RGB_SAD = KC.RGB_SAD
RGB_VAI = KC.RGB_VAI RGB_VAI = KC.RGB_VAI
RGB_VAD = KC.RGB_VAD RGB_VAD = KC.RGB_VAD
# Adding extentions
rgb = RGB(pixel_pin=rgb_pixel_pin, num_pixels=27, val_limit=100, hue_default=190, sat_default=100, val_default=5)
# TODO Comment one of these on each side
# Left is 0, Right is 1
split_side = 0
split_side = 1
split = BLE_Split(split_side=split_side)
layers_ext = Layers()
keyboard.modules = [layers_ext, split]
keyboard.extensions = [rgb]
keyboard.keymap = [ keyboard.keymap = [
[ #QWERTY [ #QWERTY

View File

@@ -8,6 +8,12 @@ from kmk.matrix import MatrixScanner, intify_coordinate
from kmk.types import TapDanceKeyMeta from kmk.types import TapDanceKeyMeta
class Sandbox:
matrix_update = None
secondary_matrix_update = None
active_layers = None
class KMKKeyboard: class KMKKeyboard:
##### #####
# User-configurable # User-configurable
@@ -28,11 +34,7 @@ class KMKKeyboard:
modules = [] modules = []
extensions = [] extensions = []
sandbox = { sandbox = Sandbox()
'matrix_update': None,
'secondary_matrix_update': None,
'active_layers': [0],
}
##### #####
# Internal State # Internal State
@@ -304,10 +306,7 @@ class KMKKeyboard:
To save RAM on boards that don't use Split, we don't import Split To save RAM on boards that don't use Split, we don't import Split
and do an isinstance check, but instead do string detection and do an isinstance check, but instead do string detection
''' '''
if any( if any(x.__class__.__module__ == 'kmk.modules.split' for x in self.modules):
x.__class__.__module__ in {'kmk.modules.split', 'kmk.modules.ble_split'}
for x in self.modules
):
return return
if not self.coord_mapping: if not self.coord_mapping:
@@ -417,7 +416,7 @@ class KMKKeyboard:
print('Failed to run post hid function in extension: ', err, ext) print('Failed to run post hid function in extension: ', err, ext)
def powersave_disable(self): def powersave_disable(self):
for module in self.extensions: for module in self.modules:
try: try:
module.on_powersave_disable(self) module.on_powersave_disable(self)
except Exception as err: except Exception as err:
@@ -438,6 +437,12 @@ class KMKKeyboard:
self._init_coord_mapping() self._init_coord_mapping()
self._init_hid() self._init_hid()
for module in self.modules:
try:
module.during_bootup(self)
except Exception:
if self.debug_enabled:
print('Failed to load module', module)
for ext in self.extensions: for ext in self.extensions:
try: try:
ext.during_bootup(self) ext.during_bootup(self)
@@ -451,7 +456,7 @@ class KMKKeyboard:
while True: while True:
self.state_changed = False self.state_changed = False
self.sandbox.active_layers = self.active_layers self.sandbox.active_layers = self.active_layers.copy()
self.before_matrix_scan() self.before_matrix_scan()

View File

@@ -1,199 +0,0 @@
'''Enables splitting keyboards wirelessly'''
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.services.nordic import UARTService
from kmk.hid import HIDModes
from kmk.kmktime import ticks_diff, ticks_ms
from kmk.matrix import intify_coordinate
from kmk.modules import Module
from storage import getmount
class BLE_Split(Module):
'''Enables splitting keyboards wirelessly'''
def __init__(
self, split_flip=True, split_side=None, uart_interval=30, hid_type=HIDModes.BLE
):
self._is_target = True
self._uart_buffer = []
self.hid_type = hid_type
self.split_flip = split_flip
self.split_side = split_side
self.split_offset = None
self._ble = BLERadio()
self._ble_last_scan = ticks_ms() - 5000
self._is_target = True
self._connection_count = 0
self._uart = None
self._uart_connection = None
self._advertisment = None
self._advertising = False
self._uart_interval = uart_interval
self._psave_enable = False
self._debug_enabled = False
def __repr__(self):
return f'BLE_SPLIT({self._to_dict()})'
def _to_dict(self):
return {
'_ble': self._ble,
'_ble_last_scan': self._ble_last_scan,
'_is_target': self._is_target,
'uart_buffer': self._uart_buffer,
'_split_flip': self.split_flip,
'_split_side': self.split_side,
}
def during_bootup(self, keyboard):
self._debug_enabled = keyboard.debug_enabled
self._ble.name = str(getmount('/').label)
if self.split_side is None:
if self._ble.name.endswith('L'):
# If name ends in 'L' assume left and strip from name
self._is_target = True
elif self._ble.name.endswith('R'):
# If name ends in 'R' assume right and strip from name
self._is_target = False
else:
self._is_target = bool(self.split_side == 0)
if self.split_flip and not self._is_target:
keyboard.col_pins = list(reversed(keyboard.col_pins))
self.split_offset = len(keyboard.col_pins)
# Attempt to sanely guess a coord_mapping if one is not provided.
if not keyboard.coord_mapping:
keyboard.coord_mapping = []
rows_to_calc = len(keyboard.row_pins) * 2
cols_to_calc = len(keyboard.col_pins) * 2
for ridx in range(rows_to_calc):
for cidx in range(cols_to_calc):
keyboard.coord_mapping.append(intify_coordinate(ridx, cidx))
def before_matrix_scan(self, keyboard):
self._check_all_connections()
return self._receive(keyboard)
def after_matrix_scan(self, keyboard):
if keyboard.matrix_update:
keyboard.matrix_update = self._send(keyboard.matrix_update)
return
def before_hid_send(self, keyboard):
return
def after_hid_send(self, keyboard):
return
def on_powersave_enable(self, keyboard):
if self._uart_connection and not self._psave_enable:
self._uart_connection.connection_interval = self._uart_interval
self._psave_enable = True
def on_powersave_disable(self, keyboard):
if self._uart_connection and self._psave_enable:
self._uart_connection.connection_interval = 11.25
self._psave_enable = False
def _check_all_connections(self):
'''Validates the correct number of BLE connections'''
self._connection_count = len(self._ble.connections)
if self._is_target and self._connection_count < 2:
self._target_advertise()
elif not self._is_target and self._connection_count < 1:
self._initiator_scan()
def _initiator_scan(self):
'''Scans for target device'''
self._uart = None
self._uart_connection = None
# See if any existing connections are providing UARTService.
self._connection_count = len(self._ble.connections)
if self._connection_count > 0 and not self._uart:
for connection in self._ble.connections:
if UARTService in connection:
self._uart_connection = connection
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[UARTService]
break
if not self._uart:
if self._debug_enabled:
print('Scanning')
self._ble.stop_scan()
for adv in self._ble.start_scan(ProvideServicesAdvertisement, timeout=20):
if self._debug_enabled:
print('Scanning')
if UARTService in adv.services and adv.rssi > -70:
self._uart_connection = self._ble.connect(adv)
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[UARTService]
self._ble.stop_scan()
if self._debug_enabled:
print('Scan complete')
break
self._ble.stop_scan()
def _target_advertise(self):
'''Advertises the target for the initiator to find'''
self._ble.stop_advertising()
if self._debug_enabled:
print('Advertising')
# Uart must not change on this connection if reconnecting
if not self._uart:
self._uart = UARTService()
advertisement = ProvideServicesAdvertisement(self._uart)
self._ble.start_advertising(advertisement)
self.ble_time_reset()
while not self.ble_rescan_timer():
self._connection_count = len(self._ble.connections)
if self._connection_count > 1:
self.ble_time_reset()
if self._debug_enabled:
print('Advertising complete')
break
self._ble.stop_advertising()
def ble_rescan_timer(self):
'''If true, the rescan timer is up'''
return bool(ticks_diff(ticks_ms(), self._ble_last_scan) > 5000)
def ble_time_reset(self):
'''Resets the rescan timer'''
self._ble_last_scan = ticks_ms()
def _send(self, update):
if self._uart:
try:
if not self._is_target:
update[1] += self.split_offset
self._uart.write(update)
except OSError:
try:
self._uart.disconnect()
except: # noqa: E722
if self._debug_enabled:
print('UART disconnect failed')
if self._debug_enabled:
print('Connection error')
self._uart_connection = None
self._uart = None
return update
def _receive(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer:
while self._uart.in_waiting >= 3:
self._uart_buffer.append(self._uart.read(3))
if self._uart_buffer:
keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0))
return
return None

View File

@@ -14,7 +14,7 @@ class Power(Module):
self._powersave_start = ticks_ms() self._powersave_start = ticks_ms()
self._usb_last_scan = ticks_ms() - 5000 self._usb_last_scan = ticks_ms() - 5000
self._psp = None # Powersave pin object self._psp = None # Powersave pin object
self._i2c = None self._i2c = 0
self._loopcounter = 0 self._loopcounter = 0
make_key( make_key(
@@ -58,18 +58,15 @@ class Power(Module):
def on_powersave_enable(self, keyboard): def on_powersave_enable(self, keyboard):
'''Gives 10 cycles to allow other extentions to clean up before powersave''' '''Gives 10 cycles to allow other extentions to clean up before powersave'''
if keyboard._trigger_powersave_enable:
if self._loopcounter > 10: if self._loopcounter > 10:
self._loopcounter += 1
return
self._loopcounter = 0
keyboard._trigger_powersave_enable = False
self.enable_powersave(keyboard) self.enable_powersave(keyboard)
self._loopcounter = 0
else:
self._loopcounter += 1
return return
def on_powersave_disable(self, keyboard): def on_powersave_disable(self, keyboard):
keyboard._trigger_powersave_disable = False self.disable_powersave(keyboard)
self.disable_powersave()
return return
def enable_powersave(self, keyboard): def enable_powersave(self, keyboard):
@@ -81,22 +78,23 @@ class Power(Module):
if not self._psp: if not self._psp:
self._psp = digitalio.DigitalInOut(self.powersave_pin) self._psp = digitalio.DigitalInOut(self.powersave_pin)
self._psp.direction = digitalio.Direction.OUTPUT self._psp.direction = digitalio.Direction.OUTPUT
if self._psp:
self._psp.value = True self._psp.value = True
self.enable = True self.enable = True
keyboard._trigger_powersave_enable = False
return
def disable_powersave(self): def disable_powersave(self, keyboard):
'''Disables power saving features''' '''Disables power saving features'''
if self.powersave_pin: if self._psp:
self._psp.value = False
# Allows power save to prevent RGB drain. # Allows power save to prevent RGB drain.
# Example here https://docs.nicekeyboards.com/#/nice!nano/pinout_schematic # Example here https://docs.nicekeyboards.com/#/nice!nano/pinout_schematic
if not self._psp: keyboard._trigger_powersave_disable = False
self._psp = digitalio.DigitalInOut(self.powersave_pin)
self._psp.direction = digitalio.Direction.OUTPUT
self._psp.value = False
self.enable = False self.enable = False
return
def psleep(self): def psleep(self):
''' '''
@@ -106,6 +104,7 @@ class Power(Module):
sleep_ms(8) sleep_ms(8)
elif ticks_diff(ticks_ms(), self._powersave_start) >= 240000: elif ticks_diff(ticks_ms(), self._powersave_start) >= 240000:
sleep_ms(180) sleep_ms(180)
return
def psave_time_reset(self): def psave_time_reset(self):
self._powersave_start = ticks_ms() self._powersave_start = ticks_ms()
@@ -118,12 +117,14 @@ class Power(Module):
self._i2c = len(i2c.scan()) self._i2c = len(i2c.scan())
finally: finally:
i2c.unlock() i2c.unlock()
return
def usb_rescan_timer(self): def usb_rescan_timer(self):
return bool(ticks_diff(ticks_ms(), self._usb_last_scan) > 5000) return bool(ticks_diff(ticks_ms(), self._usb_last_scan) > 5000)
def usb_time_reset(self): def usb_time_reset(self):
self._usb_last_scan = ticks_ms() self._usb_last_scan = ticks_ms()
return
def usb_scan(self): def usb_scan(self):
# TODO Add USB detection here. Currently lies that it's connected # TODO Add USB detection here. Currently lies that it's connected

View File

@@ -1,306 +0,0 @@
'''Enables splitting keyboards wirelessly or wired'''
import busio
from micropython import const
from kmk.hid import HIDModes
from kmk.kmktime import ticks_diff, ticks_ms
from kmk.matrix import intify_coordinate
from kmk.modules import Module
from storage import getmount
class SplitSide:
LEFT = const(1)
RIGHT = const(2)
class SplitType:
UART = const(1)
I2C = const(2) # unused
ONEWIRE = const(3) # unused
BLE = const(4)
class Split_Common(Module):
'''Enables splitting keyboards wirelessly, or wired'''
def __init__(
self,
split_flip=True,
split_side=None,
split_type=SplitType.BLE,
split_target_left=True,
uart_interval=20,
hid_type=HIDModes.BLE,
data_pin=None,
data_pin2=None,
target_left=True,
uart_flip=True,
):
self._is_target = True
self._uart_buffer = []
self.hid_type = hid_type
self.split_flip = split_flip
self.split_side = split_side
self.split_type = split_type
self.split_target_left = split_target_left
self.split_offset = None
self.data_pin = data_pin
self.data_pin2 = data_pin2
self.target_left = target_left
self.uart_flip = uart_flip
self._is_target = True
self._uart = None
self._uart_interval = uart_interval
self._debug_enabled = False
if self.split_type == SplitType.BLE:
try:
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import (
ProvideServicesAdvertisement,
)
from adafruit_ble.services.nordic import UARTService
self.ProvideServicesAdvertisement = ProvideServicesAdvertisement
self.UARTService = UARTService
except ImportError:
pass # BLE isn't supported on this platform
self._ble = BLERadio()
self._ble_last_scan = ticks_ms() - 5000
self._connection_count = 0
self._uart_connection = None
self._advertisment = None
self._advertising = False
self._psave_enable = False
def __repr__(self):
return f'BLE_SPLIT({self._to_dict()})'
def _to_dict(self):
return {
'_ble': self._ble,
'_ble_last_scan': self._ble_last_scan,
'_is_target': self._is_target,
'uart_buffer': self._uart_buffer,
'_split_flip': self.split_flip,
'_split_side': self.split_side,
}
def during_bootup(self, keyboard):
# Set up name for target side detection and BLE advertisment
name = str(getmount('/').label)
if self.split_type == SplitType.BLE:
self._ble.name = name
# Detect split side from name
if self.split_side is None:
if name.endswith('L'):
# If name ends in 'L' assume left and strip from name
self._is_target = bool(self.split_target_left)
self.split_side = SplitSide.LEFT
elif name.endswith('R'):
# If name ends in 'R' assume right and strip from name
self._is_target = not bool(self.split_target_left)
self.split_side = SplitSide.RIGHT
# if split side was given, find master from split_side.
elif self.split_side == SplitSide.LEFT:
self._is_target = bool(self.split_target_left)
elif self.split_side == SplitSide.RIGHT:
self._is_target = not bool(self.split_target_left)
# Flips the col pins if PCB is the same but flipped on right
if self.split_flip and self.split_side == SplitSide.RIGHT:
keyboard.col_pins = list(reversed(keyboard.col_pins))
self.split_offset = len(keyboard.col_pins)
if self.split_type == SplitType.UART and self.data_pin is not None:
if self._is_target:
self._uart = busio.UART(
tx=self.data_pin2, rx=self.data_pin, timeout=self._uart_interval
)
else:
self._uart = busio.UART(
tx=self.data_pin, rx=self.data_pin2, timeout=self._uart_interval
)
# Attempt to sanely guess a coord_mapping if one is not provided.
if not keyboard.coord_mapping:
keyboard.coord_mapping = []
rows_to_calc = len(keyboard.row_pins) * 2
cols_to_calc = len(keyboard.col_pins) * 2
for ridx in range(rows_to_calc):
for cidx in range(cols_to_calc):
keyboard.coord_mapping.append(intify_coordinate(ridx, cidx))
def before_matrix_scan(self, keyboard):
if self.split_type == SplitType.BLE:
self._check_all_connections()
self._receive_ble(keyboard)
elif self.split_type == SplitType.UART:
if self._is_target or self.data_pin2:
self._receive_uart(keyboard)
elif self.split_type == SplitType.ONEWIRE:
pass # Protocol needs written
return
def after_matrix_scan(self, keyboard):
if keyboard.matrix_update:
if self.split_type == SplitType.BLE:
self._send_ble(keyboard.matrix_update)
elif self.split_type == SplitType.UART and self.data_pin2:
self._send_uart(keyboard.matrix_update)
elif self.split_type == SplitType.ONEWIRE:
pass # Protocol needs written
return
def before_hid_send(self, keyboard):
return
def after_hid_send(self, keyboard):
return
def on_powersave_enable(self, keyboard):
if self.split_type == SplitType.BLE:
if self._uart_connection and not self._psave_enable:
self._uart_connection.connection_interval = self._uart_interval
self._psave_enable = True
def on_powersave_disable(self, keyboard):
if self.split_type == SplitType.BLE:
if self._uart_connection and self._psave_enable:
self._uart_connection.connection_interval = 11.25
self._psave_enable = False
def _check_all_connections(self):
'''Validates the correct number of BLE connections'''
self._connection_count = len(self._ble.connections)
if self._is_target and self._connection_count < 2:
self._target_advertise()
elif not self._is_target and self._connection_count < 1:
self._initiator_scan()
def _initiator_scan(self):
'''Scans for target device'''
self._uart = None
self._uart_connection = None
# See if any existing connections are providing UARTService.
self._connection_count = len(self._ble.connections)
if self._connection_count > 0 and not self._uart:
for connection in self._ble.connections:
if self.UARTService in connection:
self._uart_connection = connection
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[self.UARTService]
break
if not self._uart:
if self._debug_enabled:
print('Scanning')
self._ble.stop_scan()
for adv in self._ble.start_scan(
self.ProvideServicesAdvertisement, timeout=20
):
if self._debug_enabled:
print('Scanning')
if self.UARTService in adv.services and adv.rssi > -70:
self._uart_connection = self._ble.connect(adv)
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[self.UARTService]
self._ble.stop_scan()
if self._debug_enabled:
print('Scan complete')
break
self._ble.stop_scan()
def _target_advertise(self):
'''Advertises the target for the initiator to find'''
self._ble.stop_advertising()
if self._debug_enabled:
print('Advertising')
# Uart must not change on this connection if reconnecting
if not self._uart:
self._uart = self.UARTService()
advertisement = self.ProvideServicesAdvertisement(self._uart)
self._ble.start_advertising(advertisement)
self.ble_time_reset()
while not self.ble_rescan_timer():
self._connection_count = len(self._ble.connections)
if self._connection_count > 1:
self.ble_time_reset()
if self._debug_enabled:
print('Advertising complete')
break
self._ble.stop_advertising()
def ble_rescan_timer(self):
'''If true, the rescan timer is up'''
return bool(ticks_diff(ticks_ms(), self._ble_last_scan) > 5000)
def ble_time_reset(self):
'''Resets the rescan timer'''
self._ble_last_scan = ticks_ms()
def _send_ble(self, update):
if self._uart:
try:
if not self._is_target:
update[1] += self.split_offset
self._uart.write(update)
except OSError:
try:
self._uart.disconnect()
except: # noqa: E722
if self._debug_enabled:
print('UART disconnect failed')
if self._debug_enabled:
print('Connection error')
self._uart_connection = None
self._uart = None
def _receive_ble(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer:
while self._uart.in_waiting >= 3:
self._uart_buffer.append(self._uart.read(3))
if self._uart_buffer:
keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0))
return
def _send_uart(self, update):
# Change offsets depending on where the data is going to match the correct
# matrix location of the receiever
if self._is_target:
if self.split_target_left:
update[1] += self.split_offset
else:
update[1] -= self.split_offset
else:
if self.split_target_left:
update[1] -= self.split_offset
else:
update[1] += self.split_offset
if self._uart is not None:
self._uart.write(update)
def _receive_uart(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer:
if self._uart.in_waiting >= 60:
# This is a dirty hack to prevent crashes in unrealistic cases
import microcontroller
microcontroller.reset()
while self._uart.in_waiting >= 3:
self._uart_buffer.append(self._uart.read(3))
if self._uart_buffer:
keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0))
return

View File

@@ -1,84 +1,134 @@
'''Enables splitting keyboards wirelessly or wired'''
import busio import busio
from micropython import const
from kmk.hid import HIDModes
from kmk.kmktime import ticks_diff, ticks_ms
from kmk.matrix import intify_coordinate from kmk.matrix import intify_coordinate
from kmk.modules import Module from kmk.modules import Module
from storage import getmount from storage import getmount
class SplitSide:
LEFT = const(1)
RIGHT = const(2)
class SplitType: class SplitType:
UART = 1 UART = const(1)
I2C = 2 # unused I2C = const(2) # unused
ONEWIRE = 3 # unused ONEWIRE = const(3) # unused
BLE = const(4)
class Split(Module): class Split(Module):
'''Enables splitting keyboards wirelessly, or wired'''
def __init__( def __init__(
self, self,
is_target=True,
extra_data_pin=None,
split_offset=None,
split_flip=True, split_flip=True,
split_side=None, split_side=None,
split_type=SplitType.UART, split_type=SplitType.BLE,
split_target_left=True,
uart_interval=20,
hid_type=HIDModes.BLE,
data_pin=None,
data_pin2=None,
target_left=True, target_left=True,
uart_flip=True, uart_flip=True,
uart_pin=None,
uart_pin2=None,
uart_timeout=20,
): ):
self._is_target = is_target self._is_target = True
self.extra_data_pin = extra_data_pin self._uart_buffer = []
self.split_offsets = split_offset self.hid_type = hid_type
self.split_flip = split_flip self.split_flip = split_flip
self.split_side = split_side self.split_side = split_side
self.split_type = split_type self.split_type = split_type
self.split_target_left = target_left self.split_target_left = split_target_left
self._uart = None self.split_offset = None
self._uart_buffer = [] self.data_pin = data_pin
self.data_pin2 = data_pin2
self.target_left = target_left
self.uart_flip = uart_flip self.uart_flip = uart_flip
self.uart_pin = uart_pin self._is_target = True
self.uart_pin2 = uart_pin2 self._uart = None
self.uart_timeout = uart_timeout self._uart_interval = uart_interval
self._debug_enabled = False
if self.split_type == SplitType.BLE:
try:
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import (
ProvideServicesAdvertisement,
)
from adafruit_ble.services.nordic import UARTService
self.ProvideServicesAdvertisement = ProvideServicesAdvertisement
self.UARTService = UARTService
except ImportError:
pass # BLE isn't supported on this platform
self._ble = BLERadio()
self._ble_last_scan = ticks_ms() - 5000
self._connection_count = 0
self._uart_connection = None
self._advertisment = None
self._advertising = False
self._psave_enable = False
def __repr__(self):
return f'BLE_SPLIT({self._to_dict()})'
def _to_dict(self):
return {
'_ble': self._ble,
'_ble_last_scan': self._ble_last_scan,
'_is_target': self._is_target,
'uart_buffer': self._uart_buffer,
'_split_flip': self.split_flip,
'_split_side': self.split_side,
}
def during_bootup(self, keyboard): def during_bootup(self, keyboard):
try: # Set up name for target side detection and BLE advertisment
# Working around https://github.com/adafruit/circuitpython/issues/1769 name = str(getmount('/').label)
keyboard._hid_helper_inst.create_report([]).send() if self.split_type == SplitType.BLE:
# Line above is broken and needs fixed for aut detection self._ble.name = name
self._is_target = True
except OSError: # Detect split side from name
self._is_target = False
if self.split_side is None: if self.split_side is None:
l_or_r = str(getmount('/').label) if name.endswith('L'):
if l_or_r.endswith('L'):
# If name ends in 'L' assume left and strip from name # If name ends in 'L' assume left and strip from name
self.split_side = 0 self._is_target = bool(self.split_target_left)
elif l_or_r.endswith('R'): self.split_side = SplitSide.LEFT
elif name.endswith('R'):
# If name ends in 'R' assume right and strip from name # If name ends in 'R' assume right and strip from name
self.split_side = 1 self._is_target = not bool(self.split_target_left)
self.split_side = SplitSide.RIGHT
if self.split_flip and not self._is_target: # if split side was given, find master from split_side.
elif self.split_side == SplitSide.LEFT:
self._is_target = bool(self.split_target_left)
elif self.split_side == SplitSide.RIGHT:
self._is_target = not bool(self.split_target_left)
# Flips the col pins if PCB is the same but flipped on right
if self.split_flip and self.split_side == SplitSide.RIGHT:
keyboard.col_pins = list(reversed(keyboard.col_pins)) keyboard.col_pins = list(reversed(keyboard.col_pins))
if self.split_side == 0:
self.split_target_left = self._is_target
elif self.split_side == 1:
self.split_target_left = not self._is_target
if self.uart_pin is not None: self.split_offset = len(keyboard.col_pins)
if self.split_type == SplitType.UART and self.data_pin is not None:
if self._is_target: if self._is_target:
self._uart = busio.UART( self._uart = busio.UART(
tx=self.uart_pin2, rx=self.uart_pin, timeout=self.uart_timeout tx=self.data_pin2, rx=self.data_pin, timeout=self._uart_interval
) )
else: else:
self._uart = busio.UART( self._uart = busio.UART(
tx=self.uart_pin, rx=self.uart_pin2, timeout=self.uart_timeout tx=self.data_pin, rx=self.data_pin2, timeout=self._uart_interval
) )
# Attempt to sanely guess a coord_mapping if one is not provided. # Attempt to sanely guess a coord_mapping if one is not provided.
if not keyboard.coord_mapping: if not keyboard.coord_mapping:
keyboard.coord_mapping = [] keyboard.coord_mapping = []
self.split_offset = len(keyboard.col_pins)
rows_to_calc = len(keyboard.row_pins) * 2 rows_to_calc = len(keyboard.row_pins) * 2
cols_to_calc = len(keyboard.col_pins) * 2 cols_to_calc = len(keyboard.col_pins) * 2
@@ -87,13 +137,26 @@ class Split(Module):
keyboard.coord_mapping.append(intify_coordinate(ridx, cidx)) keyboard.coord_mapping.append(intify_coordinate(ridx, cidx))
def before_matrix_scan(self, keyboard): def before_matrix_scan(self, keyboard):
if self._is_target or self.uart_pin2: if self.split_type == SplitType.BLE:
return self._receive(keyboard) self._check_all_connections()
return None self._receive_ble(keyboard)
elif self.split_type == SplitType.UART:
if self._is_target or self.data_pin2:
self._receive_uart(keyboard)
elif self.split_type == SplitType.ONEWIRE:
pass # Protocol needs written
return
def after_matrix_scan(self, keyboard): def after_matrix_scan(self, keyboard):
if keyboard.matrix_update is not None and not self._is_target: if keyboard.matrix_update:
self._send(keyboard.matrix_update) if self.split_type == SplitType.BLE:
self._send_ble(keyboard.matrix_update)
elif self.split_type == SplitType.UART and self.data_pin2:
self._send_uart(keyboard.matrix_update)
elif self.split_type == SplitType.ONEWIRE:
pass # Protocol needs written
return
def before_hid_send(self, keyboard): def before_hid_send(self, keyboard):
return return
@@ -102,20 +165,132 @@ class Split(Module):
return return
def on_powersave_enable(self, keyboard): def on_powersave_enable(self, keyboard):
return if self.split_type == SplitType.BLE:
if self._uart_connection and not self._psave_enable:
self._uart_connection.connection_interval = self._uart_interval
self._psave_enable = True
def on_powersave_disable(self, keyboard): def on_powersave_disable(self, keyboard):
if self.split_type == SplitType.BLE:
if self._uart_connection and self._psave_enable:
self._uart_connection.connection_interval = 11.25
self._psave_enable = False
def _check_all_connections(self):
'''Validates the correct number of BLE connections'''
self._connection_count = len(self._ble.connections)
if self._is_target and self._connection_count < 2:
self._target_advertise()
elif not self._is_target and self._connection_count < 1:
self._initiator_scan()
def _initiator_scan(self):
'''Scans for target device'''
self._uart = None
self._uart_connection = None
# See if any existing connections are providing UARTService.
self._connection_count = len(self._ble.connections)
if self._connection_count > 0 and not self._uart:
for connection in self._ble.connections:
if self.UARTService in connection:
self._uart_connection = connection
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[self.UARTService]
break
if not self._uart:
if self._debug_enabled:
print('Scanning')
self._ble.stop_scan()
for adv in self._ble.start_scan(
self.ProvideServicesAdvertisement, timeout=20
):
if self._debug_enabled:
print('Scanning')
if self.UARTService in adv.services and adv.rssi > -70:
self._uart_connection = self._ble.connect(adv)
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[self.UARTService]
self._ble.stop_scan()
if self._debug_enabled:
print('Scan complete')
break
self._ble.stop_scan()
def _target_advertise(self):
'''Advertises the target for the initiator to find'''
self._ble.stop_advertising()
if self._debug_enabled:
print('Advertising')
# Uart must not change on this connection if reconnecting
if not self._uart:
self._uart = self.UARTService()
advertisement = self.ProvideServicesAdvertisement(self._uart)
self._ble.start_advertising(advertisement)
self.ble_time_reset()
while not self.ble_rescan_timer():
self._connection_count = len(self._ble.connections)
if self._connection_count > 1:
self.ble_time_reset()
if self._debug_enabled:
print('Advertising complete')
break
self._ble.stop_advertising()
def ble_rescan_timer(self):
'''If true, the rescan timer is up'''
return bool(ticks_diff(ticks_ms(), self._ble_last_scan) > 5000)
def ble_time_reset(self):
'''Resets the rescan timer'''
self._ble_last_scan = ticks_ms()
def _send_ble(self, update):
if self._uart:
try:
if not self._is_target:
update[1] += self.split_offset
self._uart.write(update)
except OSError:
try:
self._uart.disconnect()
except: # noqa: E722
if self._debug_enabled:
print('UART disconnect failed')
if self._debug_enabled:
print('Connection error')
self._uart_connection = None
self._uart = None
def _receive_ble(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer:
while self._uart.in_waiting >= 3:
self._uart_buffer.append(self._uart.read(3))
if self._uart_buffer:
keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0))
return return
def _send(self, update): def _send_uart(self, update):
# Change offsets depending on where the data is going to match the correct
# matrix location of the receiever
if self._is_target:
if self.split_target_left: if self.split_target_left:
update[1] += self.split_offset update[1] += self.split_offset
else: else:
update[1] -= self.split_offsets update[1] -= self.split_offset
else:
if self.split_target_left:
update[1] -= self.split_offset
else:
update[1] += self.split_offset
if self._uart is not None: if self._uart is not None:
self._uart.write(update) self._uart.write(update)
def _receive(self, keyboard): def _receive_uart(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer: if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer:
if self._uart.in_waiting >= 60: if self._uart.in_waiting >= 60:
# This is a dirty hack to prevent crashes in unrealistic cases # This is a dirty hack to prevent crashes in unrealistic cases
@@ -129,5 +304,3 @@ class Split(Module):
keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0)) keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0))
return return
return None

View File

@@ -6,33 +6,27 @@ import displayio
import terminalio import terminalio
from adafruit_display_text import label from adafruit_display_text import label
from kb import KMKKeyboard from kb import KMKKeyboard
from kmk.extensions.ble_split import BLE_Split
from kmk.extensions.power import Power
from kmk.extensions.rgb import RGB from kmk.extensions.rgb import RGB
from kmk.handlers.sequences import send_string, simple_key_sequence
from kmk.hid import HIDModes from kmk.hid import HIDModes
from kmk.keys import KC from kmk.keys import KC
from kmk_side import split_side from kmk.modules.layers import Layers
from kmk.modules.power import Power
from kmk.modules.split import Split, SplitType
keyboard = KMKKeyboard() keyboard = KMKKeyboard()
_______ = KC.TRNS
XXXXXXX = KC.NO
LT1_SP = KC.MO(2)
LT2_SP = KC.LT(3, KC.SPC)
TAB_SB = KC.LT(5, KC.TAB)
SUPER_L = KC.LM(4, KC.LGUI)
keyboard.tap_time = 320 keyboard.tap_time = 320
keyboard.debug_enabled = False keyboard.debug_enabled = False
rgb_ext = RGB(pixel_pin=keyboard.rgb_pixel_pin, num_pixels=27, val_limit=100, hue_default=190, sat_default=100, val_default=5) rgb_ext = RGB(pixel_pin=keyboard.rgb_pixel_pin, num_pixels=27, val_limit=100, hue_default=190, sat_default=100, val_default=5)
split = BLE_Split() split = Split(split_type=SplitType.BLE)
power = Power(powersave_pin=keyboard.powersave_pin) power = Power(powersave_pin=keyboard.powersave_pin)
layers = Layers()
keyboard.extensions = [split, rgb_ext, power] keyboard.modules = [split, power, layers]
keyboard.extensions = [rgb_ext]
enable_oled = False enable_oled = False
@@ -47,6 +41,14 @@ else:
displayio.release_displays() displayio.release_displays()
keyboard.i2c_deinit_count += 1 keyboard.i2c_deinit_count += 1
_______ = KC.TRNS
XXXXXXX = KC.NO
LT1_SP = KC.MO(2)
LT2_SP = KC.LT(3, KC.SPC)
TAB_SB = KC.LT(5, KC.TAB)
SUPER_L = KC.LM(4, KC.LGUI)
keyboard.keymap = [ keyboard.keymap = [
# DVORAK # DVORAK
# ,-----------------------------------------. ,-----------------------------------------. # ,-----------------------------------------. ,-----------------------------------------.