"""Message classes that pack themselves into a participant's byte buffer.

Each message class carries a one-byte ``id`` which is written as the first
byte of the buffer, followed by the message-specific fields.
"""
# from .Participant import Participant
from . import LowLevelMessages
from .Thing import Thing


class IMessage:
    """Base class for messages which can be sent to or published by a participant.

    Subclasses override ``Serialize`` to write their bytes into the buffer and
    report how many bytes were written.
    """

    # Message id byte; every subclass overrides this value.
    id = 0x00

    def Serialize(self, buffer_ref=None):
        """Write the message into ``buffer_ref[0]`` and return its length.

        The base implementation writes nothing and returns 0, which ``SendTo``
        and ``Publish`` treat as "nothing to send".

        ``buffer_ref`` is optional so that the historic call form
        ``IMessage.Serialize(buffer)`` on the class itself keeps working.
        """
        return 0

    def SendTo(self, participant):
        """Serialize into ``participant.buffer`` and hand it to ``SendBuffer``.

        Returns False when the message serialized to zero bytes, otherwise
        whatever ``participant.SendBuffer`` returns.
        """
        buffer_size = self.Serialize([participant.buffer])
        if buffer_size == 0:
            return False
        return participant.SendBuffer(buffer_size)

    def Publish(self, participant):
        """Serialize into ``participant.buffer`` and hand it to ``PublishBuffer``.

        Returns False when the message serialized to zero bytes, otherwise
        whatever ``participant.PublishBuffer`` returns.
        """
        buffer_size = self.Serialize([participant.buffer])
        if buffer_size == 0:
            return False
        return participant.PublishBuffer(buffer_size)


class ClientMsg(IMessage):
    """Two-byte message: message id followed by a network id."""

    id = 0xA0
    length = 2

    def __init__(self, network_id):
        """Create the message from a network id or from a received buffer.

        ``network_id`` is either the id itself (``int``) or a byte buffer in
        which the id is stored at index 1. For any other value the id stays
        ``None`` and ``Serialize`` returns 0.
        """
        # Always define the attribute so Serialize can test it for None.
        self.network_id = None
        if isinstance(network_id, int):
            self.network_id = network_id
        elif isinstance(network_id, (bytes, bytearray)):
            self.network_id = network_id[1]

    def Serialize(self, buffer_ref):
        """Write ``[id, network_id]`` at the start of ``buffer_ref[0]``.

        Returns the number of bytes written, or 0 when no network id is set.
        """
        if self.network_id is None:
            return 0

        buffer: bytearray = buffer_ref[0]
        buffer[0:ClientMsg.length] = [
            ClientMsg.id,
            self.network_id,
        ]
        return ClientMsg.length


class NetworkIdMsg(IMessage):
    """Two-byte message: message id followed by a network id."""

    id = 0xA1
    length = 2

    def __init__(self, network_id):
        """Create the message from a buffer holding the network id at index 1."""
        # self.network_id = None
        # if isinstance(network_id, int):
        #     self.network_id = network_id
        # elif isinstance(network_id, bytes):
        self.network_id = network_id[1]

    # def Serialize(self, buffer_ref):
    #     if self.network_id is None:
    #         return 0
    #     buffer: bytearray = buffer_ref[0]
    #     buffer[0:NetworkIdMsg.length] = [
    #         NetworkIdMsg.id,
    #         self.network_id
    #     ]
    #     return NetworkIdMsg.length

    @staticmethod
    def SendTo(participant, network_id):
        """Write ``[id, network_id]`` at the start of ``participant.buffer``.

        Called on the class: ``NetworkIdMsg.SendTo(participant, network_id)``.
        Returns the number of bytes written, or 0 when ``network_id`` is None.

        NOTE(review): this only fills the buffer; it does not call
        ``participant.SendBuffer`` - confirm the caller transmits it.
        """
        if network_id is None:
            return 0

        participant.buffer[0:2] = [
            NetworkIdMsg.id,
            network_id,
        ]
        return NetworkIdMsg.length


class InvestigateMsg():
    """Received three-byte message holding a network id and a thing id."""

    id = 0x81
    length = 3

    def __init__(self, buffer):
        """Read the network id (index 1) and thing id (index 2) from ``buffer``."""
        self.network_id = buffer[1]
        self.thing_id = buffer[2]


class ThingMsg(IMessage):
    """Five-byte message with the id, type and parent id of a thing."""

    id = 0x80
    length = 5

    def __init__(self, network_id, thing):
        """Store the network id and the thing to serialize."""
        self.network_id = network_id
        self.thing = thing

    def Serialize(self, buffer_ref):
        """Write ``[id, network_id, thing.id, thing.type, thing.parent_id]``.

        Returns the number of bytes written, or 0 when the network id or the
        thing is missing.
        """
        if self.network_id is None or self.thing is None:
            return 0

        buffer: bytearray = buffer_ref[0]
        buffer[0:ThingMsg.length] = [
            ThingMsg.id,
            self.network_id,
            self.thing.id,
            self.thing.type,
            self.thing.parent_id,
        ]
        return ThingMsg.length


