# from .Participant import Participant
from . import LowLevelMessages
from .Spherical import Spherical
from .Quaternion import Quaternion


class IMessage:
    """Base class for messages that serialize themselves into a byte buffer.

    Subclasses override Serialize; SendTo and Publish hand the participant's
    buffer to Serialize and then ask the participant to transmit it.
    """

    id = 0x00

    def Serialize(self, buffer_ref=None):
        """Write the message into ``buffer_ref[0]``.

        The base message has no payload: nothing is written and 0 is returned,
        which SendTo and Publish treat as "nothing to send".

        The parameter is optional so that the historical one-argument call
        ``IMessage.Serialize(buffer)`` on the class keeps returning 0.
        """
        return 0

    def SendTo(self, participant):
        """Serialize into ``participant.buffer`` and send it.

        Returns False when Serialize wrote nothing, otherwise whatever
        ``participant.SendBuffer`` returns.
        """
        # The buffer is wrapped in a one-element list so that Serialize
        # receives it as ``buffer_ref[0]``.
        bufferSize = self.Serialize([participant.buffer])
        if bufferSize == 0:
            return False
        return participant.SendBuffer(bufferSize)

    def Publish(self, participant):
        """Serialize into ``participant.buffer`` and publish it.

        Returns False when Serialize wrote nothing, otherwise whatever
        ``participant.PublishBuffer`` returns.
        """
        bufferSize = self.Serialize([participant.buffer])
        if bufferSize == 0:
            return False
        return participant.PublishBuffer(bufferSize)


class ClientMsg(IMessage):
    """Two-byte message: the message id followed by the network id."""

    id = 0xA0
    length = 2

    def __init__(self, networkId):
        """Store the network id.

        ``networkId`` is either the id itself (int) or a bytes object whose
        second byte is the id. Anything else, including bytes too short to
        hold an id, leaves ``self.networkId`` as None so that Serialize
        returns 0.
        """
        # Always define the attribute; Serialize tests it against None.
        self.networkId = None
        if isinstance(networkId, int):
            self.networkId = networkId
        elif isinstance(networkId, bytes) and len(networkId) > 1:
            self.networkId = networkId[1]

    def Serialize(self, buffer_ref):
        """Write ``[id, networkId]`` at the start of ``buffer_ref[0]``.

        Returns the number of bytes written, or 0 when no network id is set.
        """
        if self.networkId is None:
            return 0

        buffer: bytearray = buffer_ref[0]
        buffer[0:2] = [ClientMsg.id, self.networkId]
        return ClientMsg.length


class NetworkIdMsg(IMessage):
    """Two-byte message: the message id followed by the network id."""

    id = 0xA1
    length = 2

    def __init__(self, networkId):
        """Store the network id.

        ``networkId`` is either the id itself (int) or a bytes object whose
        second byte is the id. Anything else, including bytes too short to
        hold an id, leaves ``self.networkId`` as None so that Serialize
        returns 0.
        """
        self.networkId = None
        if isinstance(networkId, int):
            self.networkId = networkId
        elif isinstance(networkId, bytes) and len(networkId) > 1:
            self.networkId = networkId[1]

    def Serialize(self, buffer_ref):
        """Write ``[id, networkId]`` at the start of ``buffer_ref[0]``.

        Returns the number of bytes written, or 0 when no network id is set.
        """
        if self.networkId is None:
            return 0

        buffer: bytearray = buffer_ref[0]
        buffer[0:2] = [NetworkIdMsg.id, self.networkId]
        return NetworkIdMsg.length


class ThingMsg(IMessage):
    """Five-byte message identifying a thing on a network."""

    id = 0x80
    length = 5

    def __init__(self, networkId, thing):
        """Capture the network id plus the thing's ``id`` and ``type``."""
        self.networkId = networkId
        self.thingId = thing.id
        self.thingType = thing.type
        self.parentId = None

    def Serialize(self, buffer_ref):
        """Write the message at the start of ``buffer_ref[0]``.

        Returns the number of bytes written, or 0 when the network id or the
        thing id is missing.
        """
        if self.networkId is None or self.thingId is None:
            return 0

        buffer: bytearray = buffer_ref[0]
        # NOTE: thingType and parentId are stored on the message but are not
        # written here; the last two bytes are always 0x00.
        buffer[0:5] = [
            ThingMsg.id,
            self.networkId,
            self.thingId,
            0x00,
            0x00,
        ]
        return ThingMsg.length


