Compare commits

..

1 Commits

Author SHA1 Message Date
xs5871
ddaf9946eb
Add support for cirque glidepoint trackpad 2023-03-24 22:01:50 +00:00
13 changed files with 274 additions and 570 deletions

View File

@ -1,7 +1,7 @@
---
name: Bug report
about: Create a report to help us improve
title: "[BUG] Title"
title: "[BUG]"
labels: bug
assignees: ''
@ -16,7 +16,7 @@ Steps to reproduce the behavior:
* Setup and configuration of peripherals
* Input: keys pressed, ...
(Choose which are applicable.)
**Expected behavior**
A clear and concise description of what you expected to happen.

View File

@ -1,7 +1,7 @@
---
name: Feature request
about: Suggest an idea for this project
title: "[Enhancement] Title"
title: "[Enhancement]"
labels: enhancement
assignees: ''

View File

@ -188,7 +188,6 @@
| `KC.RESET` | Restarts the keyboard |
| `KC.RELOAD`, `KC.RLD` | Reloads the keyboard software, preserving any serial connections |
| `KC.DEBUG` | Toggle `debug_enabled`, which enables log spew to serial console |
| `KC.ANY` | Any key between `A and `/` |
| `KC.GESC` | Escape when tapped, <code>&#96;</code> when pressed with Shift or GUI |
| `KC.BKDL` | Backspace when tapped, Delete when pressed with GUI |
| `KC.UC_MODE_NOOP` | Sets UnicodeMode to NOOP |

View File

@ -1,272 +0,0 @@
import busio
from supervisor import ticks_ms
import adafruit_displayio_ssd1306
import displayio
import terminalio
from adafruit_display_text import label
from kmk.extensions import Extension
from kmk.handlers.stock import passthrough as handler_passthrough
from kmk.keys import make_key
from kmk.kmktime import PeriodicTimer, ticks_diff
from kmk.modules.split import Split, SplitSide
from kmk.utils import clamp
displayio.release_displays()
class TextEntry:
def __init__(
self,
text='',
x=0,
y=0,
x_anchor='L',
y_anchor='T',
direction='LTR',
line_spacing=0.75,
inverted=False,
layer=None,
side=None,
):
self.text = text
self.direction = direction
self.line_spacing = line_spacing
self.inverted = inverted
self.layer = layer
self.color = 0xFFFFFF
self.background_color = 0x000000
self.x_anchor = 0.0
self.y_anchor = 0.0
if x_anchor == 'L':
self.x_anchor = 0.0
x = x + 1
if x_anchor == 'M':
self.x_anchor = 0.5
if x_anchor == 'R':
self.x_anchor = 1.0
if y_anchor == 'T':
self.y_anchor = 0.0
if y_anchor == 'M':
self.y_anchor = 0.5
if y_anchor == 'B':
self.y_anchor = 1.0
self.anchor_point = (self.x_anchor, self.y_anchor)
self.anchored_position = (x, y)
if inverted:
self.color = 0x000000
self.background_color = 0xFFFFFF
self.side = side
if side == 'L':
self.side = SplitSide.LEFT
if side == 'R':
self.side = SplitSide.RIGHT
class ImageEntry:
def __init__(self, x=0, y=0, image='', layer=None, side=None):
self.x = x
self.y = y
self.image = displayio.OnDiskBitmap(image)
self.layer = layer
self.side = side
if side == 'L':
self.side = SplitSide.LEFT
if side == 'R':
self.side = SplitSide.RIGHT
class Oled(Extension):
def __init__(
self,
i2c=None,
sda=None,
scl=None,
device_address=0x3C,
entries=[],
width=128,
height=32,
flip: bool = False,
flip_left: bool = False,
flip_right: bool = False,
brightness=0.8,
brightness_step=0.1,
dim_time=20,
dim_target=0.1,
off_time=60,
powersave_dim_time=10,
powersave_dim_target=0.1,
powersave_off_time=30,
):
self.device_address = device_address
self.flip = flip
self.flip_left = flip_left
self.flip_right = flip_right
self.entries = entries
self.width = width
self.height = height
self.prev_layer = None
self.brightness = brightness
self.brightness_step = brightness_step
self.timer_start = ticks_ms()
self.powersave = False
self.dim_time_ms = dim_time * 1000
self.dim_target = dim_target
self.off_time_ms = off_time * 1000
self.powersavedim_time_ms = powersave_dim_time * 1000
self.powersave_dim_target = powersave_dim_target
self.powersave_off_time_ms = powersave_off_time * 1000
self.dim_period = PeriodicTimer(50)
self.split_side = None
# i2c initialization
self.i2c = i2c
if self.i2c is None:
self.i2c = busio.I2C(scl, sda)
make_key(
names=('OLED_BRI',),
on_press=self.oled_brightness_increase,
on_release=handler_passthrough,
)
make_key(
names=('OLED_BRD',),
on_press=self.oled_brightness_decrease,
on_release=handler_passthrough,
)
def render(self, layer):
splash = displayio.Group()
for entry in self.entries:
if entry.layer != layer and entry.layer is not None:
continue
if isinstance(entry, TextEntry):
splash.append(
label.Label(
terminalio.FONT,
text=entry.text,
color=entry.color,
background_color=entry.background_color,
anchor_point=entry.anchor_point,
anchored_position=entry.anchored_position,
label_direction=entry.direction,
line_spacing=entry.line_spacing,
padding_left=1,
)
)
elif isinstance(entry, ImageEntry):
splash.append(
displayio.TileGrid(
entry.image,
pixel_shader=entry.image.pixel_shader,
x=entry.x,
y=entry.y,
)
)
self.display.show(splash)
def on_runtime_enable(self, sandbox):
return
def on_runtime_disable(self, sandbox):
return
def during_bootup(self, keyboard):
for module in keyboard.modules:
if isinstance(module, Split):
self.split_side = module.split_side
if self.split_side == SplitSide.LEFT:
self.flip = self.flip_left
elif self.split_side == SplitSide.RIGHT:
self.flip = self.flip_right
for idx, entry in enumerate(self.entries):
if entry.side != self.split_side and entry.side is not None:
del self.entries[idx]
self.display = adafruit_displayio_ssd1306.SSD1306(
displayio.I2CDisplay(self.i2c, device_address=self.device_address),
width=self.width,
height=self.height,
rotation=180 if self.flip else 0,
brightness=self.brightness,
)
def before_matrix_scan(self, sandbox):
if self.dim_period.tick():
self.dim()
if sandbox.active_layers[0] != self.prev_layer:
self.prev_layer = sandbox.active_layers[0]
self.render(sandbox.active_layers[0])
def after_matrix_scan(self, sandbox):
if sandbox.matrix_update or sandbox.secondary_matrix_update:
self.timer_start = ticks_ms()
def before_hid_send(self, sandbox):
return
def after_hid_send(self, sandbox):
return
def on_powersave_enable(self, sandbox):
self.powersave = True
def on_powersave_disable(self, sandbox):
self.powersave = False
def deinit(self, sandbox):
displayio.release_displays()
self.i2c.deinit()
def oled_brightness_increase(self):
self.display.brightness = clamp(
self.display.brightness + self.brightness_step, 0, 1
)
self.brightness = self.display.brightness # Save current brightness
def oled_brightness_decrease(self):
self.display.brightness = clamp(
self.display.brightness - self.brightness_step, 0, 1
)
self.brightness = self.display.brightness # Save current brightness
def dim(self):
if self.powersave:
if (
self.powersave_off_time_ms
and ticks_diff(ticks_ms(), self.timer_start)
> self.powersave_off_time_ms
):
self.display.sleep()
elif (
self.powersave_dim_time_ms
and ticks_diff(ticks_ms(), self.timer_start)
> self.powersave_dim_time_ms
):
self.display.brightness = self.powersave_dim_target
else:
self.display.brightness = self.brightness
self.display.wake()
elif (
self.off_time_ms
and ticks_diff(ticks_ms(), self.timer_start) > self.off_time_ms
):
self.display.sleep()
elif (
self.dim_time_ms
and ticks_diff(ticks_ms(), self.timer_start) > self.dim_time_ms
):
self.display.brightness = self.dim_target
else:
self.display.brightness = self.brightness
self.display.wake()

View File

@ -4,7 +4,7 @@ from math import e, exp, pi, sin
from kmk.extensions import Extension
from kmk.handlers.stock import passthrough as handler_passthrough
from kmk.keys import make_key
from kmk.scheduler import create_task
from kmk.kmktime import PeriodicTimer
from kmk.utils import Debug, clamp
debug = Debug(__name__)
@ -229,7 +229,7 @@ class RGB(Extension):
for n, pixels in enumerate(self.pixels):
debug(f'pixels[{n}] = {pixels.__class__}[{len(pixels)}]')
self._task = create_task(self.animate, period_ms=(1000 // self.refresh_rate))
self._timer = PeriodicTimer(1000 // self.refresh_rate)
def before_matrix_scan(self, sandbox):
return
@ -241,7 +241,7 @@ class RGB(Extension):
return
def after_hid_send(self, sandbox):
pass
self.animate()
def on_powersave_enable(self, sandbox):
return
@ -435,7 +435,7 @@ class RGB(Extension):
if self.animation_mode is AnimationModes.STATIC_STANDBY:
return
if self.enable:
if self.enable and self._timer.tick():
self._animation_step()
if self.animation_mode == AnimationModes.BREATHING:
self.effect_breathing()

View File

@ -137,10 +137,3 @@ def ble_disconnect(key, keyboard, *args, **kwargs):
keyboard._hid_helper.clear_bonds()
return keyboard
def any_pressed(key, keyboard, *args, **kwargs):
from random import randint
key.code = randint(4, 56)
default_pressed(key, keyboard, *args, **kwargs)

View File

@ -370,7 +370,6 @@ def maybe_make_firmware_key(candidate: str) -> Optional[Key]:
((('HID_SWITCH', 'HID'), handlers.hid_switch)),
((('RELOAD', 'RLD'), handlers.reload)),
((('RESET',), handlers.reset)),
((('ANY',), handlers.any_pressed)),
)
for names, handler in keys:

View File

@ -1,17 +1,19 @@
try:
from typing import Callable, Optional
from typing import Callable, Optional, Tuple
except ImportError:
pass
from supervisor import ticks_ms
from collections import namedtuple
from keypad import Event as KeyEvent
from kmk.consts import UnicodeMode
from kmk.hid import BLEHID, USBHID, AbstractHID, HIDModes
from kmk.keys import KC, Key
from kmk.kmktime import ticks_add, ticks_diff
from kmk.modules import Module
from kmk.scanners.keypad import MatrixScanner
from kmk.scheduler import Task, cancel_task, create_task, get_due_task
from kmk.utils import Debug
debug = Debug('kmk.keyboard')
@ -264,17 +266,60 @@ class KMKKeyboard:
def tap_key(self, keycode: Key) -> None:
self.add_key(keycode)
# On the next cycle, we'll remove the key.
self.set_timeout(0, lambda: self.remove_key(keycode))
self.set_timeout(False, lambda: self.remove_key(keycode))
def set_timeout(self, after_ticks: int, callback: Callable[[None], None]) -> [Task]:
return create_task(callback, after_ms=after_ticks)
def set_timeout(
self, after_ticks: int, callback: Callable[[None], None]
) -> Tuple[int, int]:
# We allow passing False as an implicit "run this on the next process timeouts cycle"
if after_ticks is False:
after_ticks = 0
if after_ticks == 0 and self._processing_timeouts:
after_ticks += 1
timeout_key = ticks_add(ticks_ms(), after_ticks)
if timeout_key not in self._timeouts:
self._timeouts[timeout_key] = []
idx = len(self._timeouts[timeout_key])
self._timeouts[timeout_key].append(callback)
return (timeout_key, idx)
def cancel_timeout(self, timeout_key: int) -> None:
cancel_task(timeout_key)
try:
self._timeouts[timeout_key[0]][timeout_key[1]] = None
except (KeyError, IndexError):
if debug.enabled:
debug(f'no such timeout: {timeout_key}')
def _process_timeouts(self) -> None:
for task in get_due_task():
task()
if not self._timeouts:
return
# Copy timeout keys to a temporary list to allow sorting.
# Prevent net timeouts set during handling from running on the current
# cycle by setting a flag `_processing_timeouts`.
current_time = ticks_ms()
timeout_keys = []
self._processing_timeouts = True
for k in self._timeouts.keys():
if ticks_diff(k, current_time) <= 0:
timeout_keys.append(k)
if timeout_keys and debug.enabled:
debug('processing timeouts')
for k in sorted(timeout_keys):
for callback in self._timeouts[k]:
if callback:
callback()
del self._timeouts[k]
self._processing_timeouts = False
def _init_sanity_check(self) -> None:
'''

212
kmk/modules/glidepoint.py Normal file
View File

@ -0,0 +1,212 @@
try:
from typing import Callable
except ImportError:
pass
from micropython import const
from kmk.keys import AX
from kmk.modules import Module
from kmk.utils import Debug
debug = Debug(__name__)
_ADDRESS = const(0x2A)
_MASK_READ = const(0xA0)
_MASK_WRITE = const(0x80)
_REG_FW_ID = const(0x00)
_REG_FW_VER = const(0x01)
_REG_STATUS = const(0x02)
_REG_SYS_CFG = const(0x03)
_REG_FEED_CFG1 = const(0x04)
_REG_FEED_CFG2 = const(0x05)
_REG_CAL_CFG = const(0x07)
_REG_AUX_CTL = const(0x08)
_REG_SAMPLE_RATE = const(0x09)
_REG_ZIDLE = const(0x0A)
_REG_Z_SCALER = const(0x0B)
_REG_SLEEP_INTERVAL = const(0x0C)
_REG_SLEEP_TIMER = const(0x0D)
_REG_DATA = const(0x12)
_FEED1_ENABLE = const(0x01)
_FEED1_ABSOLUTE = const(0x02)
_FEED1_NO_FILTER = const(0x04)
_FEED1_NO_X_DATA = const(0x08)
_FEED1_NO_Y_DATA = const(0x10)
_FEED1_INVERT_X = const(0x40)
_FEED1_INVERT_Y = const(0x80)
_FEED2_INTELLIMOUSE = const(0x01)
_FEED2_NO_TAPS = const(0x02)
_FEED2_NO_SEC_TAPS = const(0x04)
_FEED2_NO_SCROLL = const(0x08)
_FEED2_NO_GLIDEEXTEND = const(0x10)
_FEED2_SWAP_XY = const(0x80)
def with_i2c_lock(rw: Callable[[object, int], None]) -> Callable[[object, int], None]:
def _(self, n: int) -> None:
if not self._i2c.try_lock():
if debug.enabled:
debug("can't acquire lock")
return
try:
rw(self, n)
finally:
self._i2c.unlock()
return _
class AbsoluteHandler:
cfg = (
_REG_FEED_CFG2,
0x00,
_REG_FEED_CFG1,
_FEED1_ENABLE | _FEED1_NO_FILTER | _FEED1_ABSOLUTE | _FEED1_INVERT_X,
)
def handle(buffer: bytearray, keyboard) -> None:
button = buffer[0]
x_low = buffer[2]
y_low = buffer[3]
high = buffer[4]
z_lvl = buffer[5]
x_pos = ((high & 0x0F) << 8) | x_low
y_pos = ((high & 0xF0) << 4) | y_low
if debug.enabled:
debug(
'buttons:',
bin(button),
' x_pos:',
x_pos,
' y_pos:',
y_pos,
'z_lvl:',
z_lvl,
)
class RelativeHandler:
cfg = (
_REG_FEED_CFG2,
0x00,
_REG_FEED_CFG1,
_FEED1_ENABLE | _FEED1_NO_FILTER | _FEED1_INVERT_X,
)
def handle(buffer: bytearray, keyboard) -> None:
button = buffer[0] & 0b00000111
x_sign = buffer[0] & 0b00010000
y_sign = buffer[0] & 0b00100000
x_delta = buffer[1]
y_delta = buffer[2]
w_delta = buffer[3]
if x_sign:
x_delta -= 0xFF
if y_sign:
y_delta -= 0xFF
if x_delta != 0:
AX.X.move(keyboard, x_delta)
if y_delta != 0:
AX.Y.move(keyboard, y_delta)
if debug.enabled:
debug(
'buttons:',
bin(button),
' x_delta:',
x_delta,
' y_delta:',
y_delta,
' w_delta',
w_delta,
)
class GlidePoint(Module):
def __init__(self, i2c):
self._i2c = i2c
self._buffer = bytearray(8)
# self.handler = RelativeHandler
self.handler = AbsoluteHandler
@with_i2c_lock
def _read(self, n: int) -> None:
self._buffer[0] |= _MASK_READ
self._i2c.writeto_then_readfrom(
_ADDRESS, self._buffer, self._buffer, out_end=1, in_end=n
)
@with_i2c_lock
def _write(self, n: int) -> None:
for i in range(0, n, 2):
self._buffer[i] |= _MASK_WRITE
self._i2c.writeto(_ADDRESS, self._buffer, end=n)
def _check_firmware(self) -> bool:
self._buffer[0] = _REG_FW_ID
self._read(2)
return self._buffer[:2] == b'\x07:'
def _clear_status_flags(self) -> None:
self._buffer[0] = _REG_STATUS
self._buffer[1] = 0x00
self._write(2)
def _configure(self) -> None:
self._buffer[0] = _REG_SYS_CFG
self._buffer[1] = 0x00
for idx, val in enumerate(self.handler.cfg):
self._buffer[idx + 2] = val
self._write(2 + len(self.handler.cfg))
def _data_ready(self) -> bool:
self._buffer[0] = _REG_STATUS
self._read(1)
return self._buffer[0] != 0x00
def during_bootup(self, keyboard):
if not self._check_firmware:
raise OSError('Firmware ID mismatch')
self._clear_status_flags()
self._configure()
def before_matrix_scan(self, keyboard):
pass
def after_matrix_scan(self, keyboard):
if not self._data_ready():
return
self._buffer[0] = _REG_DATA
self._read(6)
self.handler.handle(self._buffer, keyboard)
self._clear_status_flags()
def before_hid_send(self, keyboard):
pass
def after_hid_send(self, keyboard):
pass
def on_powersave_enable(self, keyboard):
self._buffer[0] = _REG_SYS_CFG
self._buffer[1] = 0x04
self._write(2)
def on_powersave_disable(self, keyboard):
self._buffer[0] = _REG_SYS_CFG
self._buffer[1] = 0x00
self._write(2)

View File

@ -1,67 +0,0 @@
'''
Here we're abusing _asyncios TaskQueue to implement a very simple priority
queue task scheduler.
Despite documentation, Circuitpython doesn't usually ship with a min-heap
module; it does however implement a pairing-heap for `TaskQueue` in native code.
'''
try:
from typing import Callable
except ImportError:
pass
from supervisor import ticks_ms
from _asyncio import Task, TaskQueue
from kmk.kmktime import ticks_add, ticks_diff
_task_queue = TaskQueue()
class PeriodicTaskMeta:
def __init__(self, func: Callable[[None], None], period: int) -> None:
self._task = Task(self.call)
self._coro = func
self.period = period
def call(self) -> None:
self._coro()
after_ms = ticks_add(self._task.ph_key, self.period)
_task_queue.push_sorted(self._task, after_ms)
def create_task(
func: Callable[[None], None],
*,
after_ms: int = 0,
period_ms: int = 0,
) -> [Task, PeriodicTaskMeta]:
if period_ms:
r = PeriodicTaskMeta(func, period_ms)
t = r._task
else:
t = r = Task(func)
if after_ms:
after_ms = ticks_add(ticks_ms(), after_ms)
_task_queue.push_sorted(t, after_ms)
else:
_task_queue.push_head(t)
return r
def get_due_task() -> [Callable, None]:
while True:
t = _task_queue.peek()
if not t or ticks_diff(t.ph_key, ticks_ms()) > 0:
break
_task_queue.pop_head()
yield t.coro
def cancel_task(t: [Task, PeriodicTaskMeta]) -> None:
if isinstance(t, PeriodicTaskMeta):
t = t._task
_task_queue.remove(t)

View File

@ -6,7 +6,6 @@ from kmk.keys import KC, ModifierKey
from kmk.kmk_keyboard import KMKKeyboard
from kmk.scanners import DiodeOrientation
from kmk.scanners.digitalio import MatrixScanner
from kmk.scheduler import _task_queue
class DigitalInOut(Mock):
@ -82,7 +81,7 @@ class KeyboardTest:
timeout = time.time_ns() + 10 * 1_000_000_000
while timeout > time.time_ns():
self.do_main_loop()
if not _task_queue.peek() and not self.keyboard._resume_buffer:
if not self.keyboard._timeouts and not self.keyboard._resume_buffer:
break
assert timeout > time.time_ns(), 'infinite loop detected'

View File

@ -9,10 +9,6 @@ class KeyEvent:
self.pressed = pressed
def ticks_ms():
return (time.time_ns() // 1_000_000) % (1 << 29)
def init_circuit_python_modules_mocks():
sys.modules['usb_hid'] = Mock()
sys.modules['digitalio'] = Mock()
@ -30,8 +26,4 @@ def init_circuit_python_modules_mocks():
sys.modules['micropython'].const = lambda x: x
sys.modules['supervisor'] = Mock()
sys.modules['supervisor'].ticks_ms = ticks_ms
from . import task
sys.modules['_asyncio'] = task
sys.modules['supervisor'].ticks_ms = lambda: time.time_ns() // 1_000_000

View File

@ -1,196 +0,0 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
# They can optionally be replaced by C implementations.
# This file is a modified version, based on the extmod in Circuitpython, for
# unit testing in KMK only.
from supervisor import ticks_ms
from kmk.kmktime import ticks_diff
cur_task = None
__task_queue = None
class CancelledError(BaseException):
pass
# pairing-heap meld of 2 heaps; O(1)
def ph_meld(h1, h2):
if h1 is None:
return h2
if h2 is None:
return h1
lt = ticks_diff(h1.ph_key, h2.ph_key) < 0
if lt:
if h1.ph_child is None:
h1.ph_child = h2
else:
h1.ph_child_last.ph_next = h2
h1.ph_child_last = h2
h2.ph_next = None
h2.ph_rightmost_parent = h1
return h1
else:
h1.ph_next = h2.ph_child
h2.ph_child = h1
if h1.ph_next is None:
h2.ph_child_last = h1
h1.ph_rightmost_parent = h2
return h2
# pairing-heap pairing operation; amortised O(log N)
def ph_pairing(child):
heap = None
while child is not None:
n1 = child
child = child.ph_next
n1.ph_next = None
if child is not None:
n2 = child
child = child.ph_next
n2.ph_next = None
n1 = ph_meld(n1, n2)
heap = ph_meld(heap, n1)
return heap
# pairing-heap delete of a node; stable, amortised O(log N)
def ph_delete(heap, node):
if node is heap:
child = heap.ph_child
node.ph_child = None
return ph_pairing(child)
# Find parent of node
parent = node
while parent.ph_next is not None:
parent = parent.ph_next
parent = parent.ph_rightmost_parent
if parent is None or parent.ph_child is None:
return heap
# Replace node with pairing of its children
if node is parent.ph_child and node.ph_child is None:
parent.ph_child = node.ph_next
node.ph_next = None
return heap
elif node is parent.ph_child:
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
parent.ph_child = node
else:
n = parent.ph_child
while node is not n.ph_next:
n = n.ph_next
if not n:
return heap
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
if node is None:
node = n
else:
n.ph_next = node
node.ph_next = next
if next is None:
node.ph_rightmost_parent = parent
parent.ph_child_last = node
return heap
# TaskQueue class based on the above pairing-heap functions.
class TaskQueue:
def __init__(self):
self.heap = None
def peek(self):
return self.heap
def push_sorted(self, v, key):
v.data = None
v.ph_key = key
v.ph_child = None
v.ph_next = None
self.heap = ph_meld(v, self.heap)
def push_head(self, v):
self.push_sorted(v, ticks_ms())
def pop_head(self):
v = self.heap
self.heap = ph_pairing(v.ph_child)
# v.ph_child = None
return v
def remove(self, v):
self.heap = ph_delete(self.heap, v)
# Task class representing a coroutine, can be waited on and cancelled.
class Task:
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.state = True # None, False, True or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
self.ph_next = None # Paring heap
self.ph_rightmost_parent = None # Paring heap
def __await__(self):
if not self.state:
# Task finished, signal that is has been await'ed on.
self.state = False
elif self.state is True:
# Allocated head of linked list of Tasks waiting on completion of this task.
self.state = TaskQueue()
return self
def __next__(self):
if not self.state:
if self.data is None:
# Task finished but has already been sent to the loop's exception handler.
raise StopIteration
else:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.state.push_head(cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
cur_task.data = self
def done(self):
return not self.state
def cancel(self):
# Check if task is already finished.
if not self.state:
return False
# Can't cancel self (not supported yet).
if self is cur_task:
raise RuntimeError("can't cancel self")
# If Task waits on another task then forward the cancel to the one it's waiting on.
while isinstance(self.data, Task):
self = self.data
# Reschedule Task as a cancelled task.
if hasattr(self.data, 'remove'):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
__task_queue.push_head(self)
elif ticks_diff(self.ph_key, ticks_ms()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
__task_queue.remove(self)
__task_queue.push_head(self)
self.data = CancelledError
return True