import socket import threading import time from Participant import Participant from Thing import Thing from Messages.ParticipantMsg import ParticipantMsg from Messages.NetworkIdMsg import NetworkIdMsg from Messages.ThingMsg import ThingMsg from Messages.NameMsg import NameMsg from Messages.ModelUrlMsg import ModelUrlMsg from Messages import * import sys micropython = 'micropython' in sys.modules if micropython: from MicroPython.uPythonParticipant import Bla class LocalParticipant(Participant): """! A local participant is the local device which can communicate with other participants. It manages all local things and communcation with other participants. Each application has a local participant which is usually explicit in the code. An participant can be isolated. In that case it is standalong and does not communicate with other participants. Currently, only UDP communication is supported """ buffer = None ## The network ID of the participant network_id = None nextPublishMe = 0 others = None thread = None name = "Participant" def __init__(self, port=7681, ip_address="0.0.0.0", local_port=0): #, remote=False, udp_socket = None): super().__init__(ip_address = "0.0.0.0", port = port) if local_port == 0: local_port = port self.local_port = local_port ## True if the participant is running isolated. # Isolated participants do not communicate with other participants self.isolated = True ## The remote site when this participant is connected to a site self.remote_site = None if self.port != 0: self.isolated = False self.remote_site = Participant(ip_address, port) self.others = [] self.network_id = 0 self.buffer = bytearray(256) self.thing_msg_processors = {} self.new_thing_handlers = [] self.publishInterval = 3000 # 3 seconds #if remote == False: self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.udp_socket.bind(("0.0.0.0", self.local_port)) self.thread = threading.Thread(target = self.Receiver) self.thread.daemon = True self.thread.start() # else: # self.udp_socket = udp_socket #region Update def GetParticipant(self, ip_address, port): # print(f'{self.name} Get participant {ip_address} {port}') # for item in self.others: # print(f'- {item.ip_address} {item.port}') found_participants = (item for item in self.others if item.ip_address == ip_address and item.port == port) participant = next(found_participants, None) return participant def AddParticipant(self, ip_address, port): print(f'{self.name} Add participant {ip_address} {port}') remote_participant = Participant(ip_address = ip_address, port = port) remote_participant.network_id = len(self.others) self.others.append(remote_participant) return remote_participant def Update(self, currentTimeMs = None): if currentTimeMs is None: currentTimeMs = time.time() * 1000 if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe: msg = ParticipantMsg(self.network_id) if self.remote_site is None: self.Publish(msg) else: self.Send(self.remote_site, msg) print(f'Publish ParticipantMsg {self.network_id}') self.nextPublishMe = currentTimeMs + self.publishInterval for thing in self.things: thing.Update(currentTimeMs) # pose_msg = PoseMsg(self.network_id, thing) # for other in self.others: # self.Send(other, pose_msg) super().Update(currentTimeMs) #endregion #region Send def SendThingInfo(self, owner, thing, recursively: bool = False): self.Send(owner, ThingMsg(self.network_id, thing)) self.Send(owner, NameMsg(self.network_id, thing)) self.Send(owner, ModelUrlMsg(self.network_id, thing)) self.Send(owner, PoseMsg(self.network_id, thing, True)) self.Send(owner, BinaryMsg(self.network_id, thing)) if recursively: for child in thing.children: self.SendThingInfo(owner, child, recursively) def Send(self, owner, msg): buffer_size = msg.Serialize([self.buffer]) if buffer_size <= 0: return True print(f'{self.name} send {self.buffer[0]} to {owner.ip_address} {owner.port}') self.udp_socket.sendto(self.buffer[:buffer_size], (owner.ip_address, owner.port)) return True def Publish(self, msg): buffer_size = msg.Serialize([self.buffer]) if buffer_size <= 0: return True # print(f'publish {self.buffer[0]} to {self.port}') self.udp_socket.sendto(self.buffer[:buffer_size], ('', self.port)) return True #endregion #region Receive def Receiver(self): while True: data, addr = self.udp_socket.recvfrom(1024) remote_ip_address = addr[0] remote_port = addr[1] # print(f'msg received from {remote_ip_address}:{remote_port}') remote_participant = self.GetParticipant(remote_ip_address, remote_port) if remote_participant is None: # print(f'new participant') remote_participant = self.AddParticipant(remote_ip_address, remote_port) self.ReceiveData(data, remote_participant) def ReceiveData(self, data, sender): msgId = data[0] # print(f'msg {msgId} ') match msgId: case ParticipantMsg.id: self.ProcessParticipantMsg(sender, ParticipantMsg(data)) case NetworkIdMsg.id: self.ProcessNetworkIdMsg(sender, NetworkIdMsg(data)) # case InvestigateMsg.id: # self.ProcessInvestigateMsg(InvestigateMsg(data)) case ThingMsg.id: self.ProcessThingMsg(ThingMsg(data)) case NameMsg.id: self.ProcessNameMsg(NameMsg(data)) case ModelUrlMsg.id: self.ProcessModelUrlMsg(ModelUrlMsg(data)) case BinaryMsg.id: self.ProcessBinaryMsg(BinaryMsg(data)) def ProcessParticipantMsg(self, sender, msg: ParticipantMsg): pass def ProcessNetworkIdMsg(self, sender, msg: NetworkIdMsg): print(f'{self.name} receive network id {self.network_id} -> {msg.network_id}') if self.network_id != msg.network_id: self.network_id = msg.network_id print(f'sending all things {len(self.things)}') for thing in self.things: if thing.parent is None: self.SendThingInfo(sender, thing, recursively=True) # self.Send(NameMsg(self.network_id, thing)) def ProcessInvestigateMsg(self, data: bytearray): pass def ProcessThingMsg(self, msg: ThingMsg): print(f'received thing {msg.network_id} {msg.thing_id}') if msg.thing_type in self.thing_msg_processors: constructor = self.thing_msg_processors[msg.thing_type] constructor(msg.network_id, msg.thing_id) # not really 'new' thing, but it is a start thing = Thing.Get(msg.network_id, msg.thing_id) if thing is not None: for handler in self.new_thing_handlers: handler(thing) def ProcessNameMsg(self, msg: NameMsg): print(f'received name {msg.name}') def ProcessModelUrlMsg(self, msg: ModelUrlMsg): print(f'received model url: {msg.url}') def ProcessBinaryMsg(self, msg: BinaryMsg): print('received binary data') thing: Thing = self.Get(msg.network_id, msg.thing_id) if thing != None: thing.ProcessBinary(msg.data) def Register(self, constructor, thing_type): self.thing_msg_processors[thing_type] = constructor def OnNewThing(self, event_handler): self.new_thing_handlers.append(event_handler) def OnNewThingType(self, thing_type, event_handler): def ConditionalHandler(thing): if thing.type == thing_type: event_handler(thing) self.new_thing_handlers.append(ConditionalHandler) #endregion