class ModelUrlMsg(IMessage):
    """Message carrying a model URL: a six-byte header followed by the URL."""

    id = 0x90
    length = 6  # header size; the URL characters follow the header

    def __init__(self, networkId, thingId, url):
        self.networkId = networkId
        self.thingId = thingId
        self.url = url

    def Serialize(self, buffer):
        """Write the header and the URL characters into the buffer.

        ``buffer`` may be the raw buffer itself or, as SendTo and Publish pass
        it, a one-element list wrapping the buffer.

        Returns the total number of bytes written (header plus URL), or 0 when
        the network id, thing id or URL is missing.
        """
        if self.networkId is None or self.thingId is None or self.url is None:
            return 0

        # SendTo/Publish call Serialize([participant.buffer]). Without this
        # unwrap the wrapper list itself would be overwritten and the
        # participant's buffer would never receive the message.
        if (
            isinstance(buffer, list)
            and len(buffer) == 1
            and isinstance(buffer[0], (bytearray, memoryview))
        ):
            buffer = buffer[0]

        buffer[0:ModelUrlMsg.length] = [
            ModelUrlMsg.id,
            self.networkId,
            self.thingId,
            0x3D,  # Dummy float16 value 1
            0x00,
            len(self.url),
        ]
        # Append the url string, one code point per byte
        fullLength = ModelUrlMsg.length + len(self.url)
        buffer[ModelUrlMsg.length:fullLength] = [ord(c) for c in self.url]
        return fullLength


class PoseMsg(IMessage):
    """Message carrying the pose components selected by a bit mask."""

    id = 0x10
    length = 4 + 4 + 4

    # Bit flags combined in poseType
    Position = 0x01
    Orientation = 0x02
    LinearVelocity = 0x04
    AngularVelocity = 0x08

    def __init__(self, networkId, thing, poseType):
        """Capture the ids and the velocities of ``thing``.

        The ``poseType`` argument is currently overridden with 0x0F (all four
        components). Position and orientation are set to ``Spherical.zero``
        and ``Quaternion.identity`` rather than read from ``thing``.
        """
        self.networkId = networkId
        self.thingId = thing.id

        poseType = 0x0F  # unity server currently requires position and orientation
        self.poseType = poseType
        self.position = Spherical.zero
        self.orientation = Quaternion.identity
        self.linearVelocity = thing.linearVelocity
        self.angularVelocity = thing.angularVelocity

    def Serialize(self, buffer_ref):
        """Write the header and the selected components into ``buffer_ref[0]``.

        Returns the value of the write index after the last component, or 0
        when the network id or the thing id is missing.
        """
        if (self.networkId is None) or (self.thingId is None):
            return 0

        buffer: bytearray = buffer_ref[0]
        buffer[0:4] = [
            PoseMsg.id,
            self.networkId,
            self.thingId,
            self.poseType,
        ]
        # One-element list used as a by-reference write index; the
        # LowLevelMessages helpers presumably advance ix[0] past what they
        # write (verify against LowLevelMessages).
        ix = [4]
        if self.poseType & PoseMsg.Position:
            LowLevelMessages.SendSpherical(buffer, ix, self.position)
        if self.poseType & PoseMsg.Orientation:
            LowLevelMessages.SendQuat32(buffer, ix, self.orientation)
        if self.poseType & PoseMsg.LinearVelocity:
            LowLevelMessages.SendSpherical(buffer, ix, self.linearVelocity)
        if self.poseType & PoseMsg.AngularVelocity:
            LowLevelMessages.SendSpherical(buffer, ix, self.angularVelocity)
        return ix[0]