diff --git a/RoboidControl/Messages/BinaryMsg.py b/RoboidControl/Messages/BinaryMsg.py index 9f45d1c..f714b17 100644 --- a/RoboidControl/Messages/BinaryMsg.py +++ b/RoboidControl/Messages/BinaryMsg.py @@ -1,30 +1,31 @@ -#from Thing import Thing +from Thing import Thing +from Messages import IMessage -class BinaryMsg(): +from typing import Optional, Union + +class BinaryMsg(IMessage): id = 0xB1 length = 4 - def __init__(self, data, thing = None): - if isinstance(data, bytes): + def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing] = None): + if thing is None: + data = bytes(arg1) self.thing_id = data[2] self.data_length = data[3] self.data = data[4:] else: - self.network_id = data + self.network_id = int(arg1) self.thing_id = thing.id self.thing = thing - def Serialize(self, buffer_ref): - if self.thing_id is None: - return 0 - + def Serialize(self, buffer_ref: list[bytearray]): buffer: bytearray = buffer_ref[0] ix = self.length self.data_length = self.thing.GenerateBinary(buffer, {ix}) if ix <= self.length: return 0 - print(f'Send BinaryMsg [{self.network_id}/{self.thing_id}] {self.thing_type} {self.parent_id}') + print(f'Send BinaryMsg [{self.network_id}/{self.thing_id}] {self.data_length}') buffer[0] = self.id buffer[1] = self.network_id diff --git a/RoboidControl/Messages/Messages.py b/RoboidControl/Messages/Messages.py index 7fc1325..0ec826b 100644 --- a/RoboidControl/Messages/Messages.py +++ b/RoboidControl/Messages/Messages.py @@ -1,4 +1,5 @@ #import Messages.LowLevelMessages as LowLevelMessages +# from ParticipantUDP import ParticipantUDP class IMessage: id = 0x00 @@ -6,84 +7,17 @@ class IMessage: ## Serialize the message into the given buffer # ## @returns: the length of the message - def Serialize(buffer): + def Serialize(self, buffer_ref: list[bytearray]) -> int: return 0 - def SendTo(self, participant): - buffer_size = self.Serialize([participant.buffer]) - if buffer_size == 0: - return False - return participant.SendBuffer(buffer_size) + # def SendTo(self, participant: ParticipantUDP) -> bool: + # buffer_size = self.Serialize([participant.buffer]) + # if buffer_size == 0: + # return False + # return participant.SendBuffer(buffer_size) - def Publish(self, participant): - bufferSize = self.Serialize([participant.buffer]) - if bufferSize == 0: - return False - return participant.PublishBuffer(bufferSize) - -# class InvestigateMsg(): -# id = 0x81 -# length = 3 - -# def __init__(self, buffer): -# self.thing_id = buffer[2] - -# class PoseMsg(IMessage): -# """! Message to communicate the pose of the thing - -# The pose is in local space relative to the parent. -# If there is not parent (the thing is a root thing), the pose will be in world space. -# """ -# ## The message ID -# id = 0x10 -# ## The length of the message -# length = 4 - -# def __init__(self, thing): -# self.thing = thing - -# def Serialize(self, buffer_ref): -# if self.thing is None: -# return 0 - -# buffer: bytearray = buffer_ref[0] -# buffer[0:PoseMsg.length] = [ -# PoseMsg.id, -# 0, # Network id -# self.thing.id, -# self.thing.pose_updated -# ] -# ix = [4] -# if self.thing.pose_updated & Thing.Position: -# LowLevelMessages.SendSpherical(buffer, ix, self.thing.position) -# if self.thing.pose_updated & Thing.Orientation: -# LowLevelMessages.SendQuat32(buffer, ix, self.thing.orientation) -# if self.thing.pose_updated & Thing.LinearVelocity: -# LowLevelMessages.SendSpherical(buffer, ix, self.thing.linearVelocity) -# if self.thing.pose_updated & Thing.AngularVelocity: -# LowLevelMessages.SendSpherical(buffer, ix, self.thing.angularVelocity) -# return ix[0] - -# class BinaryMsg(): -# id = 0xB1 - -# def __init__(self, buffer): -# self.thing_id = buffer[2] -# self.thing = Thing.Get(self.thing_id) -# self.data = buffer[3:] - -# def SendTo(participant, thing, data: bytearray): -# length = 3 - -# if thing is None or data is None: -# return False - -# participant.buffer[0:length] = [ -# BinaryMsg.id, -# 0, # network_id, -# thing.id -# ] -# full_length = length + len(data) -# participant.buffer[length:full_length] = data -# participant.SendBuffer(full_length) -# return True \ No newline at end of file + # def Publish(self, participant: Participant) -> bool: + # bufferSize = self.Serialize([participant.buffer]) + # if bufferSize == 0: + # return False + # return participant.PublishBuffer(bufferSize) \ No newline at end of file diff --git a/RoboidControl/Messages/ModelUrlMsg.py b/RoboidControl/Messages/ModelUrlMsg.py index 30f1174..c396571 100644 --- a/RoboidControl/Messages/ModelUrlMsg.py +++ b/RoboidControl/Messages/ModelUrlMsg.py @@ -1,25 +1,25 @@ from RoboidControl.Messages.Messages import IMessage +from Thing import Thing + +from typing import Union, Optional class ModelUrlMsg(IMessage): id = 0x90 length = 4 - - thing_id = None - url = None - def __init__(self, data, thing = None): - if isinstance(data, bytes): + def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing] = None): + if thing is None: + data = bytes(arg1) self.thing_id = data[2] # model url length is not needed here self.url = data[ModelUrlMsg.length:].decode("utf-8") else: - if thing is not None: - self.network_id = data - self.thing_id = thing.id - self.url = thing.model_url + self.network_id: int = int(arg1) + self.thing_id: int = thing.id + self.url: Optional[str] = thing.model_url - def Serialize(self, buffer_ref): - if self.thing_id is None or self.url is None: + def Serialize(self, buffer_ref: list[bytearray]): + if self.url is None: return 0 buffer: bytearray = buffer_ref[0] diff --git a/RoboidControl/Messages/NameMsg.py b/RoboidControl/Messages/NameMsg.py index c43bdba..811e102 100644 --- a/RoboidControl/Messages/NameMsg.py +++ b/RoboidControl/Messages/NameMsg.py @@ -1,25 +1,25 @@ from RoboidControl.Messages.Messages import IMessage +from Thing import Thing + +from typing import Union, Optional class NameMsg(IMessage): id = 0x91 length = 4 - thing_id = None - name = None - - def __init__(self, data, thing = None): - if isinstance(data, bytes): + def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing] = None): + if thing is None: + data = bytes(arg1) self.thing_id = data[2] # name_length is not needed here self.name = data[NameMsg.length:].decode("utf-8") else: - if thing is not None: - self.network_id = data - self.thing_id = thing.id - self.name = thing.name + self.network_id: int = int(arg1) + self.thing_id: int = thing.id + self.name: Optional[str] = thing.name - def Serialize(self, buffer_ref): - if self.thing_id is None or self.name is None: + def Serialize(self, buffer_ref: list[bytearray]): + if self.name is None: return 0 buffer: bytearray = buffer_ref[0] diff --git a/RoboidControl/Messages/NetworkIdMsg.py b/RoboidControl/Messages/NetworkIdMsg.py index 4b6b55b..73836b5 100644 --- a/RoboidControl/Messages/NetworkIdMsg.py +++ b/RoboidControl/Messages/NetworkIdMsg.py @@ -1,27 +1,31 @@ from RoboidControl.Messages.Messages import IMessage +from typing import Union + ## A network id message invites another participant to a site # ## This can be sent in response to a ClientMsg class NetworkIdMsg(IMessage): - id = 0xA1 - length = 2 + id: int = 0xA1 + length: int = 2 ## Create a network id message - def __init__(self, arg1 = None): + def __init__(self, arg1: Union[bytes, int]): + self.network_id: int = 0 if isinstance(arg1, bytes): - buffer = arg1 + buffer = bytearray(arg1) self.network_id = buffer[1] else: - self.network_id = arg1 + self.network_id = int(arg1) ## Serialize the message into the given buffer # ## @param buffer_ref A reference to the buffer to use. This should be a list with the buffer as its first and only element ## @returns the length of the message - def Serialize(self, buffer_ref): + def Serialize(self, buffer_ref: list[bytearray]) -> int: buffer: bytearray = buffer_ref[0] - buffer[0:NetworkIdMsg.length] = [ + last:int = NetworkIdMsg.length + buffer[0:last] = [ NetworkIdMsg.id, self.network_id ] diff --git a/RoboidControl/Messages/PoseMsg.py b/RoboidControl/Messages/PoseMsg.py index 108a49d..37337ce 100644 --- a/RoboidControl/Messages/PoseMsg.py +++ b/RoboidControl/Messages/PoseMsg.py @@ -1,7 +1,7 @@ import RoboidControl.Messages.LowLevelMessages as LowLevelMessages #from Thing import Thing -class PoseMsg(): +class PoseMsg(IMessage): id = 0x10 length = 4 diff --git a/RoboidControl/Messages/ThingMsg.py b/RoboidControl/Messages/ThingMsg.py index a557965..948b4d6 100644 --- a/RoboidControl/Messages/ThingMsg.py +++ b/RoboidControl/Messages/ThingMsg.py @@ -1,34 +1,30 @@ from RoboidControl.Messages.Messages import IMessage +from Thing import Thing + +from typing import Optional, Union class ThingMsg(IMessage): id = 0x80 length = 5 - thing_id = None - thing_type = None - parent_id = None - - def __init__(self, arg1, thing=None): - if isinstance(arg1, bytes): - buffer = arg1 - self.network_id = buffer[1] - self.thing_id = buffer[2] - self.thing_type = buffer[3] - self.parent_id = buffer[4] + def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing]=None): + if thing is None: + buffer = bytes(arg1) + self.network_id: int = buffer[1] + self.thing_id: int = buffer[2] + self.thing_type: int = buffer[3] + self.parent_id: int = buffer[4] else: - if thing is not None: - self.network_id = arg1 - self.thing_id = thing.id - self.thing_type = thing.type - if thing.parent is not None: - self.parent_id = thing.parent.id - else: - self.parent_id = 0 + self.network_id = int(arg1) + self.thing_id = thing.id + self.thing_type = thing.type + if thing.parent is not None: + self.parent_id = thing.parent.id + else: + self.parent_id = 0 - def Serialize(self, buffer_ref): - if self.thing_id is None: - return 0 - + def Serialize(self, buffer_ref: list[bytearray]): + print(f'Send ThingMsg [{self.network_id}/{self.thing_id}] {self.thing_type} {self.parent_id}') buffer: bytearray = buffer_ref[0] diff --git a/RoboidControl/Participant.py b/RoboidControl/Participant.py index 79a553a..bb2a9b0 100644 --- a/RoboidControl/Participant.py +++ b/RoboidControl/Participant.py @@ -1,3 +1,7 @@ +from Thing import Thing + +from typing import Optional + class Participant: """! A participant is a device which manages things. @@ -6,35 +10,35 @@ class Participant: It also maintains the communcation information to contact the participant. It is used as a basis for the local participant, but also as a reference to remote participants. """ - def __init__(self, ip_address, port): + def __init__(self, ip_address: str, port: int) -> None: """! Create a new participant with the given communcation info @param ip_address The IP address of the participant @param port The UDP port of the participant """ ## The Ip Address of a participant. When the participant is local, this contains 0.0.0.0 - self.ip_address = ip_address + self.ip_address: str = ip_address ## The port number for UDP communication with the participant. This is 0 for isolated participants. - self.port = port + self.port: int = port ## he network Id to identify the participant. - self.network_id =0 + self.network_id: int = 0 ## The things managed by this participant - self.things = set() + self.things: set[Thing] = set() - def Get(self, thing_id): + def Get(self, thing_id: int) -> Optional[Thing]: """! Get the thing with the given properties @param thing_id The ID of the thing @return The thing if found, None in other cases """ for thing in self.things: - if thing is not None and thing.id == thing_id: + if thing.id == thing_id: return thing return None - def Add(self, thing, check_id = True): + def Add(self, thing: Thing, check_id: bool = True): """! Add a new thing for this participant. @param thing The thing to add @param check_id If true, the thing.id is regenerated if it is zero @@ -47,16 +51,15 @@ class Participant: if found_thing == None: self.things.add(thing) - def Remove(self, thing): + def Remove(self, thing: Thing) -> None: """! Remove a thing for this participant @param thing The thing to remove """ self.things.remove(thing) - def Update(self, currentTimeMs=0): + def Update(self, currentTimeMs: int = 0) -> None: """! Update all things for this participant @param The current time in milliseconds (optional) """ for thing in list(self.things): - if thing is not None: - thing.Update(currentTimeMs) + thing.Update(currentTimeMs) diff --git a/RoboidControl/ParticipantUDP.py b/RoboidControl/ParticipantUDP.py index 6d31332..283332b 100644 --- a/RoboidControl/ParticipantUDP.py +++ b/RoboidControl/ParticipantUDP.py @@ -1,7 +1,3 @@ -import socket -import threading -import time - from RoboidControl.Participant import Participant from RoboidControl.Thing import Thing from RoboidControl.Messages.ParticipantMsg import ParticipantMsg @@ -9,13 +5,19 @@ from RoboidControl.Messages.NetworkIdMsg import NetworkIdMsg from RoboidControl.Messages.ThingMsg import ThingMsg from RoboidControl.Messages.NameMsg import NameMsg from RoboidControl.Messages.ModelUrlMsg import ModelUrlMsg +from RoboidControl.Messages.Messages import IMessage from RoboidControl.Messages import * -import sys -micropython = 'micropython' in sys.modules +import socket +import threading +import time +from typing import Optional -if micropython: - from MicroPython.uPythonParticipant import Bla +# import sys +# micropython = 'micropython' in sys.modules + +# if micropython: +# from MicroPython.uPythonParticipant import uPythonParticipant class ParticipantUDP(Participant): """! A local participant is the local device which can communicate with other participants. @@ -26,14 +28,18 @@ class ParticipantUDP(Participant): Currently, only UDP communication is supported """ - buffer = None + #buffer = None nextPublishMe = 0 - others = None - thread = None + #others = None + #thread = None name = "Participant" isolated_participant = None - def __init__(self, port=7681, ip_address=None, local_port=7681): + def __init__(self, + port: int = 7681, + ip_address: Optional[str] = None, + local_port: int = 7681 + ) -> None: super().__init__(ip_address = "127.0.0.1", port = local_port) # if local_port == 0: @@ -42,11 +48,11 @@ class ParticipantUDP(Participant): ## True if the participant is running isolated. # Isolated participants do not communicate with other participants - self.is_isolated = True - self.remote_site = None + self.is_isolated: bool = True + self.remote_site: Optional[Participant] = None ## The other participants communicating with this participant - self.others = [] + self.others: list[Participant] = [] if port != 0: self.is_isolated = False @@ -54,7 +60,7 @@ class ParticipantUDP(Participant): self.remote_site = Participant(ip_address, port) self.others.append(self.remote_site) - self.buffer = bytearray(256) + self.buffer: bytearray = bytearray(256) self.publishInterval = 3000 # 3 seconds self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -65,14 +71,15 @@ class ParticipantUDP(Participant): self.thread.daemon = True self.thread.start() - def Isolated(): + @staticmethod + def Isolated() -> 'ParticipantUDP': if ParticipantUDP.isolated_participant == None: ParticipantUDP.isolated_participant = ParticipantUDP(0) return ParticipantUDP.isolated_participant #region Update - def GetParticipant(self, ip_address, port): + def GetParticipant(self, ip_address: str, port: int): # print(f'{self.name} Get participant {ip_address} {port}') # for item in self.others: # print(f'- {item.ip_address} {item.port}') @@ -82,15 +89,15 @@ class ParticipantUDP(Participant): participant = next(found_participants, None) return participant - def AddParticipant(self, ip_address, port): + def AddParticipant(self, ip_address: str, port: int): print(f'{self.name} Add participant {ip_address} {port}') remote_participant = Participant(ip_address = ip_address, port = port) self.others.append(remote_participant) return remote_participant - def Update(self, currentTimeMs = None): + def Update(self, currentTimeMs: Optional[int] = None): if currentTimeMs is None: - currentTimeMs = time.time() * 1000 + currentTimeMs = int(time.time() * 1000) if self.is_isolated == False: if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe: @@ -104,16 +111,15 @@ class ParticipantUDP(Participant): self.UpdateMyThings(currentTimeMs) - def UpdateMyThings(self, currentTimeMs): + def UpdateMyThings(self, currentTimeMs: int): for thing in self.things: - if thing is None: - continue - # if thing.hierarchyChanged and not (self.isIsolated or self.network_id == ): # thingMsg = ThingMsg(self.network_id, thing) # self.Send(self.remote_site, thingMsg) - poseMsg = PoseMsg(thing.owner.network_id, thing) - self.Send(self.remote_site, poseMsg) + if thing.owner is not None: + poseMsg = PoseMsg(thing.owner.network_id, thing) + if self.remote_site is not None: + self.Send(self.remote_site, poseMsg) thing.Update(currentTimeMs, False) if not(self.is_isolated or self.network_id == 0): @@ -121,11 +127,12 @@ class ParticipantUDP(Participant): # destroyMsg = DestroyMsg(self.network_id, thing) # self.Send(self.remote_site, destroyMsg) # else: - # Send to remote site - poseMsg = PoseMsg(thing.owner.network_id, thing) - self.Send(self.remote_site, poseMsg) - binaryMsg = BinaryMsg(thing.owner.network_id, thing) - self.Send(self.remote_site, binaryMsg) + if self.remote_site is not None and thing.owner is not None: + # Send to remote site + poseMsg = PoseMsg(thing.owner.network_id, thing) + self.Send(self.remote_site, poseMsg) + binaryMsg = BinaryMsg(thing.owner.network_id, thing) + self.Send(self.remote_site, binaryMsg) # if thing.terminate: # self.Remove(thing) @@ -133,7 +140,7 @@ class ParticipantUDP(Participant): #region Send - def SendThingInfo(self, owner, thing, recursively: bool = False): + def SendThingInfo(self, owner: Participant, thing: 'Thing', recursively: bool = False): self.Send(owner, ThingMsg(self.network_id, thing)) self.Send(owner, NameMsg(self.network_id, thing)) self.Send(owner, ModelUrlMsg(self.network_id, thing)) @@ -145,7 +152,7 @@ class ParticipantUDP(Participant): self.SendThingInfo(owner, child, recursively) - def Send(self, owner, msg): + def Send(self, owner: Participant, msg: IMessage): buffer_size = msg.Serialize([self.buffer]) if buffer_size <= 0: return True @@ -154,7 +161,7 @@ class ParticipantUDP(Participant): self.udp_socket.sendto(self.buffer[:buffer_size], (owner.ip_address, owner.port)) return True - def Publish(self, msg): + def Publish(self, msg: IMessage): buffer_size = msg.Serialize([self.buffer]) if buffer_size <= 0: return True @@ -183,7 +190,7 @@ class ParticipantUDP(Participant): self.network_id = 0 pass - def ReceiveData(self, data, sender): + def ReceiveData(self, data: bytes, sender: Participant): msgId = data[0] # print(f'msg {msgId} ') match msgId: @@ -200,12 +207,14 @@ class ParticipantUDP(Participant): case ModelUrlMsg.id: self.ProcessModelUrlMsg(ModelUrlMsg(data)) case BinaryMsg.id: - self.ProcessBinaryMsg(BinaryMsg(data)) + self.ProcessBinaryMsg(BinaryMsg(data)) + case _: + pass - def ProcessParticipantMsg(self, sender, msg: ParticipantMsg): + def ProcessParticipantMsg(self, sender: Participant, msg: ParticipantMsg): pass - def ProcessSiteIdMsg(self, sender, msg: NetworkIdMsg): + def ProcessSiteIdMsg(self, sender: Participant, msg: NetworkIdMsg): print(f'{self.name} Process SiteMsg {self.network_id} -> {msg.network_id}') if self.network_id != msg.network_id: self.network_id = msg.network_id @@ -227,8 +236,8 @@ class ParticipantUDP(Participant): def ProcessBinaryMsg(self, msg: BinaryMsg): # print('received binary data') - thing: Thing = self.Get(msg.thing_id) - if thing != None: + thing: Optional[Thing] = self.Get(msg.thing_id) + if thing is not None: thing.ProcessBinary(msg.data) #endregion \ No newline at end of file diff --git a/RoboidControl/Thing.py b/RoboidControl/Thing.py index 580e8d8..5da7919 100644 --- a/RoboidControl/Thing.py +++ b/RoboidControl/Thing.py @@ -1,8 +1,8 @@ -from LinearAlgebra.Spherical import * -from LinearAlgebra.Quaternion import * -from LinearAlgebra.SwingTwist import * -from RoboidControl.Messages import * +from Participant import Participant +from LinearAlgebra.Spherical import Spherical +from LinearAlgebra.Quaternion import Quaternion +from typing import Optional import time class Thing: @@ -34,7 +34,7 @@ class Thing: # region Init - def __init__(self, owner = None, parent = None, thing_type = Type.Undetermined, thing_id = 0): + def __init__(self, owner: Optional[Participant] = None, parent: Optional['Thing'] = None, thing_type: int = Type.Undetermined, thing_id: int = 0) -> None: """! Create a new thing @param owner The owning participant @param parent The parent thing (will override owner if set) @@ -42,16 +42,16 @@ class Thing: @param thingId The ID of the thing, leave out or set to zero to generate an ID """ ## The participant owning this thing - self.owner = None + self.owner: Optional[Participant] = None ## The ID of the thing - self.id = thing_id + self.id: int = thing_id ## The type of the thing # ## This can be either a \ref RoboidControl::Thing::Thing::Type "Thing.Type" or a byte value for custom types. - self.type = thing_type + self.type: int = thing_type ## The parent of this thing - self.parent = None + self.parent: Optional[Thing] = None if parent is not None: self.owner = parent.owner self.SetParent(parent) @@ -62,12 +62,12 @@ class Thing: self.owner = owner ## The children of this thing - self.children = [] + self.children: list[Thing] = list() ## The name of the thing - self.name = None + self.name: Optional[str] = None ## An URL pointing to the location where a model of the thing can be found - self.model_url = None + self.model_url: Optional[str] = None ## The position of the thing in local space, in meters self.position: Spherical = Spherical.zero @@ -89,14 +89,14 @@ class Thing: ## Boolean indicating the thing has an updated angular velocity self.angular_velocity_updated: bool = False - #self.pose_updated = 0x00 # the bits indicate which fields have been updated - self.owner.Add(self) + if self.owner is not None: + self.owner.Add(self) # endregion Init # region Hierarchy - def SetParent(self, parent): + def SetParent(self, parent: Optional['Thing']): """! Sets the parent of this Thing @param The Thing which should become the parent @note Do not set Thing.parent directly, as that will break the parent-child relation @@ -109,7 +109,7 @@ class Thing: else: parent.AddChild(self) - def AddChild(self, child): + def AddChild(self, child: 'Thing'): """! Add a child Thing to this Thing @param child The Thing which should become a child @remark When the Thing is already a child, it will not be added again @@ -120,21 +120,20 @@ class Thing: child.parent = self self.children.append(child) - def RemoveChild(self, child): + + def RemoveChild(self, child: 'Thing'): """! Remove the given thing as a child of this thing @param child The child to remove """ self.children.remove(child) - def GetChild(self, thing_id, recurse = False): + def GetChild(self, thing_id: int, recurse: bool = False) -> Optional['Thing']: """! Get a child by thing Id @param id The thing ID to find @param recurse Look recursively through all descendants @returns The found thing of nullptr when nothing is found """ for child in self.children: - if child is None: - continue if child.id == thing_id: return child if recurse: @@ -143,19 +142,17 @@ class Thing: return found_child return None - def FindChild(self, name, recurse = True): + def FindChild(self, name: str, recurse: bool = True) -> Optional['Thing']: """! Find a thing by name @param name The name of the thing @return The found thing or nullptr when nothing is found @param recurse Look recursively through all descendants """ for child in self.children: - if child is None: - continue if child.name == name: return child if recurse: - found_child = child.GetChild(name, recurse) + found_child = child.FindChild(name, recurse) if found_child is not None: return found_child return None @@ -163,21 +160,21 @@ class Thing: # region Pose - def SetPosition(self, position): + def SetPosition(self, position: Spherical) -> None: """! Set the position of the thing @param position The new position in local space, in meters """ self.position = position self.position_updated = True - def SetOrientation(self, orientation): + def SetOrientation(self, orientation: Quaternion) -> None: """! Set the orientation of the thing @param orientation The new orientation in local space """ self.orientation = orientation self.orientation_updated = True - def SetLinearVelocity(self, linear_velocity): + def SetLinearVelocity(self, linear_velocity: Spherical) -> None: """! Set the linear velocity of the thing @param linearVelocity The new linear velocity in local space, in meters per second """ @@ -186,7 +183,7 @@ class Thing: self.linear_velocity = linear_velocity - def SetAngularVelocity(self, angular_velocity): + def SetAngularVelocity(self, angular_velocity: Spherical) -> None: """! Set the angular velocity of the thing @param angularVelocity the new angular velocity in local space """ @@ -200,13 +197,13 @@ class Thing: # region Update @staticmethod - def GetTimeMs(): + def GetTimeMs() -> int: """! Get the current time in milliseconds @return The current time in milliseconds """ - return time.time() * 1000 + return int(time.time() * 1000) - def Update(self, currentTime = 0, recurse = False): + def Update(self, currentTimeMs: int = 0, recurse: bool = False) -> None: """! Update de state of the thing @param currentTimeMs The current clock time in milliseconds; If this is zero, the current time is retrieved automatically @param recurse When true, this will Update the descendants recursively @@ -218,21 +215,19 @@ class Thing: if recurse: for child in self.children: - if child is None: - continue - child.Update(currentTime, recurse) + child.Update(currentTimeMs, recurse) # endregion Update - def GenerateBinary(self, buffer, ix_ref) -> int: + def GenerateBinary(self, bytes: bytearray, ix_ref: set[int]) -> int: """! Function used to generate binary data for this thing @param buffer The byte array for thw binary data - @param ix The starting position for writing the binary data + @param ix_ref A single element array with the starting position for writing the binary data @returns The size of the binary data """ return 0 - def ProcessBinary(self, data): + def ProcessBinary(self, data: bytes): """! Function used to process binary data received for this thing @param bytes The binary data """ diff --git a/RoboidControl/Things/DigitalSensor.py b/RoboidControl/Things/DigitalSensor.py new file mode 100644 index 0000000..c996c84 --- /dev/null +++ b/RoboidControl/Things/DigitalSensor.py @@ -0,0 +1,32 @@ +from RoboidControl.Thing import Thing + +class TouchSensor(Thing): + """! A sensor which can detect touches + """ + def __init__(self, owner = None, parent = None, thing_id = 0): + """! Create a touch sensor + """ + super().__init__(owner = owner, parent = parent, thing_type = Thing.Type.TouchSensor, thing_id = thing_id) + + ## Value which is true when the sensor is touching something, false otherwise + self.touched_something = False + + def GenerateBinary(self, bytes, ix_ref): + """! Function used to generate binary data for this touch sensor + @param bytes The byte array for thw binary data + @param ix_ref A single element array with the starting position for writing the binary data + @returns The size of the binary data + """ + ix = ix_ref[0] + if self.touched_something: + bytes[ix] = 0 + else: + bytes[ix] = 1 + ix_ref[0] += 1 + return 1 + + def ProcessBinary(self, bytes): + """! Function used to process binary data received for this touch sensor + @param bytes The binary data to process + """ + self.touched_something = bytes[0] == 1 diff --git a/RoboidControl/Things/TemperatureSensor.py b/RoboidControl/Things/TemperatureSensor.py index 5ad0703..9d2e8c2 100644 --- a/RoboidControl/Things/TemperatureSensor.py +++ b/RoboidControl/Things/TemperatureSensor.py @@ -2,18 +2,26 @@ from RoboidControl.Thing import Thing from RoboidControl.Messages import LowLevelMessages class TemperatureSensor(Thing): - def __init__(self, thing_id): - super().__init__(thing_id, thing_type = Thing.Type.TemperatureSensor) - self.temp = 0 - self._watchers = [] - - def OnUpdate(self, handler): - self._watchers.append(handler) - def CancelOnUpdate(self, handler): - self._watchers.remove(handler) + """! A temperature sensor + """ + def __init__(self, owner = None, parent = None, thing_id = 0): + super().__init__(owner = owner, parent = parent, thing_type = Thing.Type.TemperatureSensor, thing_id = thing_id) - def ProcessBinary(self, data): + ## The temperature in degrees celcius + self.temperature = 0 + + def GenerateBinary(self, bytes, ix_ref): + """! Function used to generate binary data for this temperature sensor + @param bytes The byte array for thw binary data + @param ix_ref A single element array with the starting position for writing the binary data + @returns The size of the binary data + """ + LowLevelMessages.SendFloat16(bytes, ix_ref, self.temperature) + return 2 + + def ProcessBinary(self, bytes): + """! Function used to process the binary data received for this temperature sensor + @param bytes The binary data to process + """ ix = 0 - self.temp = LowLevelMessages.ReceiveFloat16(data, [ix]) - for watcher in self._watchers: - watcher(self.temp) + self.temperature = LowLevelMessages.ReceiveFloat16(bytes, [ix]) diff --git a/RoboidControl/Things/TouchSensor.py b/RoboidControl/Things/TouchSensor.py index b390088..c996c84 100644 --- a/RoboidControl/Things/TouchSensor.py +++ b/RoboidControl/Things/TouchSensor.py @@ -3,19 +3,30 @@ from RoboidControl.Thing import Thing class TouchSensor(Thing): """! A sensor which can detect touches """ - def __init__(self, owner = None, parent = None): + def __init__(self, owner = None, parent = None, thing_id = 0): """! Create a touch sensor """ - super().__init__(owner = owner, parent = parent, thing_type = Thing.Type.TouchSensor) + super().__init__(owner = owner, parent = parent, thing_type = Thing.Type.TouchSensor, thing_id = thing_id) ## Value which is true when the sensor is touching something, false otherwise self.touched_something = False + def GenerateBinary(self, bytes, ix_ref): + """! Function used to generate binary data for this touch sensor + @param bytes The byte array for thw binary data + @param ix_ref A single element array with the starting position for writing the binary data + @returns The size of the binary data + """ + ix = ix_ref[0] + if self.touched_something: + bytes[ix] = 0 + else: + bytes[ix] = 1 + ix_ref[0] += 1 + return 1 + def ProcessBinary(self, bytes): - """! Function to extract the touch state received in a binary message + """! Function used to process binary data received for this touch sensor + @param bytes The binary data to process """ self.touched_something = bytes[0] == 1 - if self.touched_something: - print(f"{self.name} touched something!") - else: - print(f"{self.name} touching nothing") \ No newline at end of file