Added explicit typing

This commit is contained in:
Pascal Serrarens 2025-04-30 11:10:27 +02:00
parent f8c6037538
commit 97eafeccfc
13 changed files with 244 additions and 251 deletions

View File

@ -1,30 +1,31 @@
#from Thing import Thing
from Thing import Thing
from Messages import IMessage
class BinaryMsg():
from typing import Optional, Union
class BinaryMsg(IMessage):
id = 0xB1
length = 4
def __init__(self, data, thing = None):
if isinstance(data, bytes):
def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing] = None):
if thing is None:
data = bytes(arg1)
self.thing_id = data[2]
self.data_length = data[3]
self.data = data[4:]
else:
self.network_id = data
self.network_id = int(arg1)
self.thing_id = thing.id
self.thing = thing
def Serialize(self, buffer_ref):
if self.thing_id is None:
return 0
def Serialize(self, buffer_ref: list[bytearray]):
buffer: bytearray = buffer_ref[0]
ix = self.length
self.data_length = self.thing.GenerateBinary(buffer, {ix})
if ix <= self.length:
return 0
print(f'Send BinaryMsg [{self.network_id}/{self.thing_id}] {self.thing_type} {self.parent_id}')
print(f'Send BinaryMsg [{self.network_id}/{self.thing_id}] {self.data_length}')
buffer[0] = self.id
buffer[1] = self.network_id

View File

@ -1,4 +1,5 @@
#import Messages.LowLevelMessages as LowLevelMessages
# from ParticipantUDP import ParticipantUDP
class IMessage:
id = 0x00
@ -6,84 +7,17 @@ class IMessage:
## Serialize the message into the given buffer
#
## @returns: the length of the message
def Serialize(buffer):
def Serialize(self, buffer_ref: list[bytearray]) -> int:
return 0
def SendTo(self, participant):
buffer_size = self.Serialize([participant.buffer])
if buffer_size == 0:
return False
return participant.SendBuffer(buffer_size)
# def SendTo(self, participant: ParticipantUDP) -> bool:
# buffer_size = self.Serialize([participant.buffer])
# if buffer_size == 0:
# return False
# return participant.SendBuffer(buffer_size)
def Publish(self, participant):
bufferSize = self.Serialize([participant.buffer])
if bufferSize == 0:
return False
return participant.PublishBuffer(bufferSize)
# class InvestigateMsg():
# id = 0x81
# length = 3
# def __init__(self, buffer):
# self.thing_id = buffer[2]
# class PoseMsg(IMessage):
# """! Message to communicate the pose of the thing
# The pose is in local space relative to the parent.
# If there is not parent (the thing is a root thing), the pose will be in world space.
# """
# ## The message ID
# id = 0x10
# ## The length of the message
# length = 4
# def __init__(self, thing):
# self.thing = thing
# def Serialize(self, buffer_ref):
# if self.thing is None:
# return 0
# buffer: bytearray = buffer_ref[0]
# buffer[0:PoseMsg.length] = [
# PoseMsg.id,
# 0, # Network id
# self.thing.id,
# self.thing.pose_updated
# ]
# ix = [4]
# if self.thing.pose_updated & Thing.Position:
# LowLevelMessages.SendSpherical(buffer, ix, self.thing.position)
# if self.thing.pose_updated & Thing.Orientation:
# LowLevelMessages.SendQuat32(buffer, ix, self.thing.orientation)
# if self.thing.pose_updated & Thing.LinearVelocity:
# LowLevelMessages.SendSpherical(buffer, ix, self.thing.linearVelocity)
# if self.thing.pose_updated & Thing.AngularVelocity:
# LowLevelMessages.SendSpherical(buffer, ix, self.thing.angularVelocity)
# return ix[0]
# class BinaryMsg():
# id = 0xB1
# def __init__(self, buffer):
# self.thing_id = buffer[2]
# self.thing = Thing.Get(self.thing_id)
# self.data = buffer[3:]
# def SendTo(participant, thing, data: bytearray):
# length = 3
# if thing is None or data is None:
# return False
# participant.buffer[0:length] = [
# BinaryMsg.id,
# 0, # network_id,
# thing.id
# ]
# full_length = length + len(data)
# participant.buffer[length:full_length] = data
# participant.SendBuffer(full_length)
# return True
# def Publish(self, participant: Participant) -> bool:
# bufferSize = self.Serialize([participant.buffer])
# if bufferSize == 0:
# return False
# return participant.PublishBuffer(bufferSize)

