Fix import issues

This commit is contained in:
Pascal Serrarens 2025-04-30 17:13:19 +02:00
parent de73cd5edc
commit b262aa1051
19 changed files with 232 additions and 106 deletions

View File

@ -3,7 +3,7 @@ from os import path
sys.path.append(path.abspath(path.join(path.dirname(__file__), '..')))
import time
from ParticipantUDP import ParticipantUDP
from RoboidControl.Participants.ParticipantUDP import ParticipantUDP
from Thing import *
from Things.DifferentialDrive import DifferentialDrive
from Things.TouchSensor import TouchSensor

View File

@ -1,5 +1,5 @@
from Thing import Thing
from Messages import IMessage
from RoboidControl.Thing import Thing
from RoboidControl.Messages.IMessage import IMessage
from typing import Optional, Union

View File

@ -0,0 +1,8 @@
class IMessage:
id = 0x00
## Serialize the message into the given buffer
#
## @returns: the length of the message
def Serialize(self, buffer_ref: list[bytearray]) -> int:
return 0

View File

@ -1,14 +1,22 @@
import numpy as np
from LinearAlgebra.SwingTwist import SwingTwist
from LinearAlgebra.Spherical import Spherical
from LinearAlgebra.Quaternion import Quaternion
from LinearAlgebra.Angle import Angle
def SendAngle8(buffer, ix_ref, angle):
def SendAngle8(buffer, ref_ix, angle):
"""! Send an 8-bit angle value """
# angle = Angle.Normalize(angle)
ix = ix_ref[0]
ix = ref_ix[0]
buffer[ix] = int((angle.InDegrees() / 360) * 256).to_bytes(1, 'big', signed=True)[0]
ix_ref[0] += 1
ref_ix[0] += 1
def ReceiveAngle8(buffer: bytes, ref_ix: list[int]) -> Angle:
angle: Angle = Angle()
angle.value = buffer[ref_ix[0]]
ref_ix[0] += 1
return angle
def SendFloat16(buffer, ix_ref, value):
"""! Send a 16-bit floating point value """
@ -33,17 +41,27 @@ def ReceiveFloat16(buffer, ix_ref) -> float:
return float(value16)
def SendSpherical(buffer, ix_ref, vector):
"""! Send a spherical vector """
"""! Send a spherical vector
"""
SendFloat16(buffer, ix_ref, vector.distance)
SendAngle8(buffer, ix_ref, vector.direction.horizontal)
SendAngle8(buffer, ix_ref, vector.direction.vertical)
def SendQuat32(buffer, ix_ref, q):
def ReceiveSpherical(buffer: bytes, ref_ix: list[int]) -> Spherical:
"""! Receive a spherical vector
"""
distance = ReceiveFloat16(buffer, ref_ix)
horizontal = ReceiveAngle8(buffer, ref_ix)
vertical = ReceiveAngle8(buffer, ref_ix)
v: Spherical = Spherical.Degrees(distance, horizontal, vertical)
return v
def SendQuat32(buffer, ref_ix, q):
"""! Send a 32-bit quaternion value """
if isinstance(q, SwingTwist):
q = q.ToQuaternion()
ix = ix_ref[0]
ix = ref_ix[0]
qx = (int)(q.x * 127 + 128)
qy = (int)(q.y * 127 + 128)
qz = (int)(q.z * 127 + 128)
@ -59,4 +77,16 @@ def SendQuat32(buffer, ix_ref, q):
qz,
qw
]
ix_ref[0] += 4
ref_ix[0] += 4
def ReceiveQuaternion(buffer: bytes, ref_ix: list[int]):
qx: float = (buffer[ref_ix[0]] - 128) / 127
ref_ix[0] += 1
qy: float = (buffer[ref_ix[0]] - 128) / 127
ref_ix[0] += 1
qz: float = (buffer[ref_ix[0]] - 128) / 127
ref_ix[0] += 1
qw: float = buffer[ref_ix[0]] / 255
ref_ix[0] += 1
q: Quaternion = Quaternion(qx, qy, qz, qw)
return q

View File

