RoboidControl-python/Messages.py
2024-12-22 13:51:05 +01:00

211 lines
5.9 KiB
Python

# from .Participant import Participant
from . import LowLevelMessages
from .Thing import Thing
class IMessage:
    """Base class for messages that serialize themselves into a participant buffer.

    Subclasses override ``id`` and ``Serialize``; ``SendTo`` and ``Publish``
    are shared and only differ in which participant method they call after
    serialization.
    """

    # Message identifier byte; subclasses override it.
    id = 0x00

    def Serialize(self, buffer_ref=None):
        """Write the message into ``buffer_ref[0]`` and return the byte count.

        The base implementation writes nothing and returns 0, which
        ``SendTo`` and ``Publish`` treat as "nothing to send".

        ``buffer_ref`` is a one-element list holding the target buffer. It is
        optional only so that the legacy one-argument call form
        ``IMessage.Serialize(buffer)`` keeps working.
        """
        return 0

    def SendTo(self, participant):
        """Serialize into ``participant.buffer`` and hand it to ``SendBuffer``.

        Returns False when serialization produced no bytes; otherwise returns
        whatever ``participant.SendBuffer`` returns.
        """
        buffer_size = self.Serialize([participant.buffer])
        if buffer_size == 0:
            return False
        return participant.SendBuffer(buffer_size)

    def Publish(self, participant):
        """Serialize into ``participant.buffer`` and hand it to ``PublishBuffer``.

        Returns False when serialization produced no bytes; otherwise returns
        whatever ``participant.PublishBuffer`` returns.
        """
        buffer_size = self.Serialize([participant.buffer])
        if buffer_size == 0:
            return False
        return participant.PublishBuffer(buffer_size)
class ClientMsg(IMessage):
    """Two-byte message: the message id followed by a network id."""

    id = 0xA0
    length = 2

    def __init__(self, network_id):
        """Build the message from an int network id or from raw bytes.

        When ``network_id`` is ``bytes``, the network id is taken from index 1
        (index 0 holds the message id in the serialized layout). Any other
        type leaves ``network_id`` as None, so ``Serialize`` returns 0.
        """
        # Always define the attribute: Serialize tests it against None, and
        # without this default an unsupported argument type would surface
        # later as an AttributeError instead of "nothing to send".
        self.network_id = None
        if isinstance(network_id, int):
            self.network_id = network_id
        elif isinstance(network_id, bytes):
            self.network_id = network_id[1]

    def Serialize(self, buffer_ref):
        """Write ``[id, network_id]`` at the start of ``buffer_ref[0]``.

        Returns the number of bytes written (``ClientMsg.length``), or 0 when
        no network id is set.
        """
        if self.network_id is None:
            return 0

        buffer: bytearray = buffer_ref[0]
        buffer[0:ClientMsg.length] = [
            ClientMsg.id,
            self.network_id,
        ]
        return ClientMsg.length
class NetworkIdMsg(IMessage):
    """Two-byte message: the message id followed by a network id."""

    id = 0xA1
    length = 2

    def __init__(self, network_id):
        """Accept an int network id, or raw bytes whose index 1 holds it.

        Any other argument type leaves ``network_id`` as None.
        """
        if isinstance(network_id, int):
            self.network_id = network_id
        elif isinstance(network_id, bytes):
            self.network_id = network_id[1]
        else:
            self.network_id = None

    def Serialize(self, buffer_ref):
        """Write ``[id, network_id]`` into ``buffer_ref[0]``; return bytes written.

        Returns 0 without touching the buffer when no network id is set.
        """
        network_id = self.network_id
        if network_id is None:
            return 0

        target: bytearray = buffer_ref[0]
        header = [NetworkIdMsg.id, network_id]
        target[0:NetworkIdMsg.length] = header
        return NetworkIdMsg.length
class InvestigateMsg:
    """Three-byte message parsed from a received buffer.

    Layout as read here: index 1 is the network id, index 2 is the thing id.
    """

    id = 0x81
    length = 3

    def __init__(self, buffer):
        """Extract the network id and thing id from ``buffer``."""
        network_id, thing_id = buffer[1], buffer[2]
        self.network_id = network_id
        self.thing_id = thing_id
class ThingMsg(IMessage):
    """Five-byte message carrying a thing's id, type and parent id."""

    id = 0x80
    length = 5

    def __init__(self, network_id, thing):
        """Remember the network id and the thing to serialize."""
        self.network_id, self.thing = network_id, thing

    def Serialize(self, buffer_ref):
        """Write the five header bytes into ``buffer_ref[0]``.

        Byte order: message id, network id, thing id, thing type, parent id.
        Returns ``ThingMsg.length``, or 0 when the network id or the thing is
        missing.
        """
        thing = self.thing
        if thing is None or self.network_id is None:
            return 0

        target: bytearray = buffer_ref[0]
        payload = [
            ThingMsg.id,
            self.network_id,
            thing.id,
            thing.type,
            thing.parent_id,
        ]
        target[0:ThingMsg.length] = payload
        return ThingMsg.length