View File

@ -1,25 +1,25 @@
from RoboidControl.Messages.Messages import IMessage
from Thing import Thing
from typing import Union, Optional
class ModelUrlMsg(IMessage):
id = 0x90
length = 4
thing_id = None
url = None
def __init__(self, data, thing = None):
if isinstance(data, bytes):
def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing] = None):
if thing is None:
data = bytes(arg1)
self.thing_id = data[2]
# model url length is not needed here
self.url = data[ModelUrlMsg.length:].decode("utf-8")
else:
if thing is not None:
self.network_id = data
self.thing_id = thing.id
self.url = thing.model_url
self.network_id: int = int(arg1)
self.thing_id: int = thing.id
self.url: Optional[str] = thing.model_url
def Serialize(self, buffer_ref):
if self.thing_id is None or self.url is None:
def Serialize(self, buffer_ref: list[bytearray]):
if self.url is None:
return 0
buffer: bytearray = buffer_ref[0]

View File

@ -1,25 +1,25 @@
from RoboidControl.Messages.Messages import IMessage
from Thing import Thing
from typing import Union, Optional
class NameMsg(IMessage):
id = 0x91
length = 4
thing_id = None
name = None
def __init__(self, data, thing = None):
if isinstance(data, bytes):
def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing] = None):
if thing is None:
data = bytes(arg1)
self.thing_id = data[2]
# name_length is not needed here
self.name = data[NameMsg.length:].decode("utf-8")
else:
if thing is not None:
self.network_id = data
self.thing_id = thing.id
self.name = thing.name
self.network_id: int = int(arg1)
self.thing_id: int = thing.id
self.name: Optional[str] = thing.name
def Serialize(self, buffer_ref):
if self.thing_id is None or self.name is None:
def Serialize(self, buffer_ref: list[bytearray]):
if self.name is None:
return 0
buffer: bytearray = buffer_ref[0]

View File

@ -1,27 +1,31 @@
from RoboidControl.Messages.Messages import IMessage
from typing import Union
## A network id message invites another participant to a site
#
## This can be sent in response to a ClientMsg
class NetworkIdMsg(IMessage):
id = 0xA1
length = 2
id: int = 0xA1
length: int = 2
## Create a network id message
def __init__(self, arg1 = None):
def __init__(self, arg1: Union[bytes, int]):
self.network_id: int = 0
if isinstance(arg1, bytes):
buffer = arg1
buffer = bytearray(arg1)
self.network_id = buffer[1]
else:
self.network_id = arg1
self.network_id = int(arg1)
## Serialize the message into the given buffer
#
## @param buffer_ref A reference to the buffer to use. This should be a list with the buffer as its first and only element
## @returns the length of the message
def Serialize(self, buffer_ref):
def Serialize(self, buffer_ref: list[bytearray]) -> int:
buffer: bytearray = buffer_ref[0]
buffer[0:NetworkIdMsg.length] = [
last:int = NetworkIdMsg.length
buffer[0:last] = [
NetworkIdMsg.id,
self.network_id
]

View File

@ -1,7 +1,7 @@
import RoboidControl.Messages.LowLevelMessages as LowLevelMessages
#from Thing import Thing
class PoseMsg():
class PoseMsg(IMessage):
id = 0x10
length = 4

View File

@ -1,34 +1,30 @@
from RoboidControl.Messages.Messages import IMessage
from Thing import Thing
from typing import Optional, Union
class ThingMsg(IMessage):
id = 0x80
length = 5
thing_id = None
thing_type = None
parent_id = None
def __init__(self, arg1, thing=None):
if isinstance(arg1, bytes):
buffer = arg1
self.network_id = buffer[1]
self.thing_id = buffer[2]
self.thing_type = buffer[3]
self.parent_id = buffer[4]
def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing]=None):
if thing is None:
buffer = bytes(arg1)
self.network_id: int = buffer[1]
self.thing_id: int = buffer[2]
self.thing_type: int = buffer[3]
self.parent_id: int = buffer[4]
else:
if thing is not None:
self.network_id = arg1
self.thing_id = thing.id
self.thing_type = thing.type
if thing.parent is not None:
self.parent_id = thing.parent.id
else:
self.parent_id = 0
self.network_id = int(arg1)
self.thing_id = thing.id
self.thing_type = thing.type
if thing.parent is not None:
self.parent_id = thing.parent.id
else:
self.parent_id = 0
def Serialize(self, buffer_ref):
if self.thing_id is None:
return 0
def Serialize(self, buffer_ref: list[bytearray]):
print(f'Send ThingMsg [{self.network_id}/{self.thing_id}] {self.thing_type} {self.parent_id}')
buffer: bytearray = buffer_ref[0]