@ -1,23 +0,0 @@
#import Messages.LowLevelMessages as LowLevelMessages
# from ParticipantUDP import ParticipantUDP
class IMessage:
id = 0x00
## Serialize the message into the given buffer
#
## @returns: the length of the message
def Serialize(self, buffer_ref: list[bytearray]) -> int:
return 0
# def SendTo(self, participant: ParticipantUDP) -> bool:
# buffer_size = self.Serialize([participant.buffer])
# if buffer_size == 0:
# return False
# return participant.SendBuffer(buffer_size)
# def Publish(self, participant: Participant) -> bool:
# bufferSize = self.Serialize([participant.buffer])
# if bufferSize == 0:
# return False
# return participant.PublishBuffer(bufferSize)

View File

@ -1,5 +1,5 @@
from RoboidControl.Messages.Messages import IMessage
from Thing import Thing
from RoboidControl.Messages.IMessage import IMessage
from RoboidControl.Thing import Thing
from typing import Union, Optional

View File

@ -1,5 +1,5 @@
from RoboidControl.Messages.Messages import IMessage
from Thing import Thing
from RoboidControl.Messages.IMessage import IMessage
from RoboidControl.Thing import Thing
from typing import Union, Optional

View File

@ -1,4 +1,4 @@
from RoboidControl.Messages.Messages import IMessage
from RoboidControl.Messages.IMessage import IMessage
from typing import Union

View File

