import socket import threading from Messages.ParticipantMsg import ParticipantMsg from Messages.NetworkIdMsg import NetworkIdMsg from Messages.ThingMsg import ThingMsg from Messages.NameMsg import NameMsg from Messages.ModelUrlMsg import ModelUrlMsg from Messages import * from Thing import Thing import sys micropython = 'micropython' in sys.modules if micropython: from MicroPython.uPythonParticipant import Bla class Participant: """! A participant is used for communcation between things. Currently, only UDP communication is supported """ publishInterval = 3000 # 3 seconds buffer = None ## The network ID of the participant network_id = None nextPublishMe = 0 others = None thread = None name = "Participant" def __init__(self, ipAddress = "0.0.0.0", port=7681, remote=False, udp_socket =None): self.ip_address = ipAddress self.port = port self.others = [] self.network_id = 0 self.buffer = bytearray(256) self.thing_msg_processors = {} self.new_thing_handlers = [] if remote == False: self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.udp_socket.bind(("0.0.0.0", port)) self.AddParticipant(self.ip_address, self.port) self.thread = threading.Thread(target = self.Receiver) self.thread.daemon = True self.thread.start() else: self.udp_socket = udp_socket #region Update def GetParticipant(self, ip_address, port): # print(f'{self.name} Get participant {ip_address} {port}') # for item in self.others: # print(f'- {item.ip_address} {item.port}') found_participants = (item for item in self.others if item.ip_address == ip_address and item.port == port) participant = next(found_participants, None) return participant def AddParticipant(self, ip_address, port): # print(f'{self.name} Add participant {ip_address} {port}') remote_participant = Participant(ip_address, port, remote=True, udp_socket=self.udp_socket) remote_participant.network_id = len(self.others) self.others.append(remote_participant) return remote_participant def Update(self, currentTimeMs = None): if currentTimeMs is None: currentTimeMs = time.time() * 1000 if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe: self.Publish(ParticipantMsg(self.network_id)) print(f'Publish ClientMsg {self.network_id}') self.nextPublishMe = currentTimeMs + self.publishInterval Thing.UpdateAll(currentTimeMs) #endregion #region Send def SendThingInfo(self, thing): self.Send(ThingMsg(self.network_id, thing)) self.Send(NameMsg(self.network_id, thing)) self.Send(ModelUrlMsg(self.network_id, thing)) def Send(self, msg): buffer_size = msg.Serialize([self.buffer]) if buffer_size <= 0: return True # print(f'{self.name} send {self.buffer[0]} to {self.ip_address} {self.port}') self.udp_socket.sendto(self.buffer[:buffer_size], (self.ip_address, self.port)) return True def Publish(self, msg): buffer_size = msg.Serialize([self.buffer]) if buffer_size <= 0: return True # print(f'publish {self.buffer[0]} to {self.port}') self.udp_socket.sendto(self.buffer[:buffer_size], ('', self.port)) return True #endregion #region Receive def Receiver(self): while True: data, addr = self.udp_socket.recvfrom(1024) remote_ip_address = addr[0] remote_port = addr[1] # print(f'msg received from {remote_ip_address}:{remote_port}') remote_participant = self.GetParticipant(remote_ip_address, remote_port) if remote_participant is None: # print(f'new participant') remote_participant = self.AddParticipant(remote_ip_address, remote_port) self.ReceiveData(data, remote_participant) def ReceiveData(self, data, remote_participant): msgId = data[0] # print(f'msg {msgId} ') match msgId: case ParticipantMsg.id: self.ProcessClientMsg(remote_participant, ParticipantMsg(data)) case NetworkIdMsg.id: self.ProcessNetworkIdMsg(remote_participant, NetworkIdMsg(data)) # case InvestigateMsg.id: # self.ProcessInvestigateMsg(InvestigateMsg(data)) case ThingMsg.id: self.ProcessThingMsg(ThingMsg(data)) case NameMsg.id: self.ProcessNameMsg(NameMsg(data)) case ModelUrlMsg.id: self.ProcessModelUrlMsg(ModelUrlMsg(data)) # case Messages.BinaryMsg.id: # msg = Messages.BinaryMsg(data) # msg.thing.ProcessBinary(msg.data) def ProcessClientMsg(self, sender, msg: ParticipantMsg): pass def ProcessNetworkIdMsg(self, sender, msg: NetworkIdMsg): print(f'{self.name} receive network id {self.network_id} -> {msg.network_id}') if self.network_id != msg.network_id: self.network_id = msg.network_id # print(f'sending all things {len(Thing.allThings)}') for thing in Thing.allThings: sender.SendThingInfo(thing) # self.Send(NameMsg(self.network_id, thing)) def ProcessInvestigateMsg(self, data: bytearray): pass def ProcessThingMsg(self, msg: ThingMsg): print(f'received thing {msg.network_id} {msg.thing_id}') if msg.thing_type in self.thing_msg_processors: constructor = self.thing_msg_processors[msg.thing_type] constructor(msg.network_id, msg.thing_id) # not really 'new' thing, but it is a start thing = Thing.Get(msg.network_id, msg.thing_id) if thing is not None: for handler in self.new_thing_handlers: handler(thing) def ProcessNameMsg(self, msg: NameMsg): print(f'received name {msg.name}') def ProcessModelUrlMsg(self, msg: ModelUrlMsg): print(f'received model url: {msg.url}') def ProcessBinary(self, msg: BinaryMsg): print('received binary data') if msg.thing != None: msg.thing.ProcessBinary(msg.data) def Register(self, constructor, thing_type): self.thing_msg_processors[thing_type] = constructor def OnNewThing(self, event_handler): self.new_thing_handlers.append(event_handler) def OnNewThingType(self, thing_type, event_handler): def ConditionalHandler(thing): if thing.type == thing_type: event_handler(thing) self.new_thing_handlers.append(ConditionalHandler) #endregion