View File

@ -1,3 +1,7 @@
from Thing import Thing
from typing import Optional
class Participant:
"""! A participant is a device which manages things.
@ -6,35 +10,35 @@ class Participant:
It also maintains the communcation information to contact the participant.
It is used as a basis for the local participant, but also as a reference to remote participants.
"""
def __init__(self, ip_address, port):
def __init__(self, ip_address: str, port: int) -> None:
"""! Create a new participant with the given communcation info
@param ip_address The IP address of the participant
@param port The UDP port of the participant
"""
## The Ip Address of a participant. When the participant is local, this contains 0.0.0.0
self.ip_address = ip_address
self.ip_address: str = ip_address
## The port number for UDP communication with the participant. This is 0 for isolated participants.
self.port = port
self.port: int = port
## he network Id to identify the participant.
self.network_id =0
self.network_id: int = 0
## The things managed by this participant
self.things = set()
self.things: set[Thing] = set()
def Get(self, thing_id):
def Get(self, thing_id: int) -> Optional[Thing]:
"""! Get the thing with the given properties
@param thing_id The ID of the thing
@return The thing if found, None in other cases
"""
for thing in self.things:
if thing is not None and thing.id == thing_id:
if thing.id == thing_id:
return thing
return None
def Add(self, thing, check_id = True):
def Add(self, thing: Thing, check_id: bool = True):
"""! Add a new thing for this participant.
@param thing The thing to add
@param check_id If true, the thing.id is regenerated if it is zero
@ -47,16 +51,15 @@ class Participant:
if found_thing == None:
self.things.add(thing)
def Remove(self, thing):
def Remove(self, thing: Thing) -> None:
"""! Remove a thing for this participant
@param thing The thing to remove
"""
self.things.remove(thing)
def Update(self, currentTimeMs=0):
def Update(self, currentTimeMs: int = 0) -> None:
"""! Update all things for this participant
@param The current time in milliseconds (optional)
"""
for thing in list(self.things):
if thing is not None:
thing.Update(currentTimeMs)
thing.Update(currentTimeMs)

View File

@ -1,7 +1,3 @@
import socket
import threading
import time
from RoboidControl.Participant import Participant
from RoboidControl.Thing import Thing
from RoboidControl.Messages.ParticipantMsg import ParticipantMsg
@ -9,13 +5,19 @@ from RoboidControl.Messages.NetworkIdMsg import NetworkIdMsg
from RoboidControl.Messages.ThingMsg import ThingMsg
from RoboidControl.Messages.NameMsg import NameMsg
from RoboidControl.Messages.ModelUrlMsg import ModelUrlMsg
from RoboidControl.Messages.Messages import IMessage
from RoboidControl.Messages import *
import sys
micropython = 'micropython' in sys.modules
import socket
import threading
import time
from typing import Optional
if micropython:
from MicroPython.uPythonParticipant import Bla
# import sys
# micropython = 'micropython' in sys.modules
# if micropython:
# from MicroPython.uPythonParticipant import uPythonParticipant
class ParticipantUDP(Participant):
"""! A local participant is the local device which can communicate with other participants.
@ -26,14 +28,18 @@ class ParticipantUDP(Participant):
Currently, only UDP communication is supported
"""
buffer = None
#buffer = None
nextPublishMe = 0
others = None
thread = None
#others = None
#thread = None
name = "Participant"
isolated_participant = None
def __init__(self, port=7681, ip_address=None, local_port=7681):
def __init__(self,
port: int = 7681,
ip_address: Optional[str] = None,
local_port: int = 7681
) -> None:
super().__init__(ip_address = "127.0.0.1", port = local_port)
# if local_port == 0:
@ -42,11 +48,11 @@ class ParticipantUDP(Participant):
## True if the participant is running isolated.
# Isolated participants do not communicate with other participants
self.is_isolated = True
self.remote_site = None
self.is_isolated: bool = True
self.remote_site: Optional[Participant] = None
## The other participants communicating with this participant
self.others = []
self.others: list[Participant] = []
if port != 0:
self.is_isolated = False
@ -54,7 +60,7 @@ class ParticipantUDP(Participant):
self.remote_site = Participant(ip_address, port)
self.others.append(self.remote_site)
self.buffer = bytearray(256)
self.buffer: bytearray = bytearray(256)
self.publishInterval = 3000 # 3 seconds
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -65,14 +71,15 @@ class ParticipantUDP(Participant):
self.thread.daemon = True
self.thread.start()
def Isolated():
@staticmethod
def Isolated() -> 'ParticipantUDP':
if ParticipantUDP.isolated_participant == None:
ParticipantUDP.isolated_participant = ParticipantUDP(0)
return ParticipantUDP.isolated_participant
#region Update
def GetParticipant(self, ip_address, port):
def GetParticipant(self, ip_address: str, port: int):
# print(f'{self.name} Get participant {ip_address} {port}')
# for item in self.others:
# print(f'- {item.ip_address} {item.port}')
@ -82,15 +89,15 @@ class ParticipantUDP(Participant):
participant = next(found_participants, None)
return participant
def AddParticipant(self, ip_address, port):
def AddParticipant(self, ip_address: str, port: int):
print(f'{self.name} Add participant {ip_address} {port}')
remote_participant = Participant(ip_address = ip_address, port = port)
self.others.append(remote_participant)
return remote_participant
def Update(self, currentTimeMs = None):
def Update(self, currentTimeMs: Optional[int] = None):
if currentTimeMs is None:
currentTimeMs = time.time() * 1000
currentTimeMs = int(time.time() * 1000)
if self.is_isolated == False:
if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe:
@ -104,16 +111,15 @@ class ParticipantUDP(Participant):
self.UpdateMyThings(currentTimeMs)
def UpdateMyThings(self, currentTimeMs):
def UpdateMyThings(self, currentTimeMs: int):
for thing in self.things:
if thing is None:
continue
# if thing.hierarchyChanged and not (self.isIsolated or self.network_id == ):
# thingMsg = ThingMsg(self.network_id, thing)
# self.Send(self.remote_site, thingMsg)
poseMsg = PoseMsg(thing.owner.network_id, thing)
self.Send(self.remote_site, poseMsg)
if thing.owner is not None:
poseMsg = PoseMsg(thing.owner.network_id, thing)
if self.remote_site is not None:
self.Send(self.remote_site, poseMsg)
thing.Update(currentTimeMs, False)
if not(self.is_isolated or self.network_id == 0):
@ -121,11 +127,12 @@ class ParticipantUDP(Participant):
# destroyMsg = DestroyMsg(self.network_id, thing)
# self.Send(self.remote_site, destroyMsg)
# else:
# Send to remote site
poseMsg = PoseMsg(thing.owner.network_id, thing)
self.Send(self.remote_site, poseMsg)
binaryMsg = BinaryMsg(thing.owner.network_id, thing)
self.Send(self.remote_site, binaryMsg)
if self.remote_site is not None and thing.owner is not None:
# Send to remote site
poseMsg = PoseMsg(thing.owner.network_id, thing)
self.Send(self.remote_site, poseMsg)
binaryMsg = BinaryMsg(thing.owner.network_id, thing)
self.Send(self.remote_site, binaryMsg)
# if thing.terminate:
# self.Remove(thing)
@ -133,7 +140,7 @@ class ParticipantUDP(Participant):
#region Send
def SendThingInfo(self, owner, thing, recursively: bool = False):
def SendThingInfo(self, owner: Participant, thing: 'Thing', recursively: bool = False):
self.Send(owner, ThingMsg(self.network_id, thing))
self.Send(owner, NameMsg(self.network_id, thing))
self.Send(owner, ModelUrlMsg(self.network_id, thing))
@ -145,7 +152,7 @@ class ParticipantUDP(Participant):
self.SendThingInfo(owner, child, recursively)
def Send(self, owner, msg):
def Send(self, owner: Participant, msg: IMessage):
buffer_size = msg.Serialize([self.buffer])
if buffer_size <= 0:
return True
@ -154,7 +161,7 @@ class ParticipantUDP(Participant):
self.udp_socket.sendto(self.buffer[:buffer_size], (owner.ip_address, owner.port))
return True
def Publish(self, msg):
def Publish(self, msg: IMessage):
buffer_size = msg.Serialize([self.buffer])
if buffer_size <= 0:
return True
@ -183,7 +190,7 @@ class ParticipantUDP(Participant):
self.network_id = 0
pass
def ReceiveData(self, data, sender):
def ReceiveData(self, data: bytes, sender: Participant):
msgId = data[0]
# print(f'msg {msgId} ')
match msgId:
@ -200,12 +207,14 @@ class ParticipantUDP(Participant):
case ModelUrlMsg.id:
self.ProcessModelUrlMsg(ModelUrlMsg(data))
case BinaryMsg.id:
self.ProcessBinaryMsg(BinaryMsg(data))
self.ProcessBinaryMsg(BinaryMsg(data))
case _:
pass
def ProcessParticipantMsg(self, sender, msg: ParticipantMsg):
def ProcessParticipantMsg(self, sender: Participant, msg: ParticipantMsg):
pass
def ProcessSiteIdMsg(self, sender, msg: NetworkIdMsg):
def ProcessSiteIdMsg(self, sender: Participant, msg: NetworkIdMsg):
print(f'{self.name} Process SiteMsg {self.network_id} -> {msg.network_id}')
if self.network_id != msg.network_id:
self.network_id = msg.network_id
@ -227,8 +236,8 @@ class ParticipantUDP(Participant):
def ProcessBinaryMsg(self, msg: BinaryMsg):
# print('received binary data')
thing: Thing = self.Get(msg.thing_id)
if thing != None:
thing: Optional[Thing] = self.Get(msg.thing_id)
if thing is not None:
thing.ProcessBinary(msg.data)
#endregion

View File

@ -1,8 +1,8 @@
from LinearAlgebra.Spherical import *
from LinearAlgebra.Quaternion import *
from LinearAlgebra.SwingTwist import *
from RoboidControl.Messages import *
from Participant import Participant
from LinearAlgebra.Spherical import Spherical
from LinearAlgebra.Quaternion import Quaternion
from typing import Optional
import time
class Thing:
@ -34,7 +34,7 @@ class Thing:
# region Init
def __init__(self, owner = None, parent = None, thing_type = Type.Undetermined, thing_id = 0):
def __init__(self, owner: Optional[Participant] = None, parent: Optional['Thing'] = None, thing_type: int = Type.Undetermined, thing_id: int = 0) -> None:
"""! Create a new thing
@param owner The owning participant
@param parent The parent thing (will override owner if set)
@ -42,16 +42,16 @@ class Thing:
@param thingId The ID of the thing, leave out or set to zero to generate an ID
"""
## The participant owning this thing
self.owner = None
self.owner: Optional[Participant] = None
## The ID of the thing
self.id = thing_id
self.id: int = thing_id
## The type of the thing
#
## This can be either a \ref RoboidControl::Thing::Thing::Type "Thing.Type" or a byte value for custom types.
self.type = thing_type
self.type: int = thing_type
## The parent of this thing
self.parent = None
self.parent: Optional[Thing] = None
if parent is not None:
self.owner = parent.owner
self.SetParent(parent)
@ -62,12 +62,12 @@ class Thing:
self.owner = owner
## The children of this thing
self.children = []
self.children: list[Thing] = list()
## The name of the thing
self.name = None
self.name: Optional[str] = None
## An URL pointing to the location where a model of the thing can be found
self.model_url = None
self.model_url: Optional[str] = None
## The position of the thing in local space, in meters
self.position: Spherical = Spherical.zero
@ -89,14 +89,14 @@ class Thing:
## Boolean indicating the thing has an updated angular velocity
self.angular_velocity_updated: bool = False
#self.pose_updated = 0x00 # the bits indicate which fields have been updated
self.owner.Add(self)
if self.owner is not None:
self.owner.Add(self)
# endregion Init
# region Hierarchy
def SetParent(self, parent):
def SetParent(self, parent: Optional['Thing']):
"""! Sets the parent of this Thing
@param The Thing which should become the parent
@note Do not set Thing.parent directly, as that will break the parent-child relation
@ -109,7 +109,7 @@ class Thing:
else:
parent.AddChild(self)
def AddChild(self, child):
def AddChild(self, child: 'Thing'):
"""! Add a child Thing to this Thing
@param child The Thing which should become a child
@remark When the Thing is already a child, it will not be added again
@ -120,21 +120,20 @@ class Thing:
child.parent = self
self.children.append(child)
def RemoveChild(self, child):
def RemoveChild(self, child: 'Thing'):
"""! Remove the given thing as a child of this thing
@param child The child to remove
"""
self.children.remove(child)
def GetChild(self, thing_id, recurse = False):
def GetChild(self, thing_id: int, recurse: bool = False) -> Optional['Thing']:
"""! Get a child by thing Id
@param id The thing ID to find
@param recurse Look recursively through all descendants
@returns The found thing of nullptr when nothing is found
"""
for child in self.children:
if child is None:
continue
if child.id == thing_id:
return child
if recurse:
@ -143,19 +142,17 @@ class Thing:
return found_child
return None
def FindChild(self, name, recurse = True):
def FindChild(self, name: str, recurse: bool = True) -> Optional['Thing']:
"""! Find a thing by name
@param name The name of the thing
@return The found thing or nullptr when nothing is found
@param recurse Look recursively through all descendants
"""
for child in self.children:
if child is None:
continue
if child.name == name:
return child
if recurse:
found_child = child.GetChild(name, recurse)
found_child = child.FindChild(name, recurse)
if found_child is not None:
return found_child
return None
@ -163,21 +160,21 @@ class Thing:
# region Pose
def SetPosition(self, position):
def SetPosition(self, position: Spherical) -> None:
"""! Set the position of the thing
@param position The new position in local space, in meters
"""
self.position = position
self.position_updated = True
def SetOrientation(self, orientation):
def SetOrientation(self, orientation: Quaternion) -> None:
"""! Set the orientation of the thing
@param orientation The new orientation in local space
"""
self.orientation = orientation
self.orientation_updated = True
def SetLinearVelocity(self, linear_velocity):
def SetLinearVelocity(self, linear_velocity: Spherical) -> None:
"""! Set the linear velocity of the thing
@param linearVelocity The new linear velocity in local space, in meters per second
"""
@ -186,7 +183,7 @@ class Thing:
self.linear_velocity = linear_velocity
def SetAngularVelocity(self, angular_velocity):
def SetAngularVelocity(self, angular_velocity: Spherical) -> None:
"""! Set the angular velocity of the thing
@param angularVelocity the new angular velocity in local space
"""
@ -200,13 +197,13 @@ class Thing:
# region Update
@staticmethod
def GetTimeMs():
def GetTimeMs() -> int:
"""! Get the current time in milliseconds
@return The current time in milliseconds
"""
return time.time() * 1000
return int(time.time() * 1000)
def Update(self, currentTime = 0, recurse = False):
def Update(self, currentTimeMs: int = 0, recurse: bool = False) -> None:
"""! Update de state of the thing
@param currentTimeMs The current clock time in milliseconds; If this is zero, the current time is retrieved automatically
@param recurse When true, this will Update the descendants recursively
@ -218,21 +215,19 @@ class Thing:
if recurse:
for child in self.children:
if child is None:
continue
child.Update(currentTime, recurse)
child.Update(currentTimeMs, recurse)
# endregion Update
def GenerateBinary(self, buffer, ix_ref) -> int:
def GenerateBinary(self, bytes: bytearray, ix_ref: set[int]) -> int:
"""! Function used to generate binary data for this thing
@param buffer The byte array for thw binary data
@param ix The starting position for writing the binary data
@param ix_ref A single element array with the starting position for writing the binary data
@returns The size of the binary data
"""
return 0
def ProcessBinary(self, data):
def ProcessBinary(self, data: bytes):
"""! Function used to process binary data received for this thing
@param bytes The binary data
"""

