import socket
import threading
import time

from .ClientMsg import ClientMsg
from .NetworkIdMsg import NetworkIdMsg
from .ThingMsg import ThingMsg
from .NameMsg import NameMsg
from .ModelUrlMsg import ModelUrlMsg
from .BinaryMsg import BinaryMsg
from .Thing import Thing


## A participant is a device that can communicate with other participants
#
## Currently only UDP communication is supported
class Participant:
    publishInterval = 3000  # milliseconds (3 seconds)
    buffer = None
    network_id = None
    nextPublishMe = 0
    others = None
    thread = None
    name = "Participant"

    def __init__(self, ipAddress="0.0.0.0", port=7681, remote=False, udp_socket=None):
        self.ip_address = ipAddress
        self.port = port
        self.others = []
        self.network_id = 0
        self.buffer = bytearray(256)
        self.thing_msg_processors = {}
        self.new_thing_handlers = []

        if not remote:
            # Local participant: open a broadcast-capable UDP socket and
            # start a background thread which receives incoming messages
            self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            self.udp_socket.bind(("0.0.0.0", port))
            self.AddParticipant(self.ip_address, self.port)

            self.thread = threading.Thread(target=self.Receiver)
            self.thread.daemon = True
            self.thread.start()
        else:
            # Remote participant: reuse the socket of the local participant
            self.udp_socket = udp_socket

    #region Update

    def GetParticipant(self, ip_address, port):
        # print(f'{self.name} Get participant {ip_address} {port}')
        # for item in self.others:
        #     print(f'- {item.ip_address} {item.port}')
        found_participants = (item for item in self.others
                              if item.ip_address == ip_address and item.port == port)
        participant = next(found_participants, None)
        return participant

    def AddParticipant(self, ip_address, port):
        # print(f'{self.name} Add participant {ip_address} {port}')
        remote_participant = Participant(ip_address, port, remote=True,
                                         udp_socket=self.udp_socket)
        remote_participant.network_id = len(self.others)
        self.others.append(remote_participant)
        return remote_participant

    def Update(self, currentTimeMs=None):
        if currentTimeMs is None:
            currentTimeMs = time.time() * 1000

        # Periodically announce this participant to the network
        if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe:
            self.Publish(ClientMsg(self.network_id))
            print(f'Publish ClientMsg {self.network_id}')
            self.nextPublishMe = currentTimeMs + self.publishInterval

        Thing.UpdateAll(currentTimeMs)

    #endregion

    #region Send

    def SendThingInfo(self, thing):
        self.Send(ThingMsg(self.network_id, thing))
        self.Send(NameMsg(self.network_id, thing))
        self.Send(ModelUrlMsg(self.network_id, thing))

    def Send(self, msg):
        # The buffer is wrapped in a list so Serialize receives it by reference
        buffer_size = msg.Serialize([self.buffer])
        if buffer_size <= 0:
            return True

        # print(f'{self.name} send {self.buffer[0]} to {self.ip_address} {self.port}')
        self.udp_socket.sendto(self.buffer[:buffer_size], (self.ip_address, self.port))
        return True

    def Publish(self, msg):
        buffer_size = msg.Serialize([self.buffer])
        if buffer_size <= 0:
            return True

        # print(f'publish {self.buffer[0]} to {self.port}')
        # NOTE: '' resolves to INADDR_ANY; since SO_BROADCAST is set on the
        # socket, '<broadcast>' may be the intended destination here.
        self.udp_socket.sendto(self.buffer[:buffer_size], ('', self.port))
        return True

    #endregion

    #region Receive

    def Receiver(self):
        while True:
            data, addr = self.udp_socket.recvfrom(1024)
            remote_ip_address = addr[0]
            remote_port = addr[1]
            # print(f'msg received from {remote_ip_address}:{remote_port}')

            remote_participant = self.GetParticipant(remote_ip_address, remote_port)
            if remote_participant is None:
                # print(f'new participant')
                remote_participant = self.AddParticipant(remote_ip_address, remote_port)

            self.ReceiveData(data, remote_participant)

    def ReceiveData(self, data, remote_participant):
        # An empty datagram has no message id; ignore it instead of letting
        # data[0] raise an IndexError in the receiver thread
        if len(data) == 0:
            return

        msgId = data[0]
        # print(f'msg {msgId} ')
        match msgId:
            case ClientMsg.id:
                self.ProcessClientMsg(remote_participant, ClientMsg(data))
            case NetworkIdMsg.id:
                self.ProcessNetworkIdMsg(remote_participant, NetworkIdMsg(data))
            # case InvestigateMsg.id:
            #     self.ProcessInvestigateMsg(InvestigateMsg(data))
            case ThingMsg.id:
                self.ProcessThingMsg(ThingMsg(data))
            case NameMsg.id:
                self.ProcessNameMsg(NameMsg(data))
            case ModelUrlMsg.id:
                self.ProcessModelUrlMsg(ModelUrlMsg(data))
            case BinaryMsg.id:
                self.ProcessBinary(BinaryMsg(data))
                # msg = BinaryMsg(data)
                # if msg.thing != None:
                #     msg.thing.ProcessBinary(msg.data)

    def ProcessClientMsg(self, sender, msg: ClientMsg):
        pass

    def ProcessNetworkIdMsg(self, sender, msg: NetworkIdMsg):
        print(f'{self.name} receive network id {self.network_id} -> {msg.network_id}')
        if self.network_id != msg.network_id:
            self.network_id = msg.network_id
            # print(f'sending all things {len(Thing.allThings)}')
            for thing in Thing.allThings:
                sender.SendThingInfo(thing)
                # self.Send(NameMsg(self.network_id, thing))

    def ProcessInvestigateMsg(self, data: bytearray):
        pass

    def ProcessThingMsg(self, msg: ThingMsg):
        print(f'received thing {msg.network_id} {msg.thing_id}')
        if msg.thing_type in self.thing_msg_processors:
            constructor = self.thing_msg_processors[msg.thing_type]
            constructor(msg.network_id, msg.thing_id)

        # not really a 'new' thing, but it is a start
        thing = Thing.Get(msg.network_id, msg.thing_id)
        if thing is not None:
            for handler in self.new_thing_handlers:
                handler(thing)

    def ProcessNameMsg(self, msg: NameMsg):
        print(f'received name {msg.name}')

    def ProcessModelUrlMsg(self, msg: ModelUrlMsg):
        print(f'received model url: {msg.url}')

    def ProcessBinary(self, msg: BinaryMsg):
        print('received binary data')
        if msg.thing is not None:
            msg.thing.ProcessBinary(msg.data)

    def Register(self, constructor, thing_type):
        # Associate a thing type with the constructor used when a ThingMsg
        # of that type is received
        self.thing_msg_processors[thing_type] = constructor

    def OnNewThing(self, event_handler):
        self.new_thing_handlers.append(event_handler)

    def OnNewThingType(self, thing_type, event_handler):
        # Only call the handler for things of the given type
        def ConditionalHandler(thing):
            if thing.type == thing_type:
                event_handler(thing)

        self.new_thing_handlers.append(ConditionalHandler)

    #endregion