RoboidControl-python/Messages.py
2024-12-21 14:30:07 +01:00

173 lines
4.6 KiB
Python

# from .Participant import Participant
from . import LowLevelMessages
from .Spherical import Spherical
from .Quaternion import Quaternion
class IMessage:
id = 0x00
def Serialize(buffer):
return 0
def SendTo(self, participant):
bufferSize = self.Serialize([participant.buffer])
if bufferSize == 0:
return False
return participant.SendBuffer(bufferSize)
def Publish(self, participant):
bufferSize = self.Serialize([participant.buffer])
if bufferSize == 0:
return False
return participant.PublishBuffer(bufferSize)
class ClientMsg(IMessage):
id = 0xA0
length = 2
def __init__(self, networkId):
if isinstance(networkId, int):
self.networkId = networkId
elif isinstance(networkId, bytes):
self.networkId = networkId[1]
def Serialize(self, buffer_ref):
if self.networkId is None:
return 0
buffer: bytearray = buffer_ref[0]
buffer[0:2] = [
ClientMsg.id,
self.networkId
]
return ClientMsg.length
class NetworkIdMsg(IMessage):
id = 0xA1
length = 2
def __init__(self, networkId):
self.networkId = None
if isinstance(networkId, int):
self.networkId = networkId
elif isinstance(networkId, bytes):
self.networkId = networkId[1]
def Serialize(self, buffer_ref):
if self.networkId is None:
return 0
buffer: bytearray = buffer_ref[0]
buffer[0:2] = [
NetworkIdMsg.id,
self.networkId
]
return NetworkIdMsg.length
class ThingMsg(IMessage):
id = 0x80
length = 5
def __init__(self, networkId, thing):
self.networkId = networkId
self.thingId = thing.id
self.thingType = thing.type
self.parentId = None
def Serialize(self, buffer_ref):
if self.networkId is None or self.thingId is None:
return 0
buffer: bytearray = buffer_ref[0]
buffer[0:5] = [
ThingMsg.id,
self.networkId,
self.thingId,
0x00,
0x00
]
return ThingMsg.length
class ModelUrlMsg(IMessage):
id = 0x90
length = 6
def __init__(self, networkId, thingId, url):
self.networkId = networkId
self.thingId = thingId
self.url = url
def Serialize(self, buffer):
buffer[0:ModelUrlMsg.length] = [
ModelUrlMsg.id,
self.networkId,
self.thingId,
0x3D, # Dummy float16 value 1
0x00,
len(self.url)
]
# Append the url string
fullLength = ModelUrlMsg.length + len(self.url)
buffer[ModelUrlMsg.length:fullLength] = \
[ord(c) for c in self.url]
return fullLength
# ix = 0
# buffer[ix] = ModelUrlMsg.id
# ix+=1
# buffer[ix] = self.networkId
# ix+=1
# buffer[ix] = self.thingId
# ix+=1
# buffer[ix] = 0x3D # Dummy float16 value 1
# ix+=1
# buffer[ix] = 0x00
# ix+=1
# buffer[ix] = len(self.url)
# ix+=1
# for c in self.url:
# buffer[ix] = ord(c)
# ix+=1
# return ix
class PoseMsg(IMessage):
id = 0x10
length = 4 + 4 + 4
Position = 0x01
Orientation = 0x02
LinearVelocity = 0x04
AngularVelocity = 0x08
def __init__(self, networkId, thing, poseType):
self.networkId = networkId
self.thingId = thing.id
self.poseType = poseType
self.position = Spherical.zero
self.orientation = Quaternion.identity
self.linearVelocity = thing.linearVelocity
self.angularVelocity = thing.angularVelocity
def Serialize(self, buffer_ref):
if (self.networkId is None) or (self.thingId is None):
return 0
buffer: bytearray = buffer_ref[0]
buffer[0:4] = [
PoseMsg.id,
self.networkId,
self.thingId,
self.poseType
]
ix = [4]
if self.poseType & PoseMsg.Position:
LowLevelMessages.SendSpherical(buffer, ix, self.position)
if self.poseType & PoseMsg.Orientation:
LowLevelMessages.SendQuat32(buffer, ix, self.orientation)
if self.poseType & PoseMsg.LinearVelocity:
LowLevelMessages.SendSpherical(buffer, ix, self.linearVelocity)
if self.poseType & PoseMsg.AngularVelocity:
LowLevelMessages.SendSpherical(buffer, ix, self.angularVelocity)
return ix[0]