Split BLE background advertising
This commit is contained in:
parent
d47143463b
commit
3687212ed7
@ -112,3 +112,14 @@ def hid_switch(key, keyboard, *args, **kwargs):
|
|||||||
)
|
)
|
||||||
keyboard._init_hid()
|
keyboard._init_hid()
|
||||||
return keyboard
|
return keyboard
|
||||||
|
|
||||||
|
|
||||||
|
def ble_refresh(key, keyboard, *args, **kwargs):
|
||||||
|
from kmk.hid import HIDModes
|
||||||
|
|
||||||
|
if keyboard.hid_type != HIDModes.BLE:
|
||||||
|
return keyboard
|
||||||
|
|
||||||
|
keyboard._hid_helper.stop_advertising()
|
||||||
|
keyboard._hid_helper.start_advertising()
|
||||||
|
return keyboard
|
||||||
|
@ -165,6 +165,8 @@ class KeyAttrDict(AttrDict):
|
|||||||
)
|
)
|
||||||
elif key in ('HID_SWITCH', 'HID'):
|
elif key in ('HID_SWITCH', 'HID'):
|
||||||
make_key(names=('HID_SWITCH', 'HID'), on_press=handlers.hid_switch)
|
make_key(names=('HID_SWITCH', 'HID'), on_press=handlers.hid_switch)
|
||||||
|
elif key in ('BLE_REFRESH'):
|
||||||
|
make_key(names=('BLE_REFRESH',), on_press=handlers.ble_refresh)
|
||||||
else:
|
else:
|
||||||
maybe_key = first_truthy(
|
maybe_key = first_truthy(
|
||||||
key,
|
key,
|
||||||
|
@ -8,6 +8,7 @@ from storage import getmount
|
|||||||
from kmk.kmktime import check_deadline
|
from kmk.kmktime import check_deadline
|
||||||
from kmk.matrix import KeyEvent, intify_coordinate
|
from kmk.matrix import KeyEvent, intify_coordinate
|
||||||
from kmk.modules import Module
|
from kmk.modules import Module
|
||||||
|
from kmk.hid import HIDModes
|
||||||
|
|
||||||
|
|
||||||
class SplitSide:
|
class SplitSide:
|
||||||
@ -64,16 +65,17 @@ class Split(Module):
|
|||||||
)
|
)
|
||||||
from adafruit_ble.services.nordic import UARTService
|
from adafruit_ble.services.nordic import UARTService
|
||||||
|
|
||||||
|
self.BLERadio = BLERadio
|
||||||
self.ProvideServicesAdvertisement = ProvideServicesAdvertisement
|
self.ProvideServicesAdvertisement = ProvideServicesAdvertisement
|
||||||
self.UARTService = UARTService
|
self.UARTService = UARTService
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print('BLE Import error')
|
print('BLE Import error')
|
||||||
return # BLE isn't supported on this platform
|
return # BLE isn't supported on this platform
|
||||||
self._ble = BLERadio()
|
|
||||||
self._ble_last_scan = ticks_ms() - 5000
|
self._ble_last_scan = ticks_ms() - 5000
|
||||||
self._connection_count = 0
|
self._connection_count = 0
|
||||||
|
self._split_connected = False
|
||||||
self._uart_connection = None
|
self._uart_connection = None
|
||||||
self._advertisment = None
|
self._advertisment = None # Seems to not be used anywhere
|
||||||
self._advertising = False
|
self._advertising = False
|
||||||
self._psave_enable = False
|
self._psave_enable = False
|
||||||
|
|
||||||
@ -86,6 +88,10 @@ class Split(Module):
|
|||||||
# Set up name for target side detection and BLE advertisment
|
# Set up name for target side detection and BLE advertisment
|
||||||
name = str(getmount('/').label)
|
name = str(getmount('/').label)
|
||||||
if self.split_type == SplitType.BLE:
|
if self.split_type == SplitType.BLE:
|
||||||
|
if keyboard.hid_type == HIDModes.BLE:
|
||||||
|
self._ble = keyboard._hid_helper.ble
|
||||||
|
else:
|
||||||
|
self._ble = self.BLERadio()
|
||||||
self._ble.name = name
|
self._ble.name = name
|
||||||
else:
|
else:
|
||||||
# Try to guess data pins if not supplied
|
# Try to guess data pins if not supplied
|
||||||
@ -211,12 +217,36 @@ class Split(Module):
|
|||||||
|
|
||||||
def _check_all_connections(self):
|
def _check_all_connections(self):
|
||||||
'''Validates the correct number of BLE connections'''
|
'''Validates the correct number of BLE connections'''
|
||||||
|
self._previous_connection_count = self._connection_count
|
||||||
self._connection_count = len(self._ble.connections)
|
self._connection_count = len(self._ble.connections)
|
||||||
if self._is_target and self._connection_count < 2:
|
print(self._connection_count)
|
||||||
self._target_advertise()
|
if self._is_target:
|
||||||
|
if self._advertising or not self._check_if_split_connected():
|
||||||
|
self._target_advertise()
|
||||||
elif not self._is_target and self._connection_count < 1:
|
elif not self._is_target and self._connection_count < 1:
|
||||||
self._initiator_scan()
|
self._initiator_scan()
|
||||||
|
|
||||||
|
def _check_if_split_connected(self):
|
||||||
|
# I'm looking for a way how to recognize which connection is on and which one off
|
||||||
|
# For now, I found that service name relation to having other CP device
|
||||||
|
if self._connection_count == 0:
|
||||||
|
return False
|
||||||
|
if self._connection_count == 2:
|
||||||
|
self._split_connected = True
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Polling this takes some time so I check only if connection_count changed
|
||||||
|
if self._previous_connection_count == self._connection_count:
|
||||||
|
return self._split_connected
|
||||||
|
|
||||||
|
bleio_connection = self._ble.connections[0]._bleio_connection
|
||||||
|
connection_services = bleio_connection.discover_remote_services()
|
||||||
|
for service in connection_services:
|
||||||
|
if str(service.uuid).startswith('UUID(\'adaf0001'):
|
||||||
|
self._split_connected = True
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
def _initiator_scan(self):
|
def _initiator_scan(self):
|
||||||
'''Scans for target device'''
|
'''Scans for target device'''
|
||||||
self._uart = None
|
self._uart = None
|
||||||
@ -252,6 +282,21 @@ class Split(Module):
|
|||||||
|
|
||||||
def _target_advertise(self):
|
def _target_advertise(self):
|
||||||
'''Advertises the target for the initiator to find'''
|
'''Advertises the target for the initiator to find'''
|
||||||
|
# Give previous advertising some time to complete
|
||||||
|
if self._advertising:
|
||||||
|
if self._check_if_split_connected():
|
||||||
|
if self._debug_enabled:
|
||||||
|
print('Advertising complete')
|
||||||
|
self._ble.stop_advertising()
|
||||||
|
self._advertising = False
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self.ble_rescan_timer():
|
||||||
|
return
|
||||||
|
|
||||||
|
if self._debug_enabled:
|
||||||
|
print('Advertising not answered')
|
||||||
|
|
||||||
self._ble.stop_advertising()
|
self._ble.stop_advertising()
|
||||||
if self._debug_enabled:
|
if self._debug_enabled:
|
||||||
print('Advertising')
|
print('Advertising')
|
||||||
@ -261,16 +306,8 @@ class Split(Module):
|
|||||||
advertisement = self.ProvideServicesAdvertisement(self._uart)
|
advertisement = self.ProvideServicesAdvertisement(self._uart)
|
||||||
|
|
||||||
self._ble.start_advertising(advertisement)
|
self._ble.start_advertising(advertisement)
|
||||||
|
self._advertising = True
|
||||||
self.ble_time_reset()
|
self.ble_time_reset()
|
||||||
while not self.ble_rescan_timer():
|
|
||||||
self._connection_count = len(self._ble.connections)
|
|
||||||
if self._connection_count > 1:
|
|
||||||
self.ble_time_reset()
|
|
||||||
if self._debug_enabled:
|
|
||||||
print('Advertising complete')
|
|
||||||
break
|
|
||||||
self._ble.stop_advertising()
|
|
||||||
|
|
||||||
def ble_rescan_timer(self):
|
def ble_rescan_timer(self):
|
||||||
'''If true, the rescan timer is up'''
|
'''If true, the rescan timer is up'''
|
||||||
|
Loading…
x
Reference in New Issue
Block a user