From d7e43f7d0d79e344b1aab0215b097a18a68865de Mon Sep 17 00:00:00 2001 From: Kyle Brown Date: Sun, 9 Oct 2022 13:17:05 -0700 Subject: [PATCH] Continue split refactor --- kmk/modules/split.py | 93 ++++++++++++++++-------------------------- kmk/transports/ble.py | 12 +++++- kmk/transports/uart.py | 51 +++++++++++++++++++++++ 3 files changed, 96 insertions(+), 60 deletions(-) create mode 100644 kmk/transports/uart.py diff --git a/kmk/modules/split.py b/kmk/modules/split.py index b3c6947..31215b2 100644 --- a/kmk/modules/split.py +++ b/kmk/modules/split.py @@ -56,13 +56,6 @@ class Split(Module): if split_type == SplitType.UART and use_pio: split_type = SplitType.PIO_UART - if split_type == SplitType.UART: - import busio - elif split_type == SplitType.PIO_UART: - from kmk.transports.pio_uart import PIO_UART - elif split_type == SplitType.BLE: - from kmk.transports.ble import BLE_UART - def during_bootup(self, keyboard): # Set up name for target side detection and BLE advertisment if not self.data_pin: @@ -89,24 +82,12 @@ class Split(Module): offset += matrix.key_count def before_matrix_scan(self, keyboard): - if self.split_type == SplitType.BLE: - self._transport.check_connection(keyboard) - - if self.split_type == SplitType.UART: - if self._is_target or self.data_pin2: - self._receive_uart(keyboard) - else: - self._receive_uart(keyboard) + if self._can_receive(keyboard): + keyboard.secondary_matrix_update = self._transport.receive(keyboard) def after_matrix_scan(self, keyboard): if keyboard.matrix_update: - if self.split_type == SplitType.UART: - if not self._is_target or self.data_pin2: - self._send_uart(keyboard.matrix_update) - else: - pass # explicit pass just for dev sanity... - else: - self._send_uart(keyboard.matrix_update) + self._transport.write(keyboard, keyboard.matrix_update) def before_hid_send(self, keyboard): if not self._is_target: @@ -118,12 +99,10 @@ class Split(Module): return def on_powersave_enable(self, keyboard): - if self.split_type == SplitType.BLE: - self._transport.enable_powersave(True) + self._transport.powersave(True) def on_powersave_disable(self, keyboard): - if self.split_type == SplitType.BLE: - self._transport.enable_powersave(False) + self._transport.powersave(False) def _serialize_update(self, update): buffer = bytearray(2) @@ -140,33 +119,6 @@ class Split(Module): return checksum - def _send_uart(self, update): - # Change offsets depending on where the data is going to match the correct - # matrix location of the receiever - update = self._serialize_update(update) - self._transport.write(self.uart_header) - self._transport.write(update) - self._transport.write(self._checksum(update)) - - def _receive_uart(self, keyboard): - if self._transport.in_waiting > 0 or self._uart_buffer: - if self._transport.in_waiting >= 60: - # This is a dirty hack to prevent crashes in unrealistic cases - import microcontroller - - microcontroller.reset() - - while self._transport.in_waiting >= 4: - # Check the header - if self._transport.read(1) == self.uart_header: - update = self._transport.read(2) - - # check the checksum - if self._checksum(update) == self._transport.read(1): - self._uart_buffer.append(self._deserialize_update(update)) - if self._uart_buffer: - keyboard.secondary_matrix_update = self._uart_buffer.pop(0) - def _get_side(self): name = str(getmount('/').label) # if split side was given, find target from split_side. @@ -188,6 +140,13 @@ class Split(Module): self.split_side = SplitSide.RIGHT def _init_transport(self, keyboard): + if self.split_type == SplitType.UART: + from kmk.transports.uart import UART + elif self.split_type == SplitType.PIO_UART: + from kmk.transports.pio_uart import PIO_UART + elif self.split_type == SplitType.BLE: + from kmk.transports.ble import BLE_UART + if self._is_target or not self.uart_flip: tx_pin = self.data_pin2 rx_pin = self.data_pin @@ -195,12 +154,19 @@ class Split(Module): tx_pin = self.data_pin rx_pin = self.data_pin2 - if split_type == SplitType.UART: - self._transport = busio.UART(tx=tx_pin, rx=rx_pin, timeout=self._uart_interval) - elif split_type == SplitType.PIO_UART: + if self.split_type == SplitType.UART: + self._transport = UART( + tx=tx_pin, + rx=rx_pin, + target=self.is_target, + uart_interval=self._uart_interval, + ) + elif self.split_type == SplitType.PIO_UART: self._transport = PIO_UART(tx=tx_pin, rx=rx_pin) - elif split_type == SplitType.BLE: - self._transport = BLE_UART(self._is_target, uart_interval) + elif self.split_type == SplitType.BLE: + self._transport = BLE_UART( + target=self._is_target, uart_interval=self.uart_interval + ) else: raise NotImplementedError @@ -222,3 +188,14 @@ class Split(Module): cm.append(cols_to_calc * (rows_to_calc + ridx) + cidx) keyboard.coord_mapping = tuple(cm) + + def _can_receive(self, keyboard) -> bool: + if self.split_type == SplitType.BLE: + self._transport.check_connection(keyboard) + return True + + elif self.split_type == SplitType.UART: + if self._is_target or self.data_pin2: + return True + + return False diff --git a/kmk/transports/ble.py b/kmk/transports/ble.py index f2879cc..98e36d6 100644 --- a/kmk/transports/ble.py +++ b/kmk/transports/ble.py @@ -5,7 +5,7 @@ from kmk.kmktime import check_deadline class BLE_UART: - def __init__(self, is_target, uart_interval=20): + def __init__(self, is_target=True, uart_interval=20): try: from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement @@ -58,7 +58,7 @@ class BLE_UART: def check_connection(self, keyboard): self._check_all_connections(keyboard) - def enable_powersave(self, enable=True): + def powersave(self, enable=True): if enable: if self._uart_connection and not self._psave_enable: self._uart_connection.connection_interval = self._uart_interval @@ -171,3 +171,11 @@ class BLE_UART: def ble_time_reset(self): '''Resets the rescan timer''' self._ble_last_scan = ticks_ms() + + def receive(self, keyboard): + if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer: + while self._uart.in_waiting >= 2: + update = self._deserialize_update(self._uart.read(2)) + self._uart_buffer.append(update) + if self._uart_buffer: + keyboard.secondary_matrix_update = self._uart_buffer.pop(0) diff --git a/kmk/transports/uart.py b/kmk/transports/uart.py new file mode 100644 index 0000000..1f8d0ca --- /dev/null +++ b/kmk/transports/uart.py @@ -0,0 +1,51 @@ +class UART: + def __init__(self, tx=None, rx=None, is_target=True, uart_interval=20): + self._debug_enabled = True + self._uart_connection = None + self._uart = None + self._uart_interval = uart_interval + self._is_target = is_target + + @property + def in_waiting(self): + return self._uart.in_waiting + + def read(self, n): + return self._uart.read(n) + + def readinto(self, buf): + return self._uart.readinto(buf) + + def powersave(enable: bool): + return + + def write(self, buffer, update): + if self._uart is not None: + if not self._is_target or self.data_pin2: + update = self._serialize_update(update) + self._uart.write(self.uart_header) + self._uart.write(update) + self._uart.write(self._checksum(update)) + + def receive(self, keyboard): + if self._transport.in_waiting > 0 or self._uart_buffer: + if self._transport.in_waiting >= 60: + # This is a dirty hack to prevent crashes in unrealistic cases + # TODO See if this hack is needed with checksum, and if not, use + # that to fix it. + import microcontroller + + microcontroller.reset() + + while self._transport.in_waiting >= 4: + # Check the header + if self._transport.read(1) == self.uart_header: + update = self._transport.read(2) + + # check the checksum + if self._checksum(update) == self._transport.read(1): + self._uart_buffer.append(self._deserialize_update(update)) + if self._uart_buffer: + return self._uart_buffer.pop(0) + + return None