@ -1,4 +1,4 @@
from RoboidControl.Messages.Messages import IMessage
from RoboidControl.Messages.IMessage import IMessage
class ParticipantMsg(IMessage):
"""! A participant messages notifies other participants of its presence.

View File

@ -1,5 +1,10 @@
import RoboidControl.Messages.LowLevelMessages as LowLevelMessages
#from Thing import Thing
from RoboidControl.Messages.IMessage import IMessage
from LinearAlgebra.Spherical import Spherical
from LinearAlgebra.Quaternion import Quaternion
from RoboidControl.Thing import Thing
from typing import Union, Optional
class PoseMsg(IMessage):
id = 0x10
@ -10,10 +15,28 @@ class PoseMsg(IMessage):
LinearVelocity = 0x04
AngularVelocity = 0x08
def __init__(self, arg1, thing, force: bool = False):
if isinstance(arg1, bytes):
self.thing_id = arg1[2]
self.data = arg1[3:]
def __init__(self, arg1: Union[bytes, int], thing: Optional[Thing], force: bool = False):
if thing is None:
buffer = bytes(arg1)
self.network_id = buffer[1]
self.thing_id = buffer[2]
self.pose_type = buffer[3]
self.position: Optional[Spherical] = None
self.orientation: Optional[Quaternion] = None
self.linear_velocity: Optional[Spherical] = None
self.angular_velocity: Optional[Spherical] = None
ix:int = 4
ix_ref = [ix]
if self.pose_type & PoseMsg.Position != 0:
self.position = LowLevelMessages.ReceiveSpherical(buffer, ix_ref)
if self.pose_type & PoseMsg.Orientation != 0:
self.orientation = LowLevelMessages.ReceiveQuaternion(buffer, ix_ref)
if self.pose_type & PoseMsg.LinearVelocity != 0:
self.linear_velocity = LowLevelMessages.ReceiveSpherical(buffer, ix_ref)
if self.pose_type & PoseMsg.AngularVelocity != 0:
self.angular_velocity = LowLevelMessages.ReceiveSpherical(buffer, ix_ref)
else:
self.network_id = arg1
self.thing = thing

View File

@ -1,5 +1,5 @@
from RoboidControl.Messages.Messages import IMessage
from Thing import Thing
from RoboidControl.Messages.IMessage import IMessage
from RoboidControl.Thing import Thing
from typing import Optional, Union

View File

@ -4,13 +4,6 @@ __all__ = ['BinaryMsg',
'ThingMsg',
'NameMsg',
'ModelUrlMsg',
'PoseMsg']
from .BinaryMsg import BinaryMsg
from .ParticipantMsg import ParticipantMsg
from .InvestigateMsg import InvestigateMsg
from .ThingMsg import ThingMsg
from .NetworkIdMsg import NetworkIdMsg
from .NameMsg import NameMsg
from .ModelUrlMsg import ModelUrlMsg
from .PoseMsg import PoseMsg
'ParticipantMsg',
'PoseMsg',
'IMessage']

View File

@ -1,6 +1,6 @@
from Thing import Thing
from RoboidControl.Thing import Thing
from typing import Optional
from typing import Union, Optional
class Participant:
"""! A participant is a device which manages things.
@ -63,3 +63,34 @@ class Participant:
"""
for thing in list(self.things):
thing.Update(currentTimeMs)
participants: set['Participant']
@staticmethod
def GetParticipant(arg1: Union[str, int], port: Optional[int]) -> Optional['Participant']:
if port is not None:
ip_address = str(arg1)
for participant in Participant.participants:
if participant.ip_address == ip_address and participant.port == port:
return participant
return None
else:
network_id = int(arg1)
for participant in Participant.participants:
if participant.network_id == network_id:
return participant
return None
def AddParticipant(arg1: Union[str, int], port: Optional[int]) -> 'Participant':
if port is not None:
ip_address = str(arg1)
participant = Participant(ip_address, port)
participant.network_id = len(Participant.participants) + 1
Participant.AddParticipant(participant)
return participant
else:
participant = Participant(arg1)
foundParticipant = Participant.GetParticipant(participant.network_id)
if foundParticipant is not None:
Participant.participants.add(participant)
return participant

View File

@ -1,17 +1,21 @@
from RoboidControl.Participant import Participant
from RoboidControl.Thing import Thing
from RoboidControl.Messages.ParticipantMsg import ParticipantMsg
from RoboidControl.Messages.NetworkIdMsg import NetworkIdMsg
from RoboidControl.Messages.ThingMsg import ThingMsg
from RoboidControl.Messages.NameMsg import NameMsg
from RoboidControl.Messages.ModelUrlMsg import ModelUrlMsg
from RoboidControl.Messages.Messages import IMessage
# from RoboidControl.Messages.ParticipantMsg import ParticipantMsg
# from RoboidControl.Messages.NetworkIdMsg import NetworkIdMsg
# from RoboidControl.Messages.ThingMsg import ThingMsg
# from RoboidControl.Messages.NameMsg import NameMsg
# from RoboidControl.Messages.ModelUrlMsg import ModelUrlMsg
# from RoboidControl.Messages.Messages import IMessage
# from RoboidControl.Messages.InvestigateMsg import InvestigateMsg
# from RoboidControl.Messages.PoseMsg import PoseMsg
from RoboidControl.Messages import *
import socket
import threading
import time
from typing import Optional
import logging
logger = logging.getLogger(__name__)
# import sys
# micropython = 'micropython' in sys.modules
@ -20,39 +24,44 @@ from typing import Optional
# from MicroPython.uPythonParticipant import uPythonParticipant
class ParticipantUDP(Participant):
"""! A local participant is the local device which can communicate with other participants.
"""! A participant using UDP communication
It manages all local things and communcation with other participants. Each application has a local participant which is usually explicit in the code.
An participant can be isolated. In that case it is standalong and does not communicate with other participants.
A local participant is the local device which can communicate with
other participants It manages all local things and communcation with other
participants. Each application has a local participant which is usually
explicit in the code. An participant can be isolated. In that case it is
standalong and does not communicate with other participants.
Currently, only UDP communication is supported
It is possible to work with an hidden participant by creating things without
specifying a participant in the constructor. In that case an hidden isolated
participant is created which can be obtained using
RoboidControl::IsolatedParticipant::Isolated().
@sa RoboidControl::Thing::Thing()
"""
#buffer = None
nextPublishMe = 0
#others = None
#thread = None
name = "Participant"
isolated_participant = None
#region Init
def __init__(self,
port: int = 7681,
ip_address: Optional[str] = None,
local_port: int = 7681
) -> None:
"""! Create a participant
@param ipAddress The IP address of the site
@param port The port used by the site
@param localPort The port used by the local participant
"""
super().__init__(ip_address = "127.0.0.1", port = local_port)
# if local_port == 0:
# local_port = port
# self.local_port = local_port
## True if the participant is running isolated.
# Isolated participants do not communicate with other participants
self.is_isolated: bool = True
## The remote site when this participant is connected to a site
self.remote_site: Optional[Participant] = None
## The other participants communicating with this participant
self.others: list[Participant] = []
## The interval in milliseconds for publishing (broadcasting) data on the local network
self.publishInterval = 3000 # 3 seconds
if port != 0:
self.is_isolated = False
@ -61,7 +70,6 @@ class ParticipantUDP(Participant):
self.others.append(self.remote_site)
self.buffer: bytearray = bytearray(256)
self.publishInterval = 3000 # 3 seconds
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
@ -77,7 +85,9 @@ class ParticipantUDP(Participant):
ParticipantUDP.isolated_participant = ParticipantUDP(0)
return ParticipantUDP.isolated_participant
#region Update
#endregion Init
isolated_participant = None
def GetParticipant(self, ip_address: str, port: int):
# print(f'{self.name} Get participant {ip_address} {port}')
@ -95,6 +105,8 @@ class ParticipantUDP(Participant):
self.others.append(remote_participant)
return remote_participant
#region Update
def Update(self, currentTimeMs: Optional[int] = None):
if currentTimeMs is None:
currentTimeMs = int(time.time() * 1000)
@ -111,6 +123,8 @@ class ParticipantUDP(Participant):
self.UpdateMyThings(currentTimeMs)
nextPublishMe = 0
def UpdateMyThings(self, currentTimeMs: int):
for thing in self.things:
# if thing.hierarchyChanged and not (self.isIsolated or self.network_id == ):
@ -151,6 +165,12 @@ class ParticipantUDP(Participant):
for child in thing.children:
self.SendThingInfo(owner, child, recursively)
def PublishThingInfo(self, thing: Thing):
self.Publish(ThingMsg(self.network_id, thing))
self.Publish(NameMsg(self.network_id, thing))
self.Publish(ModelUrlMsg(self.network_id, thing))
self.Publish(PoseMsg(self.network_id, thing))
self.Publish(BinaryMsg(self.network_id, thing))
def Send(self, owner: Participant, msg: IMessage):
buffer_size = msg.Serialize([self.buffer])
@ -212,30 +232,51 @@ class ParticipantUDP(Participant):
pass
def ProcessParticipantMsg(self, sender: Participant, msg: ParticipantMsg):
pass
logger.debug(f'{self.name} Process participantMsg {msg.networkId}')
def ProcessSiteIdMsg(self, sender: Participant, msg: NetworkIdMsg):
print(f'{self.name} Process SiteMsg {self.network_id} -> {msg.network_id}')
logger.debug(f'{self.name} Process NetworkIdMsg {self.network_id} -> {msg.network_id}')
if self.network_id != msg.network_id:
self.network_id = msg.network_id
for thing in self.things:
#if thing.parent is None:
self.SendThingInfo(sender, thing, recursively=True)
def ProcessInvestigateMsg(self, data: bytearray):
pass
def ProcessInvestigateMsg(self, sender: Participant, msg: InvestigateMsg):
logger.debug(f'{self.name} Process InestigateMsg [{msg.networkId}/{msg.thingId}]')
def ProcessThingMsg(self, msg: ThingMsg):
print(f'received thing {msg.thing_id}')
logger.debug(f'{self.name}: Process ThingMsg [{msg.networkId}/{msg.thingId}] {msg.thingType} {msg.parentId}')
def ProcessNameMsg(self, msg: NameMsg):
print(f'received name {msg.name}')
logger.debug(f'{self.name}: Process NameMsg [{msg.networkId}/{msg.thingId}] {msg.nameLength} {msg.name}')
def ProcessModelUrlMsg(self, msg: ModelUrlMsg):
print(f'received model url: {msg.url}')
logger.debug(f'{self.name}: Process ModelUrlMsg [{msg.networkId}/{msg.thingId}] {msg.urlLength} {msg.url}')
def ProcessPoseMsg(self, sender: Participant, msg: PoseMsg):
logger.debug(f'{self.name}: Process PoseMsg [{msg.networkId}/{msg.thingId}] {msg.poseType}')
owner: Optional[Participant] = Participant.GetParticipant(msg.network_id)
if owner is None:
return
thing: Optional[Thing] = self.Get(msg.thing_id)
if thing is None:
return
if (msg.pose_type & PoseMsg.Position) != 0:
thing.position = msg.position
if (msg.pose_type & PoseMsg.Orientation) != 0:
thing.orientation = msg.orientation
if (msg.pose_type & PoseMsg.LinearVelocity) != 0:
thing.linear_velocity = msg.linear_velocity
if (msg.pose_type & PoseMsg.AngularVelocity) != 0:
thing.angular_velocity = msg.angular_velocity
def ProcessBinaryMsg(self, msg: BinaryMsg):
# print('received binary data')
logger.debug(f'{self.name}: Process BinaryMsg [{msg.networkId}/{msg.thingId}] {msg.dataLength}')
thing: Optional[Thing] = self.Get(msg.thing_id)
if thing is not None:
thing.ProcessBinary(msg.data)

