draft concept for ble-split to transport refactor

This commit is contained in:
xs5871 2022-09-07 19:02:42 +00:00 committed by Kyle Brown
parent 17f2961c0b
commit ec6bb9564b
2 changed files with 262 additions and 249 deletions

View File

@ -1,13 +1,10 @@
'''Enables splitting keyboards wirelessly or wired''' '''Enables splitting keyboards wirelessly or wired'''
import busio
from micropython import const from micropython import const
from supervisor import runtime, ticks_ms from supervisor import runtime
from keypad import Event as KeyEvent from keypad import Event as KeyEvent
from storage import getmount from storage import getmount
from kmk.hid import HIDModes
from kmk.kmktime import check_deadline
from kmk.modules import Module from kmk.modules import Module
@ -21,6 +18,7 @@ class SplitType:
I2C = const(2) # unused I2C = const(2) # unused
ONEWIRE = const(3) # unused ONEWIRE = const(3) # unused
BLE = const(4) BLE = const(4)
PIO_UART = const(5)
class Split(Module): class Split(Module):
@ -50,71 +48,27 @@ class Split(Module):
self.data_pin2 = data_pin2 self.data_pin2 = data_pin2
self.uart_flip = uart_flip self.uart_flip = uart_flip
self._use_pio = use_pio self._use_pio = use_pio
self._uart = None self._transport = None
self._uart_interval = uart_interval self._uart_interval = uart_interval
self._debug_enabled = debug_enabled self._debug_enabled = debug_enabled
self.uart_header = bytearray([0xB2]) # Any non-zero byte should work self.uart_header = bytearray([0xB2]) # Any non-zero byte should work
if self.split_type == SplitType.BLE: if split_type == SplitType.UART and use_pio:
try: split_type = SplitType.PIO_UART
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import (
ProvideServicesAdvertisement,
)
from adafruit_ble.services.nordic import UARTService
self.BLERadio = BLERadio if split_type == SplitType.UART:
self.ProvideServicesAdvertisement = ProvideServicesAdvertisement import busio
self.UARTService = UARTService elif split_type == SplitType.PIO_UART:
except ImportError:
print('BLE Import error')
return # BLE isn't supported on this platform
self._ble_last_scan = ticks_ms() - 5000
self._connection_count = 0
self._split_connected = False
self._uart_connection = None
self._advertisment = None # Seems to not be used anywhere
self._advertising = False
self._psave_enable = False
if self._use_pio:
from kmk.transports.pio_uart import PIO_UART from kmk.transports.pio_uart import PIO_UART
elif split_type == SplitType.BLE:
self.PIO_UART = PIO_UART from kmk.transports.ble import BLE_UART
def during_bootup(self, keyboard): def during_bootup(self, keyboard):
# Set up name for target side detection and BLE advertisment # Set up name for target side detection and BLE advertisment
name = str(getmount('/').label) if not self.data_pin:
if self.split_type == SplitType.BLE: self.data_pin = keyboard.data_pin
if keyboard.hid_type == HIDModes.BLE:
self._ble = keyboard._hid_helper.ble
else:
self._ble = self.BLERadio()
self._ble.name = name
else:
# Try to guess data pins if not supplied
if not self.data_pin:
self.data_pin = keyboard.data_pin
# if split side was given, find target from split_side. self._get_side()
if self.split_side == SplitSide.LEFT:
self._is_target = bool(self.split_target_left)
elif self.split_side == SplitSide.RIGHT:
self._is_target = not bool(self.split_target_left)
else:
# Detect split side from name
if (
self.split_type == SplitType.UART
or self.split_type == SplitType.ONEWIRE
):
self._is_target = runtime.usb_connected
elif self.split_type == SplitType.BLE:
self._is_target = name.endswith('L') == self.split_target_left
if name.endswith('L'):
self.split_side = SplitSide.LEFT
elif name.endswith('R'):
self.split_side = SplitSide.RIGHT
if not self._is_target: if not self._is_target:
keyboard._hid_send_enabled = False keyboard._hid_send_enabled = False
@ -122,41 +76,11 @@ class Split(Module):
if self.split_offset is None: if self.split_offset is None:
self.split_offset = keyboard.matrix[-1].coord_mapping[-1] + 1 self.split_offset = keyboard.matrix[-1].coord_mapping[-1] + 1
if self.split_type == SplitType.UART and self.data_pin is not None: self._init_transport(keyboard)
if self._is_target or not self.uart_flip:
if self._use_pio:
self._uart = self.PIO_UART(tx=self.data_pin2, rx=self.data_pin)
else:
self._uart = busio.UART(
tx=self.data_pin2, rx=self.data_pin, timeout=self._uart_interval
)
else:
if self._use_pio:
self._uart = self.PIO_UART(tx=self.data_pin, rx=self.data_pin2)
else:
self._uart = busio.UART(
tx=self.data_pin, rx=self.data_pin2, timeout=self._uart_interval
)
# Attempt to sanely guess a coord_mapping if one is not provided. # Attempt to sanely guess a coord_mapping if one is not provided.
if not keyboard.coord_mapping: if not keyboard.coord_mapping:
cm = [] self._guess_coord_mapping()
rows_to_calc = len(keyboard.row_pins)
cols_to_calc = len(keyboard.col_pins)
# Flips the col order if PCB is the same but flipped on right
cols_rhs = list(range(cols_to_calc))
if self.split_flip:
cols_rhs = list(reversed(cols_rhs))
for ridx in range(rows_to_calc):
for cidx in range(cols_to_calc):
cm.append(cols_to_calc * ridx + cidx)
for cidx in cols_rhs:
cm.append(cols_to_calc * (rows_to_calc + ridx) + cidx)
keyboard.coord_mapping = tuple(cm)
if self.split_side == SplitSide.RIGHT: if self.split_side == SplitSide.RIGHT:
offset = self.split_offset offset = self.split_offset
@ -166,14 +90,13 @@ class Split(Module):
def before_matrix_scan(self, keyboard): def before_matrix_scan(self, keyboard):
if self.split_type == SplitType.BLE: if self.split_type == SplitType.BLE:
self._check_all_connections(keyboard) self._transport.check_connection(keyboard)
self._receive_ble(keyboard)
elif self.split_type == SplitType.UART: if self.split_type == SplitType.UART:
if self._is_target or self.data_pin2: if self._is_target or self.data_pin2:
self._receive_uart(keyboard) self._receive_uart(keyboard)
elif self.split_type == SplitType.ONEWIRE: else:
pass # Protocol needs written self._receive_uart(keyboard)
return
def after_matrix_scan(self, keyboard): def after_matrix_scan(self, keyboard):
if keyboard.matrix_update: if keyboard.matrix_update:
@ -182,14 +105,8 @@ class Split(Module):
self._send_uart(keyboard.matrix_update) self._send_uart(keyboard.matrix_update)
else: else:
pass # explicit pass just for dev sanity... pass # explicit pass just for dev sanity...
elif self.split_type == SplitType.BLE:
self._send_ble(keyboard.matrix_update)
elif self.split_type == SplitType.ONEWIRE:
pass # Protocol needs written
else: else:
print('Unexpected case in after_matrix_scan') self._send_uart(keyboard.matrix_update)
return
def before_hid_send(self, keyboard): def before_hid_send(self, keyboard):
if not self._is_target: if not self._is_target:
@ -202,119 +119,11 @@ class Split(Module):
def on_powersave_enable(self, keyboard): def on_powersave_enable(self, keyboard):
if self.split_type == SplitType.BLE: if self.split_type == SplitType.BLE:
if self._uart_connection and not self._psave_enable: self._transport.enable_powersave(True)
self._uart_connection.connection_interval = self._uart_interval
self._psave_enable = True
def on_powersave_disable(self, keyboard): def on_powersave_disable(self, keyboard):
if self.split_type == SplitType.BLE: if self.split_type == SplitType.BLE:
if self._uart_connection and self._psave_enable: self._transport.enable_powersave(False)
self._uart_connection.connection_interval = 11.25
self._psave_enable = False
def _check_all_connections(self, keyboard):
'''Validates the correct number of BLE connections'''
self._previous_connection_count = self._connection_count
self._connection_count = len(self._ble.connections)
if self._is_target:
if self._advertising or not self._check_if_split_connected():
self._target_advertise()
elif self._connection_count < 2 and keyboard.hid_type == HIDModes.BLE:
keyboard._hid_helper.start_advertising()
elif not self._is_target and self._connection_count < 1:
self._initiator_scan()
def _check_if_split_connected(self):
# I'm looking for a way how to recognize which connection is on and which one off
# For now, I found that service name relation to having other CP device
if self._connection_count == 0:
return False
if self._connection_count == 2:
self._split_connected = True
return True
# Polling this takes some time so I check only if connection_count changed
if self._previous_connection_count == self._connection_count:
return self._split_connected
bleio_connection = self._ble.connections[0]._bleio_connection
connection_services = bleio_connection.discover_remote_services()
for service in connection_services:
if str(service.uuid).startswith("UUID('adaf0001"):
self._split_connected = True
return True
return False
def _initiator_scan(self):
'''Scans for target device'''
self._uart = None
self._uart_connection = None
# See if any existing connections are providing UARTService.
self._connection_count = len(self._ble.connections)
if self._connection_count > 0 and not self._uart:
for connection in self._ble.connections:
if self.UARTService in connection:
self._uart_connection = connection
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[self.UARTService]
break
if not self._uart:
if self._debug_enabled:
print('Scanning')
self._ble.stop_scan()
for adv in self._ble.start_scan(
self.ProvideServicesAdvertisement, timeout=20
):
if self._debug_enabled:
print('Scanning')
if self.UARTService in adv.services and adv.rssi > -70:
self._uart_connection = self._ble.connect(adv)
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[self.UARTService]
self._ble.stop_scan()
if self._debug_enabled:
print('Scan complete')
break
self._ble.stop_scan()
def _target_advertise(self):
'''Advertises the target for the initiator to find'''
# Give previous advertising some time to complete
if self._advertising:
if self._check_if_split_connected():
if self._debug_enabled:
print('Advertising complete')
self._ble.stop_advertising()
self._advertising = False
return
if not self.ble_rescan_timer():
return
if self._debug_enabled:
print('Advertising not answered')
self._ble.stop_advertising()
if self._debug_enabled:
print('Advertising')
# Uart must not change on this connection if reconnecting
if not self._uart:
self._uart = self.UARTService()
advertisement = self.ProvideServicesAdvertisement(self._uart)
self._ble.start_advertising(advertisement)
self._advertising = True
self.ble_time_reset()
def ble_rescan_timer(self):
'''If true, the rescan timer is up'''
return not bool(check_deadline(ticks_ms(), self._ble_last_scan, 5000))
def ble_time_reset(self):
'''Resets the rescan timer'''
self._ble_last_scan = ticks_ms()
def _serialize_update(self, update): def _serialize_update(self, update):
buffer = bytearray(2) buffer = bytearray(2)
@ -326,30 +135,6 @@ class Split(Module):
kevent = KeyEvent(key_number=update[0], pressed=update[1]) kevent = KeyEvent(key_number=update[0], pressed=update[1])
return kevent return kevent
def _send_ble(self, update):
if self._uart:
try:
self._uart.write(self._serialize_update(update))
except OSError:
try:
self._uart.disconnect()
except: # noqa: E722
if self._debug_enabled:
print('UART disconnect failed')
if self._debug_enabled:
print('Connection error')
self._uart_connection = None
self._uart = None
def _receive_ble(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer:
while self._uart.in_waiting >= 2:
update = self._deserialize_update(self._uart.read(2))
self._uart_buffer.append(update)
if self._uart_buffer:
keyboard.secondary_matrix_update = self._uart_buffer.pop(0)
def _checksum(self, update): def _checksum(self, update):
checksum = bytes([sum(update) & 0xFF]) checksum = bytes([sum(update) & 0xFF])
@ -358,27 +143,82 @@ class Split(Module):
def _send_uart(self, update): def _send_uart(self, update):
# Change offsets depending on where the data is going to match the correct # Change offsets depending on where the data is going to match the correct
# matrix location of the receiever # matrix location of the receiever
if self._uart is not None: update = self._serialize_update(update)
update = self._serialize_update(update) self._transport.write(self.uart_header)
self._uart.write(self.uart_header) self._transport.write(update)
self._uart.write(update) self._transport.write(self._checksum(update))
self._uart.write(self._checksum(update))
def _receive_uart(self, keyboard): def _receive_uart(self, keyboard):
if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer: if self._transport.in_waiting > 0 or self._uart_buffer:
if self._uart.in_waiting >= 60: if self._transport.in_waiting >= 60:
# This is a dirty hack to prevent crashes in unrealistic cases # This is a dirty hack to prevent crashes in unrealistic cases
import microcontroller import microcontroller
microcontroller.reset() microcontroller.reset()
while self._uart.in_waiting >= 4: while self._transport.in_waiting >= 4:
# Check the header # Check the header
if self._uart.read(1) == self.uart_header: if self._transport.read(1) == self.uart_header:
update = self._uart.read(2) update = self._transport.read(2)
# check the checksum # check the checksum
if self._checksum(update) == self._uart.read(1): if self._checksum(update) == self._transport.read(1):
self._uart_buffer.append(self._deserialize_update(update)) self._uart_buffer.append(self._deserialize_update(update))
if self._uart_buffer: if self._uart_buffer:
keyboard.secondary_matrix_update = self._uart_buffer.pop(0) keyboard.secondary_matrix_update = self._uart_buffer.pop(0)
def _get_side(self):
name = str(getmount('/').label)
# if split side was given, find target from split_side.
if self.split_side == SplitSide.LEFT:
self._is_target = bool(self.split_target_left)
elif self.split_side == SplitSide.RIGHT:
self._is_target = not bool(self.split_target_left)
else:
# Detect split side from name
if (
self.split_type == SplitType.UART
or self.split_type == SplitType.ONEWIRE
):
self._is_target = runtime.usb_connected
if name.endswith('L'):
self.split_side = SplitSide.LEFT
elif name.endswith('R'):
self.split_side = SplitSide.RIGHT
def _init_transport(self, keyboard):
if self._is_target or not self.uart_flip:
tx_pin = self.data_pin2
rx_pin = self.data_pin
else:
tx_pin = self.data_pin
rx_pin = self.data_pin2
if split_type == SplitType.UART:
self._transport = busio.UART(tx=tx_pin, rx=rx_pin, timeout=self._uart_interval)
elif split_type == SplitType.PIO_UART:
self._transport = PIO_UART(tx=tx_pin, rx=rx_pin)
elif split_type == SplitType.BLE:
self._transport = BLE_UART(self._is_target, uart_interval)
else:
raise NotImplementedError
def _guess_coord_mapping(self, keyboard):
cm = []
rows_to_calc = len(keyboard.row_pins)
cols_to_calc = len(keyboard.col_pins)
# Flips the col order if PCB is the same but flipped on right
cols_rhs = list(range(cols_to_calc))
if self.split_flip:
cols_rhs = list(reversed(cols_rhs))
for ridx in range(rows_to_calc):
for cidx in range(cols_to_calc):
cm.append(cols_to_calc * ridx + cidx)
for cidx in cols_rhs:
cm.append(cols_to_calc * (rows_to_calc + ridx) + cidx)
keyboard.coord_mapping = tuple(cm)

173
kmk/transports/ble.py Normal file
View File

@ -0,0 +1,173 @@
from supervisor import ticks_ms
from kmk.hid import HIDModes
from kmk.kmktime import check_deadline
class BLE_UART:
def __init__(self, is_target, uart_interval=20):
try:
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.services.nordic import UARTService
self.BLERadio = BLERadio
self.ProvideServicesAdvertisement = ProvideServicesAdvertisement
self.UARTService = UARTService
except ImportError:
print('BLE Import error')
return # BLE isn't supported on this platform
self._debug_enabled = True
self._ble_last_scan = ticks_ms() - 5000
self._connection_count = 0
self._split_connected = False
self._uart_connection = None
self._advertisment = None # Seems to not be used anywhere
self._advertising = False
self._psave_enabled = False
self._uart = None
self._uart_interval = uart_interval
self._is_target = is_target
@property
def in_waiting(self):
return self._uart.in_waiting
def read(self, n):
return self._uart.read(n)
def readinto(self, buf):
return self._uart.readinto(buf)
def write(self, buffer):
try:
self._uart.write(buffer)
except OSError:
try:
self._uart.disconnect()
except: # noqa: E722
if self._debug_enabled:
print('UART disconnect failed')
if self._debug_enabled:
print('Connection error')
self._uart_connection = None
self._uart = None
def check_connection(self, keyboard):
self._check_all_connections(keyboard)
def enable_powersave(self, enable=True):
if enable:
if self._uart_connection and not self._psave_enable:
self._uart_connection.connection_interval = self._uart_interval
self._psave_enabled = True
else:
if self._uart_connection and self._psave_enable:
self._uart_connection.connection_interval = 11.25
self._psave_enable = False
def _check_all_connections(self, keyboard):
'''Validates the correct number of BLE connections'''
self._previous_connection_count = self._connection_count
self._connection_count = len(self._ble.connections)
if self._is_target:
if self._advertising or not self._check_if_split_connected():
self._target_advertise()
elif self._connection_count < 2 and keyboard.hid_type == HIDModes.BLE:
keyboard._hid_helper.start_advertising()
elif not self._is_target and self._connection_count < 1:
self._initiator_scan()
def _check_if_split_connected(self):
# I'm looking for a way how to recognize which connection is on and which one off
# For now, I found that service name relation to having other CP device
if self._connection_count == 0:
return False
if self._connection_count == 2:
self._split_connected = True
return True
# Polling this takes some time so I check only if connection_count changed
if self._previous_connection_count == self._connection_count:
return self._split_connected
bleio_connection = self._ble.connections[0]._bleio_connection
connection_services = bleio_connection.discover_remote_services()
for service in connection_services:
if str(service.uuid).startswith("UUID('adaf0001"):
self._split_connected = True
return True
return False
def _initiator_scan(self):
'''Scans for target device'''
self._uart = None
self._uart_connection = None
# See if any existing connections are providing UARTService.
self._connection_count = len(self._ble.connections)
if self._connection_count > 0 and not self._uart:
for connection in self._ble.connections:
if self.UARTService in connection:
self._uart_connection = connection
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[self.UARTService]
break
if not self._uart:
if self._debug_enabled:
print('Scanning')
self._ble.stop_scan()
for adv in self._ble.start_scan(
self.ProvideServicesAdvertisement, timeout=20
):
if self._debug_enabled:
print('Scanning')
if self.UARTService in adv.services and adv.rssi > -70:
self._uart_connection = self._ble.connect(adv)
self._uart_connection.connection_interval = 11.25
self._uart = self._uart_connection[self.UARTService]
self._ble.stop_scan()
if self._debug_enabled:
print('Scan complete')
break
self._ble.stop_scan()
def _target_advertise(self):
'''Advertises the target for the initiator to find'''
# Give previous advertising some time to complete
if self._advertising:
if self._check_if_split_connected():
if self._debug_enabled:
print('Advertising complete')
self._ble.stop_advertising()
self._advertising = False
return
if not self.ble_rescan_timer():
return
if self._debug_enabled:
print('Advertising not answered')
self._ble.stop_advertising()
if self._debug_enabled:
print('Advertising')
# Uart must not change on this connection if reconnecting
if not self._uart:
self._uart = self.UARTService()
advertisement = self.ProvideServicesAdvertisement(self._uart)
self._ble.start_advertising(advertisement)
self._advertising = True
self.ble_time_reset()
def ble_rescan_timer(self):
'''If true, the rescan timer is up'''
return not bool(check_deadline(ticks_ms(), self._ble_last_scan, 5000))
def ble_time_reset(self):
'''Resets the rescan timer'''
self._ble_last_scan = ticks_ms()