class NameMsg(IMessage):
    """Message with a four-byte header followed by the UTF-8 name of a thing."""

    id = 0x91
    # Header length; the encoded name is appended after it.
    length = 4

    def __init__(self, network_id, thing):
        """Store the network id and the thing whose name is serialized."""
        self.network_id = network_id
        self.thing = thing

    def Serialize(self, buffer_ref):
        """Write ``[id, network_id, thing.id, name_length]`` plus the name bytes.

        Returns the total number of bytes written, or 0 when the network id,
        the thing or its name is missing, the name is empty, the name does
        not fit in the buffer, or its encoded length exceeds 255 bytes.
        """
        if self.network_id is None or self.thing is None or self.thing.name is None:
            return 0

        buffer: bytearray = buffer_ref[0]

        encoded_name = self.thing.name.encode('utf-8')
        name_length = len(encoded_name)
        full_length = NameMsg.length + name_length
        # The length is stored in a single byte, so it must not exceed 0xFF;
        # a larger value would raise ValueError in the slice assignment below.
        if name_length == 0 or name_length > 0xFF or full_length > len(buffer):
            return 0

        buffer[0:NameMsg.length] = [
            NameMsg.id,
            self.network_id,
            self.thing.id,
            name_length,
        ]
        # Append the name string
        buffer[NameMsg.length:full_length] = encoded_name
        return full_length


class ModelUrlMsg(IMessage):
    """Message with a six-byte header followed by the UTF-8 model URL of a thing."""

    id = 0x90
    # Header length; the encoded URL is appended after it.
    length = 6

    def __init__(self, network_id, thing):
        """Store the network id and the thing whose model URL is serialized."""
        self.network_id = network_id
        self.thing = thing

    def Serialize(self, buffer_ref):
        """Write the six-byte header plus the URL bytes.

        Returns the total number of bytes written, or 0 when the network id,
        the thing or its model URL is missing, the URL is empty, the URL does
        not fit in the buffer, or its encoded length exceeds 255 bytes.
        """
        if self.network_id is None or self.thing is None or self.thing.model_url is None:
            return 0

        buffer: bytearray = buffer_ref[0]

        encoded_url = self.thing.model_url.encode('utf-8')
        url_length = len(encoded_url)
        full_length = ModelUrlMsg.length + url_length
        # The length is stored in a single byte, so it must not exceed 0xFF;
        # a larger value would raise ValueError in the slice assignment below.
        if url_length == 0 or url_length > 0xFF or full_length > len(buffer):
            return 0

        buffer[0:ModelUrlMsg.length] = [
            ModelUrlMsg.id,
            self.network_id,
            self.thing.id,
            0x3D,  # Dummy float16 value 1
            0x00,  # this field will disappear
            url_length,
        ]
        # Append the url string
        buffer[ModelUrlMsg.length:full_length] = encoded_url
        return full_length


class PoseMsg(IMessage):
    """Message with a four-byte header followed by the updated pose parts."""

    id = 0x10
    # Header length; the pose components are appended after it.
    length = 4

    def __init__(self, network_id, thing):
        """Store the network id and the thing whose pose is serialized."""
        self.network_id = network_id
        self.thing = thing

    def Serialize(self, buffer_ref):
        """Write ``[id, network_id, thing.id, thing.pose_updated]`` plus pose data.

        For every flag set in ``thing.pose_updated`` (``Thing.Position``,
        ``Thing.Orientation``, ``Thing.LinearVelocity``,
        ``Thing.AngularVelocity``) the matching value is passed to a
        ``LowLevelMessages`` writer, in that order.

        Returns the final write index, or 0 when the network id or the thing
        is missing.
        """
        if (self.network_id is None) or (self.thing is None):
            return 0

        buffer: bytearray = buffer_ref[0]
        buffer[0:PoseMsg.length] = [
            PoseMsg.id,
            self.network_id,
            self.thing.id,
            self.thing.pose_updated,
        ]
        # One-element list so the writers can update the index in place.
        # NOTE(review): presumably each writer advances ix[0] past the bytes
        # it wrote - confirm in LowLevelMessages.
        ix = [PoseMsg.length]
        if self.thing.pose_updated & Thing.Position:
            LowLevelMessages.SendSpherical(buffer, ix, self.thing.position)
        if self.thing.pose_updated & Thing.Orientation:
            LowLevelMessages.SendQuat32(buffer, ix, self.thing.orientation)
        if self.thing.pose_updated & Thing.LinearVelocity:
            LowLevelMessages.SendSpherical(buffer, ix, self.thing.linearVelocity)
        if self.thing.pose_updated & Thing.AngularVelocity:
            LowLevelMessages.SendSpherical(buffer, ix, self.thing.angularVelocity)
        return ix[0]


class BinaryMsg():
    """Message with a three-byte header followed by arbitrary data for a thing."""

    id = 0xB1

    def __init__(self, buffer):
        """Parse a received buffer.

        Reads the network id (index 1) and thing id (index 2), looks the
        thing up with ``Thing.Get`` and keeps the remaining bytes as data.
        """
        self.network_id = buffer[1]
        self.thing_id = buffer[2]
        self.thing: Thing = Thing.Get(self.network_id, self.thing_id)
        self.data = buffer[3:]

    @staticmethod
    def SendTo(participant, thing, data: bytearray):
        """Write the header and ``data`` to ``participant.buffer`` and send it.

        Called on the class: ``BinaryMsg.SendTo(participant, thing, data)``.
        The header is ``[id, participant.network_id, thing.id]``.

        Returns False when the thing, its network id or the data is missing,
        otherwise True after ``participant.SendBuffer`` has been called.
        """
        length = 3

        # Test the thing itself before touching its attributes.
        if thing is None or thing.network_id is None or data is None:
            return False

        participant.buffer[0:length] = [
            BinaryMsg.id,
            participant.network_id,
            thing.id,
        ]
        full_length = length + len(data)
        participant.buffer[length:full_length] = data
        participant.SendBuffer(full_length)
        return True