View File

@ -0,0 +1,32 @@
from RoboidControl.Thing import Thing
class TouchSensor(Thing):
"""! A sensor which can detect touches
"""
def __init__(self, owner = None, parent = None, thing_id = 0):
"""! Create a touch sensor
"""
super().__init__(owner = owner, parent = parent, thing_type = Thing.Type.TouchSensor, thing_id = thing_id)
## Value which is true when the sensor is touching something, false otherwise
self.touched_something = False
def GenerateBinary(self, bytes, ix_ref):
"""! Function used to generate binary data for this touch sensor
@param bytes The byte array for thw binary data
@param ix_ref A single element array with the starting position for writing the binary data
@returns The size of the binary data
"""
ix = ix_ref[0]
if self.touched_something:
bytes[ix] = 0
else:
bytes[ix] = 1
ix_ref[0] += 1
return 1
def ProcessBinary(self, bytes):
"""! Function used to process binary data received for this touch sensor
@param bytes The binary data to process
"""
self.touched_something = bytes[0] == 1

View File

@ -2,18 +2,26 @@ from RoboidControl.Thing import Thing
from RoboidControl.Messages import LowLevelMessages
class TemperatureSensor(Thing):
def __init__(self, thing_id):
super().__init__(thing_id, thing_type = Thing.Type.TemperatureSensor)
self.temp = 0
self._watchers = []
def OnUpdate(self, handler):
self._watchers.append(handler)
def CancelOnUpdate(self, handler):
self._watchers.remove(handler)
"""! A temperature sensor
"""
def __init__(self, owner = None, parent = None, thing_id = 0):
super().__init__(owner = owner, parent = parent, thing_type = Thing.Type.TemperatureSensor, thing_id = thing_id)
def ProcessBinary(self, data):
## The temperature in degrees celcius
self.temperature = 0
def GenerateBinary(self, bytes, ix_ref):
"""! Function used to generate binary data for this temperature sensor
@param bytes The byte array for thw binary data
@param ix_ref A single element array with the starting position for writing the binary data
@returns The size of the binary data
"""
LowLevelMessages.SendFloat16(bytes, ix_ref, self.temperature)
return 2
def ProcessBinary(self, bytes):
"""! Function used to process the binary data received for this temperature sensor
@param bytes The binary data to process
"""
ix = 0
self.temp = LowLevelMessages.ReceiveFloat16(data, [ix])
for watcher in self._watchers:
watcher(self.temp)
self.temperature = LowLevelMessages.ReceiveFloat16(bytes, [ix])

