diff --git a/boards/crkbd/kb.py b/boards/crkbd/kb.py index ca9742d..c93b487 100644 --- a/boards/crkbd/kb.py +++ b/boards/crkbd/kb.py @@ -16,7 +16,7 @@ class KMKKeyboard(_KMKKeyboard): ) row_pins = (board.P0_22, board.P0_24, board.P1_00, board.P0_11) diode_orientation = DiodeOrientation.COLUMNS - uart_pin = board.P0_08 + data_pin = board.P0_08 rgb_pixel_pin = board.P0_06 i2c = board.I2C powersave_pin = board.P0_13 diff --git a/boards/crkbd/main.py b/boards/crkbd/main.py index d39e4d8..2417ce3 100644 --- a/boards/crkbd/main.py +++ b/boards/crkbd/main.py @@ -1,11 +1,21 @@ from kb import KMKKeyboard, rgb_pixel_pin from kmk.extensions.rgb import RGB from kmk.keys import KC -from kmk.modules.ble_split import BLE_Split from kmk.modules.layers import Layers +from kmk.modules.split import Split, SplitType keyboard = KMKKeyboard() +# Adding extentions +rgb = RGB(pixel_pin=rgb_pixel_pin, num_pixels=27, val_limit=100, hue_default=190, sat_default=100, val_default=5) + +split = Split(split_type=SplitType.BLE) + +layers_ext = Layers() + +keyboard.modules = [layers_ext, split] +keyboard.extensions = [rgb] +# # Cleaner key names _______ = KC.TRNS XXXXXXX = KC.NO @@ -22,19 +32,6 @@ RGB_SAD = KC.RGB_SAD RGB_VAI = KC.RGB_VAI RGB_VAD = KC.RGB_VAD -# Adding extentions -rgb = RGB(pixel_pin=rgb_pixel_pin, num_pixels=27, val_limit=100, hue_default=190, sat_default=100, val_default=5) - -# TODO Comment one of these on each side -# Left is 0, Right is 1 -split_side = 0 -split_side = 1 -split = BLE_Split(split_side=split_side) - -layers_ext = Layers() - -keyboard.modules = [layers_ext, split] -keyboard.extensions = [rgb] keyboard.keymap = [ [ #QWERTY diff --git a/kmk/kmk_keyboard.py b/kmk/kmk_keyboard.py index 9fe08da..920344f 100644 --- a/kmk/kmk_keyboard.py +++ b/kmk/kmk_keyboard.py @@ -8,6 +8,12 @@ from kmk.matrix import MatrixScanner, intify_coordinate from kmk.types import TapDanceKeyMeta +class Sandbox: + matrix_update = None + secondary_matrix_update = None + active_layers = None + + class KMKKeyboard: ##### # User-configurable @@ -28,11 +34,7 @@ class KMKKeyboard: modules = [] extensions = [] - sandbox = { - 'matrix_update': None, - 'secondary_matrix_update': None, - 'active_layers': [0], - } + sandbox = Sandbox() ##### # Internal State @@ -304,10 +306,7 @@ class KMKKeyboard: To save RAM on boards that don't use Split, we don't import Split and do an isinstance check, but instead do string detection ''' - if any( - x.__class__.__module__ in {'kmk.modules.split', 'kmk.modules.ble_split'} - for x in self.modules - ): + if any(x.__class__.__module__ == 'kmk.modules.split' for x in self.modules): return if not self.coord_mapping: @@ -417,7 +416,7 @@ class KMKKeyboard: print('Failed to run post hid function in extension: ', err, ext) def powersave_disable(self): - for module in self.extensions: + for module in self.modules: try: module.on_powersave_disable(self) except Exception as err: @@ -438,6 +437,12 @@ class KMKKeyboard: self._init_coord_mapping() self._init_hid() + for module in self.modules: + try: + module.during_bootup(self) + except Exception: + if self.debug_enabled: + print('Failed to load module', module) for ext in self.extensions: try: ext.during_bootup(self) @@ -451,7 +456,7 @@ class KMKKeyboard: while True: self.state_changed = False - self.sandbox.active_layers = self.active_layers + self.sandbox.active_layers = self.active_layers.copy() self.before_matrix_scan() diff --git a/kmk/modules/ble_split.py b/kmk/modules/ble_split.py deleted file mode 100644 index e549e08..0000000 --- a/kmk/modules/ble_split.py +++ /dev/null @@ -1,199 +0,0 @@ -'''Enables splitting keyboards wirelessly''' -from adafruit_ble import BLERadio -from adafruit_ble.advertising.standard import ProvideServicesAdvertisement -from adafruit_ble.services.nordic import UARTService -from kmk.hid import HIDModes -from kmk.kmktime import ticks_diff, ticks_ms -from kmk.matrix import intify_coordinate -from kmk.modules import Module -from storage import getmount - - -class BLE_Split(Module): - '''Enables splitting keyboards wirelessly''' - - def __init__( - self, split_flip=True, split_side=None, uart_interval=30, hid_type=HIDModes.BLE - ): - self._is_target = True - self._uart_buffer = [] - self.hid_type = hid_type - self.split_flip = split_flip - self.split_side = split_side - self.split_offset = None - self._ble = BLERadio() - self._ble_last_scan = ticks_ms() - 5000 - self._is_target = True - self._connection_count = 0 - self._uart = None - self._uart_connection = None - self._advertisment = None - self._advertising = False - self._uart_interval = uart_interval - self._psave_enable = False - self._debug_enabled = False - - def __repr__(self): - return f'BLE_SPLIT({self._to_dict()})' - - def _to_dict(self): - return { - '_ble': self._ble, - '_ble_last_scan': self._ble_last_scan, - '_is_target': self._is_target, - 'uart_buffer': self._uart_buffer, - '_split_flip': self.split_flip, - '_split_side': self.split_side, - } - - def during_bootup(self, keyboard): - self._debug_enabled = keyboard.debug_enabled - self._ble.name = str(getmount('/').label) - if self.split_side is None: - if self._ble.name.endswith('L'): - # If name ends in 'L' assume left and strip from name - self._is_target = True - elif self._ble.name.endswith('R'): - # If name ends in 'R' assume right and strip from name - self._is_target = False - else: - self._is_target = bool(self.split_side == 0) - - if self.split_flip and not self._is_target: - keyboard.col_pins = list(reversed(keyboard.col_pins)) - - self.split_offset = len(keyboard.col_pins) - - # Attempt to sanely guess a coord_mapping if one is not provided. - if not keyboard.coord_mapping: - keyboard.coord_mapping = [] - - rows_to_calc = len(keyboard.row_pins) * 2 - cols_to_calc = len(keyboard.col_pins) * 2 - - for ridx in range(rows_to_calc): - for cidx in range(cols_to_calc): - keyboard.coord_mapping.append(intify_coordinate(ridx, cidx)) - - def before_matrix_scan(self, keyboard): - self._check_all_connections() - return self._receive(keyboard) - - def after_matrix_scan(self, keyboard): - if keyboard.matrix_update: - keyboard.matrix_update = self._send(keyboard.matrix_update) - return - - def before_hid_send(self, keyboard): - return - - def after_hid_send(self, keyboard): - return - - def on_powersave_enable(self, keyboard): - if self._uart_connection and not self._psave_enable: - self._uart_connection.connection_interval = self._uart_interval - self._psave_enable = True - - def on_powersave_disable(self, keyboard): - if self._uart_connection and self._psave_enable: - self._uart_connection.connection_interval = 11.25 - self._psave_enable = False - - def _check_all_connections(self): - '''Validates the correct number of BLE connections''' - self._connection_count = len(self._ble.connections) - if self._is_target and self._connection_count < 2: - self._target_advertise() - elif not self._is_target and self._connection_count < 1: - self._initiator_scan() - - def _initiator_scan(self): - '''Scans for target device''' - self._uart = None - self._uart_connection = None - # See if any existing connections are providing UARTService. - self._connection_count = len(self._ble.connections) - if self._connection_count > 0 and not self._uart: - for connection in self._ble.connections: - if UARTService in connection: - self._uart_connection = connection - self._uart_connection.connection_interval = 11.25 - self._uart = self._uart_connection[UARTService] - break - - if not self._uart: - if self._debug_enabled: - print('Scanning') - self._ble.stop_scan() - for adv in self._ble.start_scan(ProvideServicesAdvertisement, timeout=20): - if self._debug_enabled: - print('Scanning') - if UARTService in adv.services and adv.rssi > -70: - self._uart_connection = self._ble.connect(adv) - self._uart_connection.connection_interval = 11.25 - self._uart = self._uart_connection[UARTService] - self._ble.stop_scan() - if self._debug_enabled: - print('Scan complete') - break - self._ble.stop_scan() - - def _target_advertise(self): - '''Advertises the target for the initiator to find''' - self._ble.stop_advertising() - if self._debug_enabled: - print('Advertising') - # Uart must not change on this connection if reconnecting - if not self._uart: - self._uart = UARTService() - advertisement = ProvideServicesAdvertisement(self._uart) - - self._ble.start_advertising(advertisement) - - self.ble_time_reset() - while not self.ble_rescan_timer(): - self._connection_count = len(self._ble.connections) - if self._connection_count > 1: - self.ble_time_reset() - if self._debug_enabled: - print('Advertising complete') - break - self._ble.stop_advertising() - - def ble_rescan_timer(self): - '''If true, the rescan timer is up''' - return bool(ticks_diff(ticks_ms(), self._ble_last_scan) > 5000) - - def ble_time_reset(self): - '''Resets the rescan timer''' - self._ble_last_scan = ticks_ms() - - def _send(self, update): - if self._uart: - try: - if not self._is_target: - update[1] += self.split_offset - self._uart.write(update) - except OSError: - try: - self._uart.disconnect() - except: # noqa: E722 - if self._debug_enabled: - print('UART disconnect failed') - - if self._debug_enabled: - print('Connection error') - self._uart_connection = None - self._uart = None - return update - - def _receive(self, keyboard): - if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer: - while self._uart.in_waiting >= 3: - self._uart_buffer.append(self._uart.read(3)) - if self._uart_buffer: - keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0)) - return - - return None diff --git a/kmk/modules/power.py b/kmk/modules/power.py index 93bbc41..0df5bea 100644 --- a/kmk/modules/power.py +++ b/kmk/modules/power.py @@ -14,7 +14,7 @@ class Power(Module): self._powersave_start = ticks_ms() self._usb_last_scan = ticks_ms() - 5000 self._psp = None # Powersave pin object - self._i2c = None + self._i2c = 0 self._loopcounter = 0 make_key( @@ -58,18 +58,15 @@ class Power(Module): def on_powersave_enable(self, keyboard): '''Gives 10 cycles to allow other extentions to clean up before powersave''' - if keyboard._trigger_powersave_enable: - if self._loopcounter > 10: - self._loopcounter += 1 - return - self._loopcounter = 0 - keyboard._trigger_powersave_enable = False + if self._loopcounter > 10: self.enable_powersave(keyboard) + self._loopcounter = 0 + else: + self._loopcounter += 1 return def on_powersave_disable(self, keyboard): - keyboard._trigger_powersave_disable = False - self.disable_powersave() + self.disable_powersave(keyboard) return def enable_powersave(self, keyboard): @@ -80,23 +77,24 @@ class Power(Module): if not self._psp: self._psp = digitalio.DigitalInOut(self.powersave_pin) - self._psp.direction = digitalio.Direction.OUTPUT - self._psp.value = True + self._psp.direction = digitalio.Direction.OUTPUT + if self._psp: + self._psp.value = True self.enable = True + keyboard._trigger_powersave_enable = False + return - def disable_powersave(self): + def disable_powersave(self, keyboard): '''Disables power saving features''' - if self.powersave_pin: + if self._psp: + self._psp.value = False # Allows power save to prevent RGB drain. # Example here https://docs.nicekeyboards.com/#/nice!nano/pinout_schematic - if not self._psp: - self._psp = digitalio.DigitalInOut(self.powersave_pin) - self._psp.direction = digitalio.Direction.OUTPUT - self._psp.value = False - + keyboard._trigger_powersave_disable = False self.enable = False + return def psleep(self): ''' @@ -106,6 +104,7 @@ class Power(Module): sleep_ms(8) elif ticks_diff(ticks_ms(), self._powersave_start) >= 240000: sleep_ms(180) + return def psave_time_reset(self): self._powersave_start = ticks_ms() @@ -118,12 +117,14 @@ class Power(Module): self._i2c = len(i2c.scan()) finally: i2c.unlock() + return def usb_rescan_timer(self): return bool(ticks_diff(ticks_ms(), self._usb_last_scan) > 5000) def usb_time_reset(self): self._usb_last_scan = ticks_ms() + return def usb_scan(self): # TODO Add USB detection here. Currently lies that it's connected diff --git a/kmk/modules/split-common.py b/kmk/modules/split-common.py deleted file mode 100644 index d6df5e2..0000000 --- a/kmk/modules/split-common.py +++ /dev/null @@ -1,306 +0,0 @@ -'''Enables splitting keyboards wirelessly or wired''' -import busio -from micropython import const - -from kmk.hid import HIDModes -from kmk.kmktime import ticks_diff, ticks_ms -from kmk.matrix import intify_coordinate -from kmk.modules import Module -from storage import getmount - - -class SplitSide: - LEFT = const(1) - RIGHT = const(2) - - -class SplitType: - UART = const(1) - I2C = const(2) # unused - ONEWIRE = const(3) # unused - BLE = const(4) - - -class Split_Common(Module): - '''Enables splitting keyboards wirelessly, or wired''' - - def __init__( - self, - split_flip=True, - split_side=None, - split_type=SplitType.BLE, - split_target_left=True, - uart_interval=20, - hid_type=HIDModes.BLE, - data_pin=None, - data_pin2=None, - target_left=True, - uart_flip=True, - ): - self._is_target = True - self._uart_buffer = [] - self.hid_type = hid_type - self.split_flip = split_flip - self.split_side = split_side - self.split_type = split_type - self.split_target_left = split_target_left - self.split_offset = None - self.data_pin = data_pin - self.data_pin2 = data_pin2 - self.target_left = target_left - self.uart_flip = uart_flip - self._is_target = True - self._uart = None - self._uart_interval = uart_interval - self._debug_enabled = False - if self.split_type == SplitType.BLE: - try: - from adafruit_ble import BLERadio - from adafruit_ble.advertising.standard import ( - ProvideServicesAdvertisement, - ) - from adafruit_ble.services.nordic import UARTService - - self.ProvideServicesAdvertisement = ProvideServicesAdvertisement - self.UARTService = UARTService - except ImportError: - pass # BLE isn't supported on this platform - self._ble = BLERadio() - self._ble_last_scan = ticks_ms() - 5000 - self._connection_count = 0 - self._uart_connection = None - self._advertisment = None - self._advertising = False - self._psave_enable = False - - def __repr__(self): - return f'BLE_SPLIT({self._to_dict()})' - - def _to_dict(self): - return { - '_ble': self._ble, - '_ble_last_scan': self._ble_last_scan, - '_is_target': self._is_target, - 'uart_buffer': self._uart_buffer, - '_split_flip': self.split_flip, - '_split_side': self.split_side, - } - - def during_bootup(self, keyboard): - # Set up name for target side detection and BLE advertisment - name = str(getmount('/').label) - if self.split_type == SplitType.BLE: - self._ble.name = name - - # Detect split side from name - if self.split_side is None: - if name.endswith('L'): - # If name ends in 'L' assume left and strip from name - self._is_target = bool(self.split_target_left) - self.split_side = SplitSide.LEFT - elif name.endswith('R'): - # If name ends in 'R' assume right and strip from name - self._is_target = not bool(self.split_target_left) - self.split_side = SplitSide.RIGHT - - # if split side was given, find master from split_side. - elif self.split_side == SplitSide.LEFT: - self._is_target = bool(self.split_target_left) - elif self.split_side == SplitSide.RIGHT: - self._is_target = not bool(self.split_target_left) - - # Flips the col pins if PCB is the same but flipped on right - if self.split_flip and self.split_side == SplitSide.RIGHT: - keyboard.col_pins = list(reversed(keyboard.col_pins)) - - self.split_offset = len(keyboard.col_pins) - - if self.split_type == SplitType.UART and self.data_pin is not None: - if self._is_target: - self._uart = busio.UART( - tx=self.data_pin2, rx=self.data_pin, timeout=self._uart_interval - ) - else: - self._uart = busio.UART( - tx=self.data_pin, rx=self.data_pin2, timeout=self._uart_interval - ) - - # Attempt to sanely guess a coord_mapping if one is not provided. - if not keyboard.coord_mapping: - keyboard.coord_mapping = [] - - rows_to_calc = len(keyboard.row_pins) * 2 - cols_to_calc = len(keyboard.col_pins) * 2 - - for ridx in range(rows_to_calc): - for cidx in range(cols_to_calc): - keyboard.coord_mapping.append(intify_coordinate(ridx, cidx)) - - def before_matrix_scan(self, keyboard): - if self.split_type == SplitType.BLE: - self._check_all_connections() - self._receive_ble(keyboard) - elif self.split_type == SplitType.UART: - if self._is_target or self.data_pin2: - self._receive_uart(keyboard) - elif self.split_type == SplitType.ONEWIRE: - pass # Protocol needs written - return - - def after_matrix_scan(self, keyboard): - if keyboard.matrix_update: - if self.split_type == SplitType.BLE: - self._send_ble(keyboard.matrix_update) - elif self.split_type == SplitType.UART and self.data_pin2: - self._send_uart(keyboard.matrix_update) - elif self.split_type == SplitType.ONEWIRE: - pass # Protocol needs written - - return - - def before_hid_send(self, keyboard): - return - - def after_hid_send(self, keyboard): - return - - def on_powersave_enable(self, keyboard): - if self.split_type == SplitType.BLE: - if self._uart_connection and not self._psave_enable: - self._uart_connection.connection_interval = self._uart_interval - self._psave_enable = True - - def on_powersave_disable(self, keyboard): - if self.split_type == SplitType.BLE: - if self._uart_connection and self._psave_enable: - self._uart_connection.connection_interval = 11.25 - self._psave_enable = False - - def _check_all_connections(self): - '''Validates the correct number of BLE connections''' - self._connection_count = len(self._ble.connections) - if self._is_target and self._connection_count < 2: - self._target_advertise() - elif not self._is_target and self._connection_count < 1: - self._initiator_scan() - - def _initiator_scan(self): - '''Scans for target device''' - self._uart = None - self._uart_connection = None - # See if any existing connections are providing UARTService. - self._connection_count = len(self._ble.connections) - if self._connection_count > 0 and not self._uart: - for connection in self._ble.connections: - if self.UARTService in connection: - self._uart_connection = connection - self._uart_connection.connection_interval = 11.25 - self._uart = self._uart_connection[self.UARTService] - break - - if not self._uart: - if self._debug_enabled: - print('Scanning') - self._ble.stop_scan() - for adv in self._ble.start_scan( - self.ProvideServicesAdvertisement, timeout=20 - ): - if self._debug_enabled: - print('Scanning') - if self.UARTService in adv.services and adv.rssi > -70: - self._uart_connection = self._ble.connect(adv) - self._uart_connection.connection_interval = 11.25 - self._uart = self._uart_connection[self.UARTService] - self._ble.stop_scan() - if self._debug_enabled: - print('Scan complete') - break - self._ble.stop_scan() - - def _target_advertise(self): - '''Advertises the target for the initiator to find''' - self._ble.stop_advertising() - if self._debug_enabled: - print('Advertising') - # Uart must not change on this connection if reconnecting - if not self._uart: - self._uart = self.UARTService() - advertisement = self.ProvideServicesAdvertisement(self._uart) - - self._ble.start_advertising(advertisement) - - self.ble_time_reset() - while not self.ble_rescan_timer(): - self._connection_count = len(self._ble.connections) - if self._connection_count > 1: - self.ble_time_reset() - if self._debug_enabled: - print('Advertising complete') - break - self._ble.stop_advertising() - - def ble_rescan_timer(self): - '''If true, the rescan timer is up''' - return bool(ticks_diff(ticks_ms(), self._ble_last_scan) > 5000) - - def ble_time_reset(self): - '''Resets the rescan timer''' - self._ble_last_scan = ticks_ms() - - def _send_ble(self, update): - if self._uart: - try: - if not self._is_target: - update[1] += self.split_offset - self._uart.write(update) - except OSError: - try: - self._uart.disconnect() - except: # noqa: E722 - if self._debug_enabled: - print('UART disconnect failed') - - if self._debug_enabled: - print('Connection error') - self._uart_connection = None - self._uart = None - - def _receive_ble(self, keyboard): - if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer: - while self._uart.in_waiting >= 3: - self._uart_buffer.append(self._uart.read(3)) - if self._uart_buffer: - keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0)) - return - - def _send_uart(self, update): - # Change offsets depending on where the data is going to match the correct - # matrix location of the receiever - if self._is_target: - if self.split_target_left: - update[1] += self.split_offset - else: - update[1] -= self.split_offset - else: - if self.split_target_left: - update[1] -= self.split_offset - else: - update[1] += self.split_offset - - if self._uart is not None: - self._uart.write(update) - - def _receive_uart(self, keyboard): - if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer: - if self._uart.in_waiting >= 60: - # This is a dirty hack to prevent crashes in unrealistic cases - import microcontroller - - microcontroller.reset() - - while self._uart.in_waiting >= 3: - self._uart_buffer.append(self._uart.read(3)) - if self._uart_buffer: - keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0)) - - return diff --git a/kmk/modules/split.py b/kmk/modules/split.py index 878862b..5bb85c9 100644 --- a/kmk/modules/split.py +++ b/kmk/modules/split.py @@ -1,84 +1,134 @@ +'''Enables splitting keyboards wirelessly or wired''' import busio +from micropython import const +from kmk.hid import HIDModes +from kmk.kmktime import ticks_diff, ticks_ms from kmk.matrix import intify_coordinate from kmk.modules import Module from storage import getmount +class SplitSide: + LEFT = const(1) + RIGHT = const(2) + + class SplitType: - UART = 1 - I2C = 2 # unused - ONEWIRE = 3 # unused + UART = const(1) + I2C = const(2) # unused + ONEWIRE = const(3) # unused + BLE = const(4) class Split(Module): + '''Enables splitting keyboards wirelessly, or wired''' + def __init__( self, - is_target=True, - extra_data_pin=None, - split_offset=None, split_flip=True, split_side=None, - split_type=SplitType.UART, + split_type=SplitType.BLE, + split_target_left=True, + uart_interval=20, + hid_type=HIDModes.BLE, + data_pin=None, + data_pin2=None, target_left=True, uart_flip=True, - uart_pin=None, - uart_pin2=None, - uart_timeout=20, ): - self._is_target = is_target - self.extra_data_pin = extra_data_pin - self.split_offsets = split_offset + self._is_target = True + self._uart_buffer = [] + self.hid_type = hid_type self.split_flip = split_flip self.split_side = split_side self.split_type = split_type - self.split_target_left = target_left - self._uart = None - self._uart_buffer = [] + self.split_target_left = split_target_left + self.split_offset = None + self.data_pin = data_pin + self.data_pin2 = data_pin2 + self.target_left = target_left self.uart_flip = uart_flip - self.uart_pin = uart_pin - self.uart_pin2 = uart_pin2 - self.uart_timeout = uart_timeout + self._is_target = True + self._uart = None + self._uart_interval = uart_interval + self._debug_enabled = False + if self.split_type == SplitType.BLE: + try: + from adafruit_ble import BLERadio + from adafruit_ble.advertising.standard import ( + ProvideServicesAdvertisement, + ) + from adafruit_ble.services.nordic import UARTService + + self.ProvideServicesAdvertisement = ProvideServicesAdvertisement + self.UARTService = UARTService + except ImportError: + pass # BLE isn't supported on this platform + self._ble = BLERadio() + self._ble_last_scan = ticks_ms() - 5000 + self._connection_count = 0 + self._uart_connection = None + self._advertisment = None + self._advertising = False + self._psave_enable = False + + def __repr__(self): + return f'BLE_SPLIT({self._to_dict()})' + + def _to_dict(self): + return { + '_ble': self._ble, + '_ble_last_scan': self._ble_last_scan, + '_is_target': self._is_target, + 'uart_buffer': self._uart_buffer, + '_split_flip': self.split_flip, + '_split_side': self.split_side, + } def during_bootup(self, keyboard): - try: - # Working around https://github.com/adafruit/circuitpython/issues/1769 - keyboard._hid_helper_inst.create_report([]).send() - # Line above is broken and needs fixed for aut detection - self._is_target = True - except OSError: - self._is_target = False + # Set up name for target side detection and BLE advertisment + name = str(getmount('/').label) + if self.split_type == SplitType.BLE: + self._ble.name = name + + # Detect split side from name if self.split_side is None: - l_or_r = str(getmount('/').label) - if l_or_r.endswith('L'): + if name.endswith('L'): # If name ends in 'L' assume left and strip from name - self.split_side = 0 - elif l_or_r.endswith('R'): + self._is_target = bool(self.split_target_left) + self.split_side = SplitSide.LEFT + elif name.endswith('R'): # If name ends in 'R' assume right and strip from name - self.split_side = 1 + self._is_target = not bool(self.split_target_left) + self.split_side = SplitSide.RIGHT - if self.split_flip and not self._is_target: + # if split side was given, find master from split_side. + elif self.split_side == SplitSide.LEFT: + self._is_target = bool(self.split_target_left) + elif self.split_side == SplitSide.RIGHT: + self._is_target = not bool(self.split_target_left) + + # Flips the col pins if PCB is the same but flipped on right + if self.split_flip and self.split_side == SplitSide.RIGHT: keyboard.col_pins = list(reversed(keyboard.col_pins)) - if self.split_side == 0: - self.split_target_left = self._is_target - elif self.split_side == 1: - self.split_target_left = not self._is_target - if self.uart_pin is not None: + self.split_offset = len(keyboard.col_pins) + + if self.split_type == SplitType.UART and self.data_pin is not None: if self._is_target: self._uart = busio.UART( - tx=self.uart_pin2, rx=self.uart_pin, timeout=self.uart_timeout + tx=self.data_pin2, rx=self.data_pin, timeout=self._uart_interval ) else: self._uart = busio.UART( - tx=self.uart_pin, rx=self.uart_pin2, timeout=self.uart_timeout + tx=self.data_pin, rx=self.data_pin2, timeout=self._uart_interval ) + # Attempt to sanely guess a coord_mapping if one is not provided. if not keyboard.coord_mapping: keyboard.coord_mapping = [] - self.split_offset = len(keyboard.col_pins) - rows_to_calc = len(keyboard.row_pins) * 2 cols_to_calc = len(keyboard.col_pins) * 2 @@ -87,13 +137,26 @@ class Split(Module): keyboard.coord_mapping.append(intify_coordinate(ridx, cidx)) def before_matrix_scan(self, keyboard): - if self._is_target or self.uart_pin2: - return self._receive(keyboard) - return None + if self.split_type == SplitType.BLE: + self._check_all_connections() + self._receive_ble(keyboard) + elif self.split_type == SplitType.UART: + if self._is_target or self.data_pin2: + self._receive_uart(keyboard) + elif self.split_type == SplitType.ONEWIRE: + pass # Protocol needs written + return def after_matrix_scan(self, keyboard): - if keyboard.matrix_update is not None and not self._is_target: - self._send(keyboard.matrix_update) + if keyboard.matrix_update: + if self.split_type == SplitType.BLE: + self._send_ble(keyboard.matrix_update) + elif self.split_type == SplitType.UART and self.data_pin2: + self._send_uart(keyboard.matrix_update) + elif self.split_type == SplitType.ONEWIRE: + pass # Protocol needs written + + return def before_hid_send(self, keyboard): return @@ -102,20 +165,132 @@ class Split(Module): return def on_powersave_enable(self, keyboard): - return + if self.split_type == SplitType.BLE: + if self._uart_connection and not self._psave_enable: + self._uart_connection.connection_interval = self._uart_interval + self._psave_enable = True def on_powersave_disable(self, keyboard): - return + if self.split_type == SplitType.BLE: + if self._uart_connection and self._psave_enable: + self._uart_connection.connection_interval = 11.25 + self._psave_enable = False - def _send(self, update): - if self.split_target_left: - update[1] += self.split_offset + def _check_all_connections(self): + '''Validates the correct number of BLE connections''' + self._connection_count = len(self._ble.connections) + if self._is_target and self._connection_count < 2: + self._target_advertise() + elif not self._is_target and self._connection_count < 1: + self._initiator_scan() + + def _initiator_scan(self): + '''Scans for target device''' + self._uart = None + self._uart_connection = None + # See if any existing connections are providing UARTService. + self._connection_count = len(self._ble.connections) + if self._connection_count > 0 and not self._uart: + for connection in self._ble.connections: + if self.UARTService in connection: + self._uart_connection = connection + self._uart_connection.connection_interval = 11.25 + self._uart = self._uart_connection[self.UARTService] + break + + if not self._uart: + if self._debug_enabled: + print('Scanning') + self._ble.stop_scan() + for adv in self._ble.start_scan( + self.ProvideServicesAdvertisement, timeout=20 + ): + if self._debug_enabled: + print('Scanning') + if self.UARTService in adv.services and adv.rssi > -70: + self._uart_connection = self._ble.connect(adv) + self._uart_connection.connection_interval = 11.25 + self._uart = self._uart_connection[self.UARTService] + self._ble.stop_scan() + if self._debug_enabled: + print('Scan complete') + break + self._ble.stop_scan() + + def _target_advertise(self): + '''Advertises the target for the initiator to find''' + self._ble.stop_advertising() + if self._debug_enabled: + print('Advertising') + # Uart must not change on this connection if reconnecting + if not self._uart: + self._uart = self.UARTService() + advertisement = self.ProvideServicesAdvertisement(self._uart) + + self._ble.start_advertising(advertisement) + + self.ble_time_reset() + while not self.ble_rescan_timer(): + self._connection_count = len(self._ble.connections) + if self._connection_count > 1: + self.ble_time_reset() + if self._debug_enabled: + print('Advertising complete') + break + self._ble.stop_advertising() + + def ble_rescan_timer(self): + '''If true, the rescan timer is up''' + return bool(ticks_diff(ticks_ms(), self._ble_last_scan) > 5000) + + def ble_time_reset(self): + '''Resets the rescan timer''' + self._ble_last_scan = ticks_ms() + + def _send_ble(self, update): + if self._uart: + try: + if not self._is_target: + update[1] += self.split_offset + self._uart.write(update) + except OSError: + try: + self._uart.disconnect() + except: # noqa: E722 + if self._debug_enabled: + print('UART disconnect failed') + + if self._debug_enabled: + print('Connection error') + self._uart_connection = None + self._uart = None + + def _receive_ble(self, keyboard): + if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer: + while self._uart.in_waiting >= 3: + self._uart_buffer.append(self._uart.read(3)) + if self._uart_buffer: + keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0)) + return + + def _send_uart(self, update): + # Change offsets depending on where the data is going to match the correct + # matrix location of the receiever + if self._is_target: + if self.split_target_left: + update[1] += self.split_offset + else: + update[1] -= self.split_offset else: - update[1] -= self.split_offsets + if self.split_target_left: + update[1] -= self.split_offset + else: + update[1] += self.split_offset + if self._uart is not None: self._uart.write(update) - def _receive(self, keyboard): + def _receive_uart(self, keyboard): if self._uart is not None and self._uart.in_waiting > 0 or self._uart_buffer: if self._uart.in_waiting >= 60: # This is a dirty hack to prevent crashes in unrealistic cases @@ -129,5 +304,3 @@ class Split(Module): keyboard.secondary_matrix_update = bytearray(self._uart_buffer.pop(0)) return - - return None diff --git a/user_keymaps/kdb424/corne.py b/user_keymaps/kdb424/corne.py index 023053e..185bc26 100644 --- a/user_keymaps/kdb424/corne.py +++ b/user_keymaps/kdb424/corne.py @@ -6,33 +6,27 @@ import displayio import terminalio from adafruit_display_text import label from kb import KMKKeyboard -from kmk.extensions.ble_split import BLE_Split -from kmk.extensions.power import Power from kmk.extensions.rgb import RGB -from kmk.handlers.sequences import send_string, simple_key_sequence from kmk.hid import HIDModes from kmk.keys import KC -from kmk_side import split_side +from kmk.modules.layers import Layers +from kmk.modules.power import Power +from kmk.modules.split import Split, SplitType keyboard = KMKKeyboard() -_______ = KC.TRNS -XXXXXXX = KC.NO - -LT1_SP = KC.MO(2) -LT2_SP = KC.LT(3, KC.SPC) -TAB_SB = KC.LT(5, KC.TAB) -SUPER_L = KC.LM(4, KC.LGUI) keyboard.tap_time = 320 keyboard.debug_enabled = False rgb_ext = RGB(pixel_pin=keyboard.rgb_pixel_pin, num_pixels=27, val_limit=100, hue_default=190, sat_default=100, val_default=5) -split = BLE_Split() +split = Split(split_type=SplitType.BLE) power = Power(powersave_pin=keyboard.powersave_pin) +layers = Layers() -keyboard.extensions = [split, rgb_ext, power] +keyboard.modules = [split, power, layers] +keyboard.extensions = [rgb_ext] enable_oled = False @@ -47,6 +41,14 @@ else: displayio.release_displays() keyboard.i2c_deinit_count += 1 +_______ = KC.TRNS +XXXXXXX = KC.NO + +LT1_SP = KC.MO(2) +LT2_SP = KC.LT(3, KC.SPC) +TAB_SB = KC.LT(5, KC.TAB) +SUPER_L = KC.LM(4, KC.LGUI) + keyboard.keymap = [ # DVORAK # ,-----------------------------------------. ,-----------------------------------------.