View File

@ -1,11 +1,11 @@
from RoboidControl.ParticipantUDP import ParticipantUDP
from RoboidControl.Participants.ParticipantUDP import ParticipantUDP
from RoboidControl.Messages.ParticipantMsg import ParticipantMsg
from RoboidControl.Messages.NetworkIdMsg import NetworkIdMsg
from RoboidControl.Messages.ThingMsg import ThingMsg
from RoboidControl.Things.TemperatureSensor import TemperatureSensor
from RoboidControl.Thing import Thing
import socket
import threading
from typing import Optional
class SiteServer(ParticipantUDP):
"""! A site server is a participant which provides a shared simulated environment
@ -13,18 +13,41 @@ class SiteServer(ParticipantUDP):
name = "Site Server"
def __init__(self, port=7681):
#region Init
def __init__(self, port: int = 7681):
"""! Create a new site server
@param port The UDP port on which communication is received
@param port The port of which to receive the messages
"""
super().__init__(ip_address = "127.0.0.1", port = port)
self.isolated = False # site servers are never isolated
self.publishInterval = 0
#endregion Init
#region Update
#endregion Update
#region Receive
def ProcessParticipantMsg(self, sender, msg):
print(f'{self.name} received Participant ')
ParticipantUDP.ProcessParticipantMsg(sender, msg)
if msg.network_id != sender.network_id:
self.Send(sender, sender.network_id)
def ProcessNetworkId(self, msg):
def ProcessNetworkIdMsg(self, sender, msg):
pass
def ProcessThingMsg(self, sender, msg: ThingMsg):
thing: Optional[Thing] = sender.Get(msg.thing_id)
if thing is None:
Thing(sender, msg.thing_type, msg.thing_id)
if msg.parent_id != 0:
thing.parent = sender.Get(msg.parent_id)
if thing.parent is not None:
print(f'Could not find parent [{msg.network_id}/{msg.parent_id}]')
else:
thing.parent = None
#endregion Receive

View File

@ -1,4 +1,4 @@
from Participant import Participant
# from RoboidControl.Participant import Participant
from LinearAlgebra.Spherical import Spherical
from LinearAlgebra.Quaternion import Quaternion
@ -35,7 +35,7 @@ class Thing:
# region Init
def __init__(self, owner: Optional[Participant] = None, parent: Optional['Thing'] = None, thing_type: int = Type.Undetermined, thing_id: int = 0) -> None:
def __init__(self, owner: Optional['Participant'] = None, parent: Optional['Thing'] = None, thing_type: int = Type.Undetermined, thing_id: int = 0) -> None:
"""! Create a new thing
@param owner The owning participant
@param parent The parent thing (will override owner if set)
@ -43,7 +43,7 @@ class Thing:
@param thingId The ID of the thing, leave out or set to zero to generate an ID
"""
## The participant owning this thing
self.owner: Optional[Participant] = None
self.owner: Optional['Participant'] = None
## The ID of the thing
self.id: int = thing_id
## The type of the thing
@ -57,7 +57,7 @@ class Thing:
self.owner = parent.owner
self.SetParent(parent)
elif owner == None:
from RoboidControl.ParticipantUDP import ParticipantUDP
from RoboidControl.Participants.ParticipantUDP import ParticipantUDP
self.owner = ParticipantUDP.Isolated()
else:
self.owner = owner

View File

@ -1,5 +1,5 @@
__all__ = ['Thing',
'ParticipantUDP']
from .ParticipantUDP import ParticipantUDP
from .Participants.ParticipantUDP import ParticipantUDP
from .Thing import Thing

View File

@ -5,7 +5,7 @@ sys.path.append(str(Path(__file__).resolve().parent.parent))
import time
from RoboidControl.Participants.SiteServer import SiteServer
from RoboidControl.ParticipantUDP import ParticipantUDP
from RoboidControl.Participants.ParticipantUDP import ParticipantUDP
from RoboidControl.Thing import *
from RoboidControl.Things.DifferentialDrive import DifferentialDrive
from RoboidControl.Things.TouchSensor import TouchSensor

View File

@ -8,7 +8,7 @@ sys.path.append(str(Path(__file__).resolve().parent))
import unittest
from RoboidControl.Thing import Thing
from RoboidControl.ParticipantUDP import ParticipantUDP
from RoboidControl.Participants.ParticipantUDP import ParticipantUDP
from RoboidControl.Participants.SiteServer import SiteServer
class ThingTest(unittest.TestCase):