diff --git a/Participant.py b/Participant.py index 522d828..9d1fc3a 100644 --- a/Participant.py +++ b/Participant.py @@ -14,51 +14,61 @@ from Thing import Thing class Participant: publishInterval = 3000 # 3 seconds - buffer = bytearray(256) - network_id = 0 + buffer = None + network_id = None nextPublishMe = 0 - others = [] + others = None thread = None + name = "Participant" - def __init__(self, ipAddress = "0.0.0.0", port=7681, remote=False): - # self.buffer = bytearray(256) + def __init__(self, ipAddress = "0.0.0.0", port=7681, remote=False, udp_socket =None): self.ip_address = ipAddress self.port = port - self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - # self.buffer = bytearray(256) + self.others = [] + self.network_id = 0 + self.buffer = bytearray(256) if remote == False: - self.udp_socket.bind(("0.0.0.0", 7681)) - # self.network_id = 0 - # self.nextPublishMe = 0 - # self.others = [] + self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.udp_socket.bind(("0.0.0.0", 0)) self.AddParticipant(self.ip_address, self.port) self.thread = threading.Thread(target = self.Receiver) self.thread.daemon = True self.thread.start() - + else: + self.udp_socket = udp_socket + +#region Update + def GetParticipant(self, ip_address, port): + # print(f'{self.name} Get participant {ip_address} {port}') + # for item in self.others: + # print(f'- {item.ip_address} {item.port}') + found_participants = (item for item in self.others if item.ip_address == ip_address and item.port == port) participant = next(found_participants, None) return participant def AddParticipant(self, ip_address, port): - remote_participant = Participant(ip_address, port, remote=True) + # print(f'{self.name} Add participant {ip_address} {port}') + remote_participant = Participant(ip_address, port, remote=True, udp_socket=self.udp_socket) remote_participant.network_id = len(self.others) self.others.append(remote_participant) return remote_participant def Update(self, currentTimeMs): - if (currentTimeMs > self.nextPublishMe): + if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe: self.Publish(ClientMsg(self.network_id)) - print(f'Sent ClientMsg {self.network_id}') - self.nextPublishMe = currentTimeMs + Participant.publishInterval + print(f'Publish ClientMsg {self.network_id}') + self.nextPublishMe = currentTimeMs + self.publishInterval Thing.UpdateAll(currentTimeMs) +#endregion + #region Send def SendThingInfo(self, thing): @@ -71,7 +81,7 @@ class Participant: if buffer_size <= 0: return True - # print(f'send {self.buffer[0]} to {self.ip_address} {self.port}') + # print(f'{self.name} send {self.buffer[0]} to {self.ip_address} {self.port}') self.udp_socket.sendto(self.buffer[:buffer_size], (self.ip_address, self.port)) return True @@ -80,6 +90,7 @@ class Participant: if buffer_size <= 0: return True + # print(f'publish {self.buffer[0]} to {self.port}') self.udp_socket.sendto(self.buffer[:buffer_size], ('', self.port)) return True @@ -95,17 +106,18 @@ class Participant: # print(f'msg received from {remote_ip_address}:{remote_port}') remote_participant = self.GetParticipant(remote_ip_address, remote_port) if remote_participant is None: - remote_participant = self.AddParticipant(remote_ip_address, self.port) - remote_participant.ReceiveData(data) + # print(f'new participant') + remote_participant = self.AddParticipant(remote_ip_address, remote_port) + self.ReceiveData(data, remote_participant) - def ReceiveData(self, data): + def ReceiveData(self, data, remote_participant): msgId = data[0] # print(f'msg {msgId} ') match msgId: case ClientMsg.id: - self.ProcessClientMsg(ClientMsg(data)) + self.ProcessClientMsg(remote_participant, ClientMsg(data)) case NetworkIdMsg.id: - self.ProcessNetworkIdMsg(NetworkIdMsg(data)) + self.ProcessNetworkIdMsg(remote_participant, NetworkIdMsg(data)) # case InvestigateMsg.id: # self.ProcessInvestigateMsg(InvestigateMsg(data)) case ThingMsg.id: @@ -118,19 +130,16 @@ class Participant: # msg = Messages.BinaryMsg(data) # msg.thing.ProcessBinary(msg.data) - def ProcessClientMsg(self, msg: ClientMsg): - if msg.network_id == 0: - self.Send(NetworkIdMsg(self.network_id)) - print(f'############################ New Client -> {self.network_id}') - else: - print(f'############################ Client') + def ProcessClientMsg(self, sender, msg: ClientMsg): + pass - def ProcessNetworkIdMsg(self, msg: NetworkIdMsg): + def ProcessNetworkIdMsg(self, sender, msg: NetworkIdMsg): + print(f'{self.name} receive network id {self.network_id} -> {msg.network_id}') if self.network_id != msg.network_id: self.network_id = msg.network_id - print(f'receive network id {msg.network_id}') + # print(f'sending all things {len(Thing.allThings)}') for thing in Thing.allThings: - self.SendThingInfo(thing) + sender.SendThingInfo(thing) # self.Send(NameMsg(self.network_id, thing)) def ProcessInvestigateMsg(self, data: bytearray): diff --git a/SiteServer.py b/SiteServer.py index 3250f5c..7306f73 100644 --- a/SiteServer.py +++ b/SiteServer.py @@ -1,52 +1,46 @@ -from .Participant import Participant -from .Thing import Thing -from . import Messages +from Participant import Participant +from ClientMsg import ClientMsg +from NetworkIdMsg import NetworkIdMsg -import select +import socket +import threading -class SiteServer(Participant): - def __init__(self, ipAddress, port): - super().__init__(ipAddress, port) - self.udp_socket.setblocking(0) +class SiteServer(Participant): + name = "Site Server" - def Update(self, currentTime): - ready_to_read, _, _ = select.select([self.udp_socket], [], [], 0.1) # Timeout of 0.1 seconds - while ready_to_read: - data, addr = self.udp_socket.recvfrom(1024) - self.ReceiveData(data) - ready_to_read, _, _ = select.select([self.udp_socket], [], [], 0.1) # Timeout of 0.1 seconds + def __init__(self, ip_address="0.0.0.0", port=7681, remote=False, udp_socket = None): + self.ip_address = ip_address + self.port = port + self.publishInterval = 0 + self.others = [] + self.network_id = 0 + self.buffer = bytearray(256) - return super().Update(currentTime) + if remote == False: + self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.udp_socket.bind(("0.0.0.0", 7681)) + self.AddParticipant(self.ip_address, self.port) + + self.thread = threading.Thread(target = self.Receiver) + self.thread.daemon = True + self.thread.start() + else: + self.udp_socket = udp_socket + + def AddParticipant(self, ip_address, port): + # print(f'{self.name} Add site participant {ip_address} {port}') + remote_participant = SiteServer(ip_address, port, remote=True, udp_socket=self.udp_socket) + remote_participant.network_id = len(self.others) + self.others.append(remote_participant) + return remote_participant + + def ProcessClientMsg(self, sender, msg): + if msg.network_id == 0: + sender.Send(NetworkIdMsg(sender.network_id)) + print(f'{self.name} New Client -> {sender.network_id}') + # else: + # print(f'{self.name} Client') - def ProcessNetworkIdMsg(self, thing_msg): - self.network_id = thing_msg.network_id - # HACK: send the root things first - for thing in Thing.allThings: - if thing is not None and thing.parent_id == 0: - self.SendThingInfo(thing) - # then sent the rest - for thing in Thing.allThings: - if thing is not None and thing.parent_id != 0: - self.SendThingInfo(thing) - - def SendThingInfo(self, thing, recurse = False): - if thing is None: - return - - thing_msg = Messages.ThingMsg(self.network_id, thing) - thing_msg.SendTo(self) - name_msg = Messages.NameMsg(self.network_id, thing) - name_msg.SendTo(self) - model_msg = Messages.ModelUrlMsg(self.network_id, thing) - model_msg.SendTo(self) - pose_msg = Messages.PoseMsg(self.network_id, thing) - pose_msg.SendTo(self) - - if recurse: - for child in thing.children: - self.SendThingInfo(child, True) - - def ProcessInvestigateMsg(self, msg: Messages.InvestigateMsg): - thing = Thing.Get(msg.network_id, msg.thing_id) - if thing is not None: - self.SendThingInfo(thing) + def ProcessNetworkId(self, msg): + pass \ No newline at end of file diff --git a/test/thing_test.py b/test/thing_test.py index 3b531a5..f99102a 100644 --- a/test/thing_test.py +++ b/test/thing_test.py @@ -3,19 +3,43 @@ import time from Thing import Thing from Participant import Participant +from SiteServer import SiteServer class ThingTest(unittest.TestCase): - def test_client_msg(self): + def test_participant(self): participant: Participant = Participant(ipAddress="127.0.0.1", port=7681) + milliseconds = time.time() * 1000 - start_time = milliseconds while milliseconds < start_time + 5000: milliseconds = time.time() * 1000 participant.Update(milliseconds) + def test_site_server(self): + site = SiteServer(port=7681) + milliseconds = time.time() * 1000 + + start_time = milliseconds + while milliseconds < start_time + 5000: + milliseconds = time.time() * 1000 + site.Update(milliseconds) + + def test_site_participant(self): + site = SiteServer(7681) + participant = Participant("127.0.0.1", 7681) + + milliseconds = time.time() * 1000 + start_time = milliseconds + while milliseconds < start_time + 7000: + milliseconds = time.time() * 1000 + site.Update(milliseconds) + participant.Update(milliseconds) + time.sleep(0.1) + def test_thing_msg(self): + site = SiteServer() + participant = Participant("127.0.0.1") thing = Thing() thing.name = "First thing" @@ -25,6 +49,7 @@ class ThingTest(unittest.TestCase): start_time = milliseconds while milliseconds < start_time + 7000: milliseconds = time.time() * 1000 + site.Update(milliseconds) participant.Update(milliseconds) if __name__ == '__main__':