View File

@ -3,19 +3,30 @@ from RoboidControl.Thing import Thing
class TouchSensor(Thing):
"""! A sensor which can detect touches
"""
def __init__(self, owner = None, parent = None):
def __init__(self, owner = None, parent = None, thing_id = 0):
"""! Create a touch sensor
"""
super().__init__(owner = owner, parent = parent, thing_type = Thing.Type.TouchSensor)
super().__init__(owner = owner, parent = parent, thing_type = Thing.Type.TouchSensor, thing_id = thing_id)
## Value which is true when the sensor is touching something, false otherwise
self.touched_something = False
def GenerateBinary(self, bytes, ix_ref):
"""! Function used to generate binary data for this touch sensor
@param bytes The byte array for thw binary data
@param ix_ref A single element array with the starting position for writing the binary data
@returns The size of the binary data
"""
ix = ix_ref[0]
if self.touched_something:
bytes[ix] = 0
else:
bytes[ix] = 1
ix_ref[0] += 1
return 1
def ProcessBinary(self, bytes):
"""! Function to extract the touch state received in a binary message
"""! Function used to process binary data received for this touch sensor
@param bytes The binary data to process
"""
self.touched_something = bytes[0] == 1
if self.touched_something:
print(f"{self.name} touched something!")
else:
print(f"{self.name} touching nothing")