class NameMsg(IMessage):
    """Message carrying a thing's name as a length-prefixed UTF-8 string.

    Layout: message id, network id, thing id, name length (one byte),
    followed by the encoded name.
    """

    id = 0x91
    # Size of the fixed header that precedes the name bytes.
    length = 4

    def __init__(self, network_id, thing):
        """Remember the network id and the thing whose name is sent."""
        self.network_id = network_id
        self.thing = thing

    def Serialize(self, buffer_ref):
        """Write the header and the UTF-8 name into ``buffer_ref[0]``.

        Returns the total number of bytes written, or 0 when there is nothing
        valid to send: missing network id, thing or name; empty name; name
        longer than 255 encoded bytes; or message larger than the buffer.
        """
        if self.network_id is None or self.thing is None or self.thing.name is None:
            return 0

        buffer: bytearray = buffer_ref[0]

        encoded_name = self.thing.name.encode('utf-8')
        name_length = len(encoded_name)
        full_length = NameMsg.length + name_length
        # The length field is a single byte, so a name longer than 255 bytes
        # cannot be represented; reject it here instead of letting the
        # bytearray assignment raise ValueError.
        if name_length == 0 or name_length > 0xFF or full_length > len(buffer):
            return 0

        buffer[0:NameMsg.length] = [
            NameMsg.id,
            self.network_id,
            self.thing.id,
            name_length,
        ]
        # Append the name string
        buffer[NameMsg.length:full_length] = encoded_name
        return full_length
class ModelUrlMsg(IMessage):
    """Message carrying a thing's model URL as a length-prefixed UTF-8 string.

    Layout: message id, network id, thing id, two placeholder bytes, URL
    length (one byte), followed by the encoded URL.
    """

    id = 0x90
    # Size of the fixed header that precedes the URL bytes.
    length = 6

    def __init__(self, network_id, thing):
        """Remember the network id and the thing whose model URL is sent."""
        self.network_id = network_id
        self.thing = thing

    def Serialize(self, buffer_ref):
        """Write the header and the UTF-8 URL into ``buffer_ref[0]``.

        Returns the total number of bytes written, or 0 when there is nothing
        valid to send: missing network id, thing or URL; empty URL; URL
        longer than 255 encoded bytes; or message larger than the buffer.
        """
        if self.network_id is None or self.thing is None or self.thing.model_url is None:
            return 0

        buffer: bytearray = buffer_ref[0]

        encoded_url = self.thing.model_url.encode('utf-8')
        url_length = len(encoded_url)
        full_length = ModelUrlMsg.length + url_length
        # The length field is a single byte, so a URL longer than 255 bytes
        # cannot be represented; reject it here instead of letting the
        # bytearray assignment raise ValueError.
        if url_length == 0 or url_length > 0xFF or full_length > len(buffer):
            return 0

        buffer[0:ModelUrlMsg.length] = [
            ModelUrlMsg.id,
            self.network_id,
            self.thing.id,
            0x3D,  # Dummy float16 value 1
            0x00,  # this field will disappear
            url_length,
        ]
        # Append the url string
        buffer[ModelUrlMsg.length:full_length] = encoded_url
        return full_length
class PoseMsg(IMessage):
    """Message carrying the updated pose components of a thing.

    A four-byte header (message id, network id, thing id, ``pose_updated``
    flags) is followed by one encoded value per flag that is set.
    """

    id = 0x10
    length = 4

    def __init__(self, network_id, thing):
        """Remember the network id and the thing whose pose is sent."""
        self.network_id, self.thing = network_id, thing

    def Serialize(self, buffer_ref):
        """Write the header and flagged pose components into ``buffer_ref[0]``.

        Returns the final write index (``ix[0]`` after the helper calls), or
        0 when the network id or the thing is missing.
        """
        thing = self.thing
        if thing is None or self.network_id is None:
            return 0

        target: bytearray = buffer_ref[0]
        target[0:PoseMsg.length] = [
            PoseMsg.id,
            self.network_id,
            thing.id,
            thing.pose_updated,
        ]

        # One-element list so the LowLevelMessages helpers can update the
        # write position in place (presumably they advance it; the result is
        # returned below).
        ix = [4]
        flags = thing.pose_updated
        if flags & Thing.Position:
            LowLevelMessages.SendSpherical(target, ix, thing.position)
        if flags & Thing.Orientation:
            LowLevelMessages.SendQuat32(target, ix, thing.orientation)
        if flags & Thing.LinearVelocity:
            LowLevelMessages.SendSpherical(target, ix, thing.linearVelocity)
        if flags & Thing.AngularVelocity:
            LowLevelMessages.SendSpherical(target, ix, thing.angularVelocity)
        return ix[0]
class BinaryMsg():
    """Message carrying opaque binary data for a thing.

    Layout: message id, network id, thing id, followed by the data bytes.
    """

    id = 0xB1

    def __init__(self, buffer):
        """Parse a received buffer.

        Index 1 is the network id and index 2 the thing id; the thing is
        looked up with ``Thing.Get`` and everything from index 3 onward is
        kept as ``data``.
        """
        self.network_id = buffer[1]
        self.thing_id = buffer[2]

        self.thing: Thing = Thing.Get(self.network_id, self.thing_id)
        self.data = buffer[3:]

    @staticmethod
    def SendTo(participant, thing, data: bytearray):
        """Write a binary message into ``participant.buffer`` and send it.

        Called on the class: ``BinaryMsg.SendTo(participant, thing, data)``.

        Returns False without sending when ``thing`` or ``data`` is None, when
        ``thing.network_id`` is None, or when the message does not fit in the
        participant's buffer; returns True after ``participant.SendBuffer``
        has been called.
        """
        length = 3

        # Check ``thing`` itself before dereferencing it.
        if thing is None or thing.network_id is None or data is None:
            return False

        full_length = length + len(data)
        # Slice assignment past the end would silently grow the shared
        # buffer, so refuse messages that do not fit.
        if full_length > len(participant.buffer):
            return False

        participant.buffer[0:length] = [
            # Must be the class attribute: a bare ``id`` here resolves to the
            # builtin function, not the message id.
            BinaryMsg.id,
            participant.network_id,
            thing.id,
        ]
        participant.buffer[length:full_length] = data

        participant.SendBuffer(